001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.ByteArrayInputStream;
020import java.io.DataOutputStream;
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.Socket;
024import java.net.URI;
025import java.net.UnknownHostException;
026import java.nio.ByteBuffer;
027import java.nio.channels.SelectionKey;
028import java.nio.channels.SocketChannel;
029import java.util.HashMap;
030
031import javax.net.SocketFactory;
032
033import org.apache.activemq.command.Command;
034import org.apache.activemq.transport.Transport;
035import org.apache.activemq.transport.nio.NIOOutputStream;
036import org.apache.activemq.transport.nio.SelectorManager;
037import org.apache.activemq.transport.nio.SelectorSelection;
038import org.apache.activemq.transport.tcp.TcpTransport;
039import org.apache.activemq.util.ByteArrayOutputStream;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.DataByteArrayInputStream;
042import org.apache.activemq.util.IOExceptionSupport;
043import org.apache.activemq.util.ServiceStopper;
044import org.apache.activemq.wireformat.WireFormat;
045
046/**
047 * An implementation of the {@link Transport} interface for using Stomp over NIO
048 *
049 * 
050 */
051public class StompNIOTransport extends TcpTransport {
052
053    private SocketChannel channel;
054    private SelectorSelection selection;
055
056    private ByteBuffer inputBuffer;
057    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
058    boolean processedHeaders = false;
059    String action;
060    HashMap<String, String> headers;
061    int contentLength = -1;
062    int readLength = 0;
063    int previousByte = -1;
064
065    public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
066        super(wireFormat, socketFactory, remoteLocation, localLocation);
067    }
068
069    public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
070        super(wireFormat, socket);
071    }
072
073    protected void initializeStreams() throws IOException {
074        channel = socket.getChannel();
075        channel.configureBlocking(false);
076
077        // listen for events telling us when the socket is readable.
078        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
079            public void onSelect(SelectorSelection selection) {
080                serviceRead();
081            }
082
083            public void onError(SelectorSelection selection, Throwable error) {
084                if (error instanceof IOException) {
085                    onException((IOException)error);
086                } else {
087                    onException(IOExceptionSupport.create(error));
088                }
089            }
090        });
091
092        inputBuffer = ByteBuffer.allocate(8 * 1024);
093        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
094        this.dataOut = new DataOutputStream(outPutStream);
095        this.buffOut = outPutStream;
096    }
097
098    private void serviceRead() {
099        try {
100
101           while (true) {
102               // read channel
103               int readSize = channel.read(inputBuffer);
104               // channel is closed, cleanup
105               if (readSize == -1) {
106                   onException(new EOFException());
107                   selection.close();
108                   break;
109               }
110               // nothing more to read, break
111               if (readSize == 0) {
112                   break;
113               }
114
115               inputBuffer.flip();
116
117               int b;
118               ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
119
120               int i = 0;
121               while(i++ < readSize) {
122                   b = input.read();
123                   // skip repeating nulls
124                   if (!processedHeaders && previousByte == 0 && b == 0) {
125                       continue;
126                   }
127
128                   if (!processedHeaders) {
129                       currentCommand.write(b);
130                       // end of headers section, parse action and header
131                       if (previousByte == '\n' && b == '\n') {
132                           if (wireFormat instanceof StompWireFormat) {
133                               DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
134                               action = ((StompWireFormat)wireFormat).parseAction(data);
135                               headers = ((StompWireFormat)wireFormat).parseHeaders(data);
136                               String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
137                               if (contentLengthHeader != null) {
138                                   contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader);
139                               } else {
140                                   contentLength = -1;
141                               }
142                           }
143                           processedHeaders = true;
144                           currentCommand.reset();
145                       }
146                   } else {
147
148                       if (contentLength == -1) {
149                           // end of command reached, unmarshal
150                           if (b == 0) {
151                               processCommand();
152                           } else {
153                               currentCommand.write(b);
154                           }
155                       } else {
156                           // read desired content length
157                           if (readLength++ == contentLength) {
158                               processCommand();
159                               readLength = 0;
160                           } else {
161                               currentCommand.write(b);
162                           }
163                       }
164                   }
165
166                   previousByte = b;
167               }
168               // clear the buffer
169               inputBuffer.clear();
170
171           }
172        } catch (IOException e) {
173            onException(e);
174        } catch (Throwable e) {
175            onException(IOExceptionSupport.create(e));
176        }
177    }
178
179    private void processCommand() throws Exception {
180        StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
181        doConsume(frame);
182        processedHeaders = false;
183        currentCommand.reset();
184        contentLength = -1;
185    }
186
187    protected void doStart() throws Exception {
188        connect();
189        selection.setInterestOps(SelectionKey.OP_READ);
190        selection.enable();
191    }
192
193    protected void doStop(ServiceStopper stopper) throws Exception {
194        try {
195            selection.close();
196        } catch (Exception e) {
197                e.printStackTrace();
198        }
199        super.doStop(stopper);
200    }
201}