001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.Socket;
026import java.net.SocketException;
027import java.net.SocketTimeoutException;
028import java.net.URI;
029import java.net.UnknownHostException;
030import java.util.HashMap;
031import java.util.Map;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.SynchronousQueue;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicReference;
038import javax.net.SocketFactory;
039import org.apache.activemq.Service;
040import org.apache.activemq.thread.DefaultThreadPools;
041import org.apache.activemq.transport.Transport;
042import org.apache.activemq.transport.TransportLoggerFactory;
043import org.apache.activemq.transport.TransportThreadSupport;
044import org.apache.activemq.util.InetAddressUtil;
045import org.apache.activemq.util.IntrospectionSupport;
046import org.apache.activemq.util.ServiceStopper;
047import org.apache.activemq.wireformat.WireFormat;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * An implementation of the {@link Transport} interface using raw tcp/ip
053 * 
054 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
055 * 
056 */
057public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
058    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
059    protected final URI remoteLocation;
060    protected final URI localLocation;
061    protected final WireFormat wireFormat;
062
063    protected int connectionTimeout = 30000;
064    protected int soTimeout;
065    protected int socketBufferSize = 64 * 1024;
066    protected int ioBufferSize = 8 * 1024;
067    protected boolean closeAsync=true;
068    protected Socket socket;
069    protected DataOutputStream dataOut;
070    protected DataInputStream dataIn;
071    protected TimeStampStream buffOut = null;
072    /**
073     * The Traffic Class to be set on the socket.
074     */
075    protected int trafficClass = 0;
076    /**
077     * Keeps track of attempts to set the Traffic Class on the socket.
078     */
079    private boolean trafficClassSet = false;
080    /**
081     * Prevents setting both the Differentiated Services and Type of Service
082     * transport options at the same time, since they share the same spot in
083     * the TCP/IP packet headers.
084     */
085    protected boolean diffServChosen = false;
086    protected boolean typeOfServiceChosen = false;
087    /**
088     * trace=true -> the Transport stack where this TcpTransport
089     * object will be, will have a TransportLogger layer
090     * trace=false -> the Transport stack where this TcpTransport
091     * object will be, will NOT have a TransportLogger layer, and therefore
092     * will never be able to print logging messages.
093     * This parameter is most probably set in Connection or TransportConnector URIs.
094     */
095    protected boolean trace = false;
096    /**
097     * Name of the LogWriter implementation to use.
098     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
099     * This parameter is most probably set in Connection or TransportConnector URIs.
100     */
101    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
102    /**
103     * Specifies if the TransportLogger will be manageable by JMX or not.
104     * Also, as long as there is at least 1 TransportLogger which is manageable,
105     * a TransportLoggerControl MBean will me created.
106     */
107    protected boolean dynamicManagement = false;
108    /**
109     * startLogging=true -> the TransportLogger object of the Transport stack
110     * will initially write messages to the log.
111     * startLogging=false -> the TransportLogger object of the Transport stack
112     * will initially NOT write messages to the log.
113     * This parameter only has an effect if trace == true.
114     * This parameter is most probably set in Connection or TransportConnector URIs.
115     */
116    protected boolean startLogging = true;
117    /**
118     * Specifies the port that will be used by the JMX server to manage
119     * the TransportLoggers.
120     * This should only be set in an URI by a client (producer or consumer) since
121     * a broker will already create a JMX server.
122     * It is useful for people who test a broker and clients in the same machine
123     * and want to control both via JMX; a different port will be needed.
124     */
125    protected int jmxPort = 1099;
126    protected boolean useLocalHost = false;
127    protected int minmumWireFormatVersion;
128    protected SocketFactory socketFactory;
129    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
130
131    private Map<String, Object> socketOptions;
132    private Boolean keepAlive;
133    private Boolean tcpNoDelay;
134    private Thread runnerThread;
135    private volatile int receiveCounter;
136
137    /**
138     * Connect to a remote Node - e.g. a Broker
139     * 
140     * @param wireFormat
141     * @param socketFactory
142     * @param remoteLocation
143     * @param localLocation - e.g. local InetAddress and local port
144     * @throws IOException
145     * @throws UnknownHostException
146     */
147    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
148                        URI localLocation) throws UnknownHostException, IOException {
149        this.wireFormat = wireFormat;
150        this.socketFactory = socketFactory;
151        try {
152            this.socket = socketFactory.createSocket();
153        } catch (SocketException e) {
154            this.socket = null;
155        }
156        this.remoteLocation = remoteLocation;
157        this.localLocation = localLocation;
158        setDaemon(false);
159    }
160
161    /**
162     * Initialize from a server Socket
163     * 
164     * @param wireFormat
165     * @param socket
166     * @throws IOException
167     */
168    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
169        this.wireFormat = wireFormat;
170        this.socket = socket;
171        this.remoteLocation = null;
172        this.localLocation = null;
173        setDaemon(true);
174    }
175
176    /**
177     * A one way asynchronous send
178     */
179    public void oneway(Object command) throws IOException {
180        checkStarted();
181        wireFormat.marshal(command, dataOut);
182        dataOut.flush();
183    }
184
185    /**
186     * @return pretty print of 'this'
187     */
188    @Override
189    public String toString() {
190        return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
191                : (localLocation != null ? localLocation : remoteLocation)) ;
192    }
193
194    /**
195     * reads packets from a Socket
196     */
197    public void run() {
198        LOG.trace("TCP consumer thread for " + this + " starting");
199        this.runnerThread=Thread.currentThread();
200        try {
201            while (!isStopped()) {
202                doRun();
203            }
204        } catch (IOException e) {
205            stoppedLatch.get().countDown();
206            onException(e);
207        } catch (Throwable e){
208            stoppedLatch.get().countDown();
209            IOException ioe=new IOException("Unexpected error occured");
210            ioe.initCause(e);
211            onException(ioe);
212        }finally {
213            stoppedLatch.get().countDown();
214        }
215    }
216
217    protected void doRun() throws IOException {
218        try {
219            Object command = readCommand();
220            doConsume(command);
221        } catch (SocketTimeoutException e) {
222        } catch (InterruptedIOException e) {
223        }
224    }
225
226    protected Object readCommand() throws IOException {
227        return wireFormat.unmarshal(dataIn);
228    }
229
230    // Properties
231    // -------------------------------------------------------------------------
232    public String getDiffServ() {
233        // This is the value requested by the user by setting the Tcp Transport
234        // options. If the socket hasn't been created, then this value may not
235        // reflect the value returned by Socket.getTrafficClass().
236        return Integer.toString(this.trafficClass);
237    }
238
239    public void setDiffServ(String diffServ) throws IllegalArgumentException {
240        this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
241        this.diffServChosen = true;
242    }
243
244    public int getTypeOfService() {
245        // This is the value requested by the user by setting the Tcp Transport
246        // options. If the socket hasn't been created, then this value may not
247        // reflect the value returned by Socket.getTrafficClass().
248        return this.trafficClass;
249    }
250  
251    public void setTypeOfService(int typeOfService) {
252        this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
253        this.typeOfServiceChosen = true;
254    }
255
256    public boolean isTrace() {
257        return trace;
258    }
259
260    public void setTrace(boolean trace) {
261        this.trace = trace;
262    }
263    
264    public String getLogWriterName() {
265        return logWriterName;
266    }
267
268    public void setLogWriterName(String logFormat) {
269        this.logWriterName = logFormat;
270    }
271
272    public boolean isDynamicManagement() {
273        return dynamicManagement;
274    }
275
276    public void setDynamicManagement(boolean useJmx) {
277        this.dynamicManagement = useJmx;
278    }
279
280    public boolean isStartLogging() {
281        return startLogging;
282    }
283
284    public void setStartLogging(boolean startLogging) {
285        this.startLogging = startLogging;
286    }
287
288    public int getJmxPort() {
289        return jmxPort;
290    }
291
292    public void setJmxPort(int jmxPort) {
293        this.jmxPort = jmxPort;
294    }
295    
296    public int getMinmumWireFormatVersion() {
297        return minmumWireFormatVersion;
298    }
299
300    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
301        this.minmumWireFormatVersion = minmumWireFormatVersion;
302    }
303
304    public boolean isUseLocalHost() {
305        return useLocalHost;
306    }
307
308    /**
309     * Sets whether 'localhost' or the actual local host name should be used to
310     * make local connections. On some operating systems such as Macs its not
311     * possible to connect as the local host name so localhost is better.
312     */
313    public void setUseLocalHost(boolean useLocalHost) {
314        this.useLocalHost = useLocalHost;
315    }
316
317    public int getSocketBufferSize() {
318        return socketBufferSize;
319    }
320
321    /**
322     * Sets the buffer size to use on the socket
323     */
324    public void setSocketBufferSize(int socketBufferSize) {
325        this.socketBufferSize = socketBufferSize;
326    }
327
328    public int getSoTimeout() {
329        return soTimeout;
330    }
331
332    /**
333     * Sets the socket timeout
334     */
335    public void setSoTimeout(int soTimeout) {
336        this.soTimeout = soTimeout;
337    }
338
339    public int getConnectionTimeout() {
340        return connectionTimeout;
341    }
342
343    /**
344     * Sets the timeout used to connect to the socket
345     */
346    public void setConnectionTimeout(int connectionTimeout) {
347        this.connectionTimeout = connectionTimeout;
348    }
349
350    public Boolean getKeepAlive() {
351        return keepAlive;
352    }
353
354    /**
355     * Enable/disable TCP KEEP_ALIVE mode
356     */
357    public void setKeepAlive(Boolean keepAlive) {
358        this.keepAlive = keepAlive;
359    }
360
361    public Boolean getTcpNoDelay() {
362        return tcpNoDelay;
363    }
364
365    /**
366     * Enable/disable the TCP_NODELAY option on the socket
367     */
368    public void setTcpNoDelay(Boolean tcpNoDelay) {
369        this.tcpNoDelay = tcpNoDelay;
370    }
371
372    /**
373     * @return the ioBufferSize
374     */
375    public int getIoBufferSize() {
376        return this.ioBufferSize;
377    }
378
379    /**
380     * @param ioBufferSize the ioBufferSize to set
381     */
382    public void setIoBufferSize(int ioBufferSize) {
383        this.ioBufferSize = ioBufferSize;
384    }
385    
386    /**
387     * @return the closeAsync
388     */
389    public boolean isCloseAsync() {
390        return closeAsync;
391    }
392
393    /**
394     * @param closeAsync the closeAsync to set
395     */
396    public void setCloseAsync(boolean closeAsync) {
397        this.closeAsync = closeAsync;
398    }
399
400    // Implementation methods
401    // -------------------------------------------------------------------------
402    protected String resolveHostName(String host) throws UnknownHostException {
403        if (isUseLocalHost()) {
404            String localName = InetAddressUtil.getLocalHostName();
405            if (localName != null && localName.equals(host)) {
406                return "localhost";
407            }
408        }
409        return host;
410    }
411
412    /**
413     * Configures the socket for use
414     * 
415     * @param sock
416     * @throws SocketException, IllegalArgumentException if setting the options
417     *         on the socket failed.
418     */
419    protected void initialiseSocket(Socket sock) throws SocketException,
420            IllegalArgumentException {
421        if (socketOptions != null) {
422            IntrospectionSupport.setProperties(socket, socketOptions);
423        }
424
425        try {
426            sock.setReceiveBufferSize(socketBufferSize);
427            sock.setSendBufferSize(socketBufferSize);
428        } catch (SocketException se) {
429            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
430            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
431        }
432        sock.setSoTimeout(soTimeout);
433
434        if (keepAlive != null) {
435            sock.setKeepAlive(keepAlive.booleanValue());
436        }
437        if (tcpNoDelay != null) {
438            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
439        }
440        if (!this.trafficClassSet) {
441            this.trafficClassSet = setTrafficClass(sock);
442        }
443    }
444
445    @Override
446    protected void doStart() throws Exception {
447        connect();
448        stoppedLatch.set(new CountDownLatch(1));
449        super.doStart();
450    }
451
452    protected void connect() throws Exception {
453
454        if (socket == null && socketFactory == null) {
455            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
456        }
457
458        InetSocketAddress localAddress = null;
459        InetSocketAddress remoteAddress = null;
460
461        if (localLocation != null) {
462            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
463                                                 localLocation.getPort());
464        }
465
466        if (remoteLocation != null) {
467            String host = resolveHostName(remoteLocation.getHost());
468            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
469        }
470        // Set the traffic class before the socket is connected when possible so
471        // that the connection packets are given the correct traffic class.
472        this.trafficClassSet = setTrafficClass(socket);
473
474        if (socket != null) {
475
476            if (localAddress != null) {
477                socket.bind(localAddress);
478            }
479
480            // If it's a server accepted socket.. we don't need to connect it
481            // to a remote address.
482            if (remoteAddress != null) {
483                if (connectionTimeout >= 0) {
484                    socket.connect(remoteAddress, connectionTimeout);
485                } else {
486                    socket.connect(remoteAddress);
487                }
488            }
489
490        } else {
491            // For SSL sockets.. you can't create an unconnected socket :(
492            // This means the timout option are not supported either.
493            if (localAddress != null) {
494                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
495                                                    localAddress.getAddress(), localAddress.getPort());
496            } else {
497                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
498            }
499        }
500
501        initialiseSocket(socket);
502        initializeStreams();
503    }
504
505    @Override
506    protected void doStop(ServiceStopper stopper) throws Exception {
507        if (LOG.isDebugEnabled()) {
508            LOG.debug("Stopping transport " + this);
509        }
510
511        // Closing the streams flush the sockets before closing.. if the socket
512        // is hung.. then this hangs the close.
513        // closeStreams();
514        if (socket != null) {
515            if (closeAsync) {
516                //closing the socket can hang also 
517                final CountDownLatch latch = new CountDownLatch(1);
518                
519                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
520    
521                    public void run() {
522                        try {
523                            socket.close();
524                        } catch (IOException e) {
525                            LOG.debug("Caught exception closing socket",e);
526                        }finally {
527                            latch.countDown();
528                        }
529                    }
530                    
531                });
532                latch.await(1,TimeUnit.SECONDS);
533            }else {
534                try {
535                    socket.close();
536                } catch (IOException e) {
537                    LOG.debug("Caught exception closing socket",e);
538                }
539            }
540           
541        }
542    }
543
544    /**
545     * Override so that stop() blocks until the run thread is no longer running.
546     */
547    @Override
548    public void stop() throws Exception {
549        super.stop();
550        CountDownLatch countDownLatch = stoppedLatch.get();
551        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
552            countDownLatch.await(1,TimeUnit.SECONDS);
553        }
554    }
555
556    protected void initializeStreams() throws Exception {
557        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
558            @Override
559            public int read() throws IOException {
560                receiveCounter++;
561                return super.read();
562            }
563            @Override
564            public int read(byte[] b, int off, int len) throws IOException {
565                receiveCounter++;
566                return super.read(b, off, len);
567            }
568            @Override
569            public long skip(long n) throws IOException {
570                receiveCounter++;
571                return super.skip(n);
572            }
573            @Override
574            protected void fill() throws IOException {
575                receiveCounter++;
576                super.fill();
577            }
578        };
579        this.dataIn = new DataInputStream(buffIn);
580        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
581        this.dataOut = new DataOutputStream(outputStream);
582        this.buffOut = outputStream;
583    }
584
585    protected void closeStreams() throws IOException {
586        if (dataOut != null) {
587            dataOut.close();
588        }
589        if (dataIn != null) {
590            dataIn.close();
591        }
592    }
593
594    public void setSocketOptions(Map<String, Object> socketOptions) {
595        this.socketOptions = new HashMap<String, Object>(socketOptions);
596    }
597
598    public String getRemoteAddress() {
599        if (socket != null) {
600            return "" + socket.getRemoteSocketAddress();
601        }
602        return null;
603    }
604    
605    @Override
606    public <T> T narrow(Class<T> target) {
607        if (target == Socket.class) {
608            return target.cast(socket);
609        } else if ( target == TimeStampStream.class) {
610            return target.cast(buffOut);
611        }
612        return super.narrow(target);
613    }
614    
615    public int getReceiveCounter() {
616        return receiveCounter;
617    }
618    
619
620    /**
621     * @param sock The socket on which to set the Traffic Class.
622     * @return Whether or not the Traffic Class was set on the given socket.
623     * @throws SocketException if the system does not support setting the
624     *         Traffic Class.
625     * @throws IllegalArgumentException if both the Differentiated Services and
626     *         Type of Services transport options have been set on the same
627     *         connection.
628     */
629    private boolean setTrafficClass(Socket sock) throws SocketException,
630            IllegalArgumentException {
631        if (sock == null
632            || (!this.diffServChosen && !this.typeOfServiceChosen)) {
633            return false;
634        }
635        if (this.diffServChosen && this.typeOfServiceChosen) {
636            throw new IllegalArgumentException("Cannot set both the "
637                + " Differentiated Services and Type of Services transport "
638                + " options on the same connection.");
639        }
640
641        sock.setTrafficClass(this.trafficClass);
642
643        int resultTrafficClass = sock.getTrafficClass();
644        if (this.trafficClass != resultTrafficClass) {
645            // In the case where the user has specified the ECN bits (e.g. in
646            // Type of Service) but the system won't allow the ECN bits to be
647            // set or in the case where setting the traffic class failed for
648            // other reasons, emit a warning.
649            if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
650                    && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
651                LOG.warn("Attempted to set the Traffic Class to "
652                    + this.trafficClass + " but the result Traffic Class was "
653                    + resultTrafficClass + ". Please check that your system "
654                    + "allows you to set the ECN bits (the first two bits).");
655            } else {
656                LOG.warn("Attempted to set the Traffic Class to "
657                    + this.trafficClass + " but the result Traffic Class was "
658                    + resultTrafficClass + ". Please check that your system "
659                         + "supports java.net.setTrafficClass.");
660            }
661            return false;
662        }
663        // Reset the guards that prevent both the Differentiated Services
664        // option and the Type of Service option from being set on the same
665        // connection.
666        this.diffServChosen = false;
667        this.typeOfServiceChosen = false;
668        return true;
669    }
670}