001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.Socket;
024import java.net.URI;
025import java.net.UnknownHostException;
026import java.nio.ByteBuffer;
027import java.nio.channels.SelectionKey;
028import java.nio.channels.SocketChannel;
029
030import javax.net.SocketFactory;
031
032import org.apache.activemq.command.Command;
033import org.apache.activemq.transport.Transport;
034import org.apache.activemq.transport.tcp.TcpTransport;
035import org.apache.activemq.util.IOExceptionSupport;
036import org.apache.activemq.util.ServiceStopper;
037import org.apache.activemq.wireformat.WireFormat;
038
039/**
040 * An implementation of the {@link Transport} interface using raw tcp/ip
041 * 
042 * 
043 */
044public class NIOTransport extends TcpTransport {
045
046    // private static final Logger log = LoggerFactory.getLogger(NIOTransport.class);
047    private SocketChannel channel;
048    private SelectorSelection selection;
049    private ByteBuffer inputBuffer;
050    private ByteBuffer currentBuffer;
051    private int nextFrameSize;
052
053    public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
054        super(wireFormat, socketFactory, remoteLocation, localLocation);
055    }
056
057    public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
058        super(wireFormat, socket);
059    }
060
061    protected void initializeStreams() throws IOException {
062        channel = socket.getChannel();
063        channel.configureBlocking(false);
064
065        // listen for events telling us when the socket is readable.
066        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
067            public void onSelect(SelectorSelection selection) {
068                serviceRead();
069            }
070
071            public void onError(SelectorSelection selection, Throwable error) {
072                if (error instanceof IOException) {
073                    onException((IOException)error);
074                } else {
075                    onException(IOExceptionSupport.create(error));
076                }
077            }
078        });
079
080        // Send the data via the channel
081        // inputBuffer = ByteBuffer.allocateDirect(8*1024);
082        inputBuffer = ByteBuffer.allocate(8 * 1024);
083        currentBuffer = inputBuffer;
084        nextFrameSize = -1;
085        currentBuffer.limit(4);
086        NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
087        this.dataOut = new DataOutputStream(outPutStream);
088        this.buffOut = outPutStream;
089    }
090
091    private void serviceRead() {
092        try {
093            while (true) {
094
095                int readSize = channel.read(currentBuffer);
096                if (readSize == -1) {
097                    onException(new EOFException());
098                    selection.close();
099                    break;
100                }
101                if (readSize == 0) {
102                    break;
103                }
104
105                if (currentBuffer.hasRemaining()) {
106                    continue;
107                }
108
109                // Are we trying to figure out the size of the next frame?
110                if (nextFrameSize == -1) {
111                    assert inputBuffer == currentBuffer;
112
113                    // If the frame is too big to fit in our direct byte buffer,
114                    // Then allocate a non direct byte buffer of the right size
115                    // for it.
116                    inputBuffer.flip();
117                    nextFrameSize = inputBuffer.getInt() + 4;
118                    if (nextFrameSize > inputBuffer.capacity()) {
119                        currentBuffer = ByteBuffer.allocate(nextFrameSize);
120                        currentBuffer.putInt(nextFrameSize);
121                    } else {
122                        inputBuffer.limit(nextFrameSize);
123                    }
124
125                } else {
126                    currentBuffer.flip();
127
128                    Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
129                    doConsume((Command)command);
130
131                    nextFrameSize = -1;
132                    inputBuffer.clear();
133                    inputBuffer.limit(4);
134                    currentBuffer = inputBuffer;
135                }
136
137            }
138
139        } catch (IOException e) {
140            onException(e);
141        } catch (Throwable e) {
142            onException(IOExceptionSupport.create(e));
143        }
144    }
145
146    protected void doStart() throws Exception {
147        connect();
148        selection.setInterestOps(SelectionKey.OP_READ);
149        selection.enable();
150    }
151
152    protected void doStop(ServiceStopper stopper) throws Exception {
153        if (selection != null) {
154            selection.close();
155        }
156        super.doStop(stopper);
157    }
158}