001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.LinkedBlockingQueue;
033import java.util.concurrent.TimeUnit;
034
035import javax.net.ServerSocketFactory;
036
037import org.apache.activemq.Service;
038import org.apache.activemq.ThreadPriorities;
039import org.apache.activemq.command.BrokerInfo;
040import org.apache.activemq.openwire.OpenWireFormatFactory;
041import org.apache.activemq.transport.Transport;
042import org.apache.activemq.transport.TransportLoggerFactory;
043import org.apache.activemq.transport.TransportServer;
044import org.apache.activemq.transport.TransportServerThreadSupport;
045import org.apache.activemq.util.IOExceptionSupport;
046import org.apache.activemq.util.InetAddressUtil;
047import org.apache.activemq.util.IntrospectionSupport;
048import org.apache.activemq.util.ServiceListener;
049import org.apache.activemq.util.ServiceStopper;
050import org.apache.activemq.util.ServiceSupport;
051import org.apache.activemq.wireformat.WireFormat;
052import org.apache.activemq.wireformat.WireFormatFactory;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * A TCP based implementation of {@link TransportServer}
058 * 
059 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
060 * 
061 */
062
063public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
064
065    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
066    protected ServerSocket serverSocket;
067    protected int backlog = 5000;
068    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
069    protected final TcpTransportFactory transportFactory;
070    protected long maxInactivityDuration = 30000;
071    protected long maxInactivityDurationInitalDelay = 10000;
072    protected int minmumWireFormatVersion;
073    protected boolean useQueueForAccept=true;
074       
075    /**
076     * trace=true -> the Transport stack where this TcpTransport
077     * object will be, will have a TransportLogger layer
078     * trace=false -> the Transport stack where this TcpTransport
079     * object will be, will NOT have a TransportLogger layer, and therefore
080     * will never be able to print logging messages.
081     * This parameter is most probably set in Connection or TransportConnector URIs.
082     */
083    protected boolean trace = false;
084
085    protected int soTimeout = 0;
086    protected int socketBufferSize = 64 * 1024;
087    protected int connectionTimeout =  30000;
088
089    /**
090     * Name of the LogWriter implementation to use.
091     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
092     * This parameter is most probably set in Connection or TransportConnector URIs.
093     */
094    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
095    /**
096     * Specifies if the TransportLogger will be manageable by JMX or not.
097     * Also, as long as there is at least 1 TransportLogger which is manageable,
098     * a TransportLoggerControl MBean will me created.
099     */
100    protected boolean dynamicManagement = false;
101    /**
102     * startLogging=true -> the TransportLogger object of the Transport stack
103     * will initially write messages to the log.
104     * startLogging=false -> the TransportLogger object of the Transport stack
105     * will initially NOT write messages to the log.
106     * This parameter only has an effect if trace == true.
107     * This parameter is most probably set in Connection or TransportConnector URIs.
108     */
109    protected boolean startLogging = true;
110    protected final ServerSocketFactory serverSocketFactory;
111    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
112    protected Thread socketHandlerThread;
113    /**
114     * The maximum number of sockets allowed for this server
115     */
116    protected int maximumConnections = Integer.MAX_VALUE;
117    protected int currentTransportCount=0;
118  
119    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
120        super(location);
121        this.transportFactory = transportFactory;
122        this.serverSocketFactory = serverSocketFactory;
123        
124    }
125
126    public void bind() throws IOException {
127        URI bind = getBindLocation();
128
129        String host = bind.getHost();
130        host = (host == null || host.length() == 0) ? "localhost" : host;
131        InetAddress addr = InetAddress.getByName(host);
132
133        try {
134
135            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
136            configureServerSocket(this.serverSocket);
137            
138        } catch (IOException e) {
139            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
140        }
141        try {
142            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
143                .getFragment()));
144        } catch (URISyntaxException e) {
145
146            // it could be that the host name contains invalid characters such
147            // as _ on unix platforms
148            // so lets try use the IP address instead
149            try {
150                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
151            } catch (URISyntaxException e2) {
152                throw IOExceptionSupport.create(e2);
153            }
154        }
155    }
156
157    private void configureServerSocket(ServerSocket socket) throws SocketException {
158        socket.setSoTimeout(2000);
159        if (transportOptions != null) {
160            IntrospectionSupport.setProperties(socket, transportOptions);
161        }
162    }
163
164    /**
165     * @return Returns the wireFormatFactory.
166     */
167    public WireFormatFactory getWireFormatFactory() {
168        return wireFormatFactory;
169    }
170
171    /**
172     * @param wireFormatFactory The wireFormatFactory to set.
173     */
174    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
175        this.wireFormatFactory = wireFormatFactory;
176    }
177
178    /**
179     * Associates a broker info with the transport server so that the transport
180     * can do discovery advertisements of the broker.
181     * 
182     * @param brokerInfo
183     */
184    public void setBrokerInfo(BrokerInfo brokerInfo) {
185    }
186
187    public long getMaxInactivityDuration() {
188        return maxInactivityDuration;
189    }
190
191    public void setMaxInactivityDuration(long maxInactivityDuration) {
192        this.maxInactivityDuration = maxInactivityDuration;
193    }
194    
195    public long getMaxInactivityDurationInitalDelay() {
196        return this.maxInactivityDurationInitalDelay;
197    }
198
199    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
200        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
201    }
202
203    public int getMinmumWireFormatVersion() {
204        return minmumWireFormatVersion;
205    }
206
207    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
208        this.minmumWireFormatVersion = minmumWireFormatVersion;
209    }
210
211    public boolean isTrace() {
212        return trace;
213    }
214
215    public void setTrace(boolean trace) {
216        this.trace = trace;
217    }
218    
219    public String getLogWriterName() {
220        return logWriterName;
221    }
222
223    public void setLogWriterName(String logFormat) {
224        this.logWriterName = logFormat;
225    }        
226
227    public boolean isDynamicManagement() {
228        return dynamicManagement;
229    }
230
231    public void setDynamicManagement(boolean useJmx) {
232        this.dynamicManagement = useJmx;
233    }
234
235    public boolean isStartLogging() {
236        return startLogging;
237    }
238
239
240    public void setStartLogging(boolean startLogging) {
241        this.startLogging = startLogging;
242    }
243    
244    /**
245     * @return the backlog
246     */
247    public int getBacklog() {
248        return backlog;
249    }
250
251    /**
252     * @param backlog the backlog to set
253     */
254    public void setBacklog(int backlog) {
255        this.backlog = backlog;
256    }
257
258    /**
259     * @return the useQueueForAccept
260     */
261    public boolean isUseQueueForAccept() {
262        return useQueueForAccept;
263    }
264
265    /**
266     * @param useQueueForAccept the useQueueForAccept to set
267     */
268    public void setUseQueueForAccept(boolean useQueueForAccept) {
269        this.useQueueForAccept = useQueueForAccept;
270    }
271    
272
273    /**
274     * pull Sockets from the ServerSocket
275     */
276    public void run() {
277        while (!isStopped()) {
278            Socket socket = null;
279            try {
280                socket = serverSocket.accept();
281                if (socket != null) {
282                    if (isStopped() || getAcceptListener() == null) {
283                        socket.close();
284                    } else {
285                        if (useQueueForAccept) {
286                            socketQueue.put(socket);
287                        }else {
288                            handleSocket(socket);
289                        }
290                    }
291                }
292            } catch (SocketTimeoutException ste) {
293                // expect this to happen
294            } catch (Exception e) {
295                if (!isStopping()) {
296                    onAcceptError(e);
297                } else if (!isStopped()) {
298                    LOG.warn("run()", e);
299                    onAcceptError(e);
300                }
301            }
302        }
303    }
304
305    /**
306     * Allow derived classes to override the Transport implementation that this
307     * transport server creates.
308     * 
309     * @param socket
310     * @param format
311     * @return
312     * @throws IOException
313     */
314    protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
315        return new TcpTransport(format, socket);
316    }
317
318    /**
319     * @return pretty print of this
320     */
321    public String toString() {
322        return "" + getBindLocation();
323    }
324
325    /**
326     * @param socket 
327     * @param inetAddress
328     * @return real hostName
329     * @throws UnknownHostException
330     */
331    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
332        String result = null;
333        if (socket.isBound()) {
334            if (socket.getInetAddress().isAnyLocalAddress()) {
335                // make it more human readable and useful, an alternative to 0.0.0.0
336                result = InetAddressUtil.getLocalHostName();
337            } else {
338                result = socket.getInetAddress().getCanonicalHostName();
339            }
340        } else {
341            result = bindAddress.getCanonicalHostName();
342        }
343        return result;
344    }
345    
346    protected void doStart() throws Exception {
347        if(useQueueForAccept) {
348            Runnable run = new Runnable() {
349                public void run() {
350                    try {
351                        while (!isStopped() && !isStopping()) {
352                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
353                            if (sock != null) {
354                                handleSocket(sock);
355                            }
356                        }
357    
358                    } catch (InterruptedException e) {
359                        LOG.info("socketQueue interuppted - stopping");
360                        if (!isStopping()) {
361                            onAcceptError(e);
362                        }
363                    }
364    
365                }
366    
367            };
368            socketHandlerThread = new Thread(null, run,
369                    "ActiveMQ Transport Server Thread Handler: " + toString(),
370                    getStackSize());
371            socketHandlerThread.setDaemon(true);
372            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
373            socketHandlerThread.start();
374        }
375        super.doStart();
376        
377    }
378
379    protected void doStop(ServiceStopper stopper) throws Exception {
380        super.doStop(stopper);
381        if (serverSocket != null) {
382            serverSocket.close();
383        }
384    }
385
386    public InetSocketAddress getSocketAddress() {
387        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
388    }
389
390    protected final void handleSocket(Socket socket) {
391        try {
392            if (this.currentTransportCount >= this.maximumConnections) {
393                throw new ExceededMaximumConnectionsException("Exceeded the maximum " + 
394                    "number of allowed client connections. See the 'maximumConnections' " + 
395                    "property on the TCP transport configuration URI in the ActiveMQ " + 
396                    "configuration file (e.g., activemq.xml)"); 
397                
398            } else {
399                HashMap<String, Object> options = new HashMap<String, Object>();
400                options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
401                options.put("maxInactivityDurationInitalDelay", 
402                    Long.valueOf(maxInactivityDurationInitalDelay));
403                options.put("minmumWireFormatVersion", 
404                    Integer.valueOf(minmumWireFormatVersion));
405                options.put("trace", Boolean.valueOf(trace));
406                options.put("soTimeout", Integer.valueOf(soTimeout));
407                options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
408                options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
409                options.put("logWriterName", logWriterName);
410                options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
411                options.put("startLogging", Boolean.valueOf(startLogging));
412                options.putAll(transportOptions);
413
414                WireFormat format = wireFormatFactory.createWireFormat();
415                Transport transport = createTransport(socket, format);
416
417                if (transport instanceof ServiceSupport) {
418                    ((ServiceSupport) transport).addServiceListener(this);
419                }
420
421                Transport configuredTransport = 
422                    transportFactory.serverConfigure( transport, format, options);
423
424                getAcceptListener().onAccept(configuredTransport);
425            }
426        } catch (SocketTimeoutException ste) {
427            // expect this to happen
428        } catch (Exception e) {
429            if (!isStopping()) {
430                onAcceptError(e);
431            } else if (!isStopped()) {
432                LOG.warn("run()", e);
433                onAcceptError(e);
434            }
435        }
436        
437    }    
438
439        public int getSoTimeout() {
440                return soTimeout;
441        }
442
443        public void setSoTimeout(int soTimeout) {
444                this.soTimeout = soTimeout;
445        }
446
447        public int getSocketBufferSize() {
448                return socketBufferSize;
449        }
450
451        public void setSocketBufferSize(int socketBufferSize) {
452                this.socketBufferSize = socketBufferSize;
453        }
454
455        public int getConnectionTimeout() {
456                return connectionTimeout;
457        }
458
459        public void setConnectionTimeout(int connectionTimeout) {
460                this.connectionTimeout = connectionTimeout;
461        }
462
463    /**
464     * @return the maximumConnections
465     */
466    public int getMaximumConnections() {
467        return maximumConnections;
468    }
469
470    /**
471     * @param maximumConnections the maximumConnections to set
472     */
473    public void setMaximumConnections(int maximumConnections) {
474        this.maximumConnections = maximumConnections;
475    }
476
477    
478    public void started(Service service) {
479       this.currentTransportCount++;
480    }
481
482    public void stopped(Service service) {
483        this.currentTransportCount--;
484    }
485}