001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.nio; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.EOFException; 022import java.io.IOException; 023import java.net.Socket; 024import java.net.URI; 025import java.net.UnknownHostException; 026import java.nio.ByteBuffer; 027import java.nio.channels.SelectionKey; 028import java.nio.channels.SocketChannel; 029 030import javax.net.SocketFactory; 031 032import org.apache.activemq.command.Command; 033import org.apache.activemq.transport.Transport; 034import org.apache.activemq.transport.tcp.TcpTransport; 035import org.apache.activemq.util.IOExceptionSupport; 036import org.apache.activemq.util.ServiceStopper; 037import org.apache.activemq.wireformat.WireFormat; 038 039/** 040 * An implementation of the {@link Transport} interface using raw tcp/ip 041 * 042 * 043 */ 044public class NIOTransport extends TcpTransport { 045 046 // private static final Logger log = LoggerFactory.getLogger(NIOTransport.class); 047 private SocketChannel channel; 048 private SelectorSelection selection; 049 private ByteBuffer inputBuffer; 050 private ByteBuffer currentBuffer; 051 private int nextFrameSize; 052 053 public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { 054 super(wireFormat, socketFactory, remoteLocation, localLocation); 055 } 056 057 public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException { 058 super(wireFormat, socket); 059 } 060 061 protected void initializeStreams() throws IOException { 062 channel = socket.getChannel(); 063 channel.configureBlocking(false); 064 065 // listen for events telling us when the socket is readable. 066 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { 067 public void onSelect(SelectorSelection selection) { 068 serviceRead(); 069 } 070 071 public void onError(SelectorSelection selection, Throwable error) { 072 if (error instanceof IOException) { 073 onException((IOException)error); 074 } else { 075 onException(IOExceptionSupport.create(error)); 076 } 077 } 078 }); 079 080 // Send the data via the channel 081 // inputBuffer = ByteBuffer.allocateDirect(8*1024); 082 inputBuffer = ByteBuffer.allocate(8 * 1024); 083 currentBuffer = inputBuffer; 084 nextFrameSize = -1; 085 currentBuffer.limit(4); 086 NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024); 087 this.dataOut = new DataOutputStream(outPutStream); 088 this.buffOut = outPutStream; 089 } 090 091 private void serviceRead() { 092 try { 093 while (true) { 094 095 int readSize = channel.read(currentBuffer); 096 if (readSize == -1) { 097 onException(new EOFException()); 098 selection.close(); 099 break; 100 } 101 if (readSize == 0) { 102 break; 103 } 104 105 if (currentBuffer.hasRemaining()) { 106 continue; 107 } 108 109 // Are we trying to figure out the size of the next frame? 110 if (nextFrameSize == -1) { 111 assert inputBuffer == currentBuffer; 112 113 // If the frame is too big to fit in our direct byte buffer, 114 // Then allocate a non direct byte buffer of the right size 115 // for it. 116 inputBuffer.flip(); 117 nextFrameSize = inputBuffer.getInt() + 4; 118 if (nextFrameSize > inputBuffer.capacity()) { 119 currentBuffer = ByteBuffer.allocate(nextFrameSize); 120 currentBuffer.putInt(nextFrameSize); 121 } else { 122 inputBuffer.limit(nextFrameSize); 123 } 124 125 } else { 126 currentBuffer.flip(); 127 128 Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); 129 doConsume((Command)command); 130 131 nextFrameSize = -1; 132 inputBuffer.clear(); 133 inputBuffer.limit(4); 134 currentBuffer = inputBuffer; 135 } 136 137 } 138 139 } catch (IOException e) { 140 onException(e); 141 } catch (Throwable e) { 142 onException(IOExceptionSupport.create(e)); 143 } 144 } 145 146 protected void doStart() throws Exception { 147 connect(); 148 selection.setInterestOps(SelectionKey.OP_READ); 149 selection.enable(); 150 } 151 152 protected void doStop(ServiceStopper stopper) throws Exception { 153 if (selection != null) { 154 selection.close(); 155 } 156 super.doStop(stopper); 157 } 158}