001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.net.UnknownHostException;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.openwire.OpenWireFormat;
027import org.apache.activemq.transport.CommandJoiner;
028import org.apache.activemq.transport.InactivityMonitor;
029import org.apache.activemq.transport.Transport;
030import org.apache.activemq.transport.TransportFactory;
031import org.apache.activemq.transport.TransportLoggerFactory;
032import org.apache.activemq.transport.TransportServer;
033import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
034import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
035import org.apache.activemq.transport.reliable.ReliableTransport;
036import org.apache.activemq.transport.reliable.ReplayStrategy;
037import org.apache.activemq.transport.reliable.Replayer;
038import org.apache.activemq.transport.tcp.TcpTransportFactory;
039import org.apache.activemq.util.IOExceptionSupport;
040import org.apache.activemq.util.IntrospectionSupport;
041import org.apache.activemq.util.URISupport;
042import org.apache.activemq.wireformat.WireFormat;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
048 * 
049 */
050public class UdpTransportFactory extends TransportFactory {
051    
052    private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class);
053    
054    public TransportServer doBind(final URI location) throws IOException {
055        try {
056            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
057            if (options.containsKey("port")) {
058                throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
059            }
060            WireFormat wf = createWireFormat(options);
061            int port = location.getPort();
062            OpenWireFormat openWireFormat = asOpenWireFormat(wf);
063            UdpTransport transport = (UdpTransport) createTransport(location, wf);
064
065            Transport configuredTransport = configure(transport, wf, options, true);
066            UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
067            return server;
068        } catch (URISyntaxException e) {
069            throw IOExceptionSupport.create(e);
070        } catch (Exception e) {
071            throw IOExceptionSupport.create(e);
072        }
073    }
074
075    public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
076        return configure(transport, format, options, false);
077    }
078
079    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
080        IntrospectionSupport.setProperties(transport, options);
081        final UdpTransport udpTransport = (UdpTransport)transport;
082
083        // deal with fragmentation
084        transport = new CommandJoiner(transport, asOpenWireFormat(format));
085
086        if (udpTransport.isTrace()) {
087            try {
088                transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
089            } catch (Throwable e) {
090                log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
091            }
092        }
093
094        transport = new InactivityMonitor(transport, format);
095
096        if (format instanceof OpenWireFormat) {
097            transport = configureClientSideNegotiator(transport, format, udpTransport);
098        }
099
100        return transport;
101    }
102
103    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
104        OpenWireFormat wireFormat = asOpenWireFormat(wf);
105        return new UdpTransport(wireFormat, location);
106    }
107
108    /**
109     * Configures the transport
110     * 
111     * @param acceptServer true if this transport is used purely as an 'accept'
112     *                transport for new connections which work like TCP
113     *                SocketServers where new connections spin up a new separate
114     *                UDP transport
115     */
116    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
117        IntrospectionSupport.setProperties(transport, options);
118        UdpTransport udpTransport = (UdpTransport)transport;
119
120        OpenWireFormat openWireFormat = asOpenWireFormat(format);
121
122        if (udpTransport.isTrace()) {
123            transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
124        }
125
126        transport = new InactivityMonitor(transport, format);
127
128        if (!acceptServer && format instanceof OpenWireFormat) {
129            transport = configureClientSideNegotiator(transport, format, udpTransport);
130        }
131
132        // deal with fragmentation
133
134        if (acceptServer) {
135            // lets not support a buffer of messages to enable reliable
136            // messaging on the 'accept server' transport
137            udpTransport.setReplayEnabled(false);
138
139            // we don't want to do reliable checks on this transport as we
140            // delegate to one that does
141            transport = new CommandJoiner(transport, openWireFormat);
142            return transport;
143        } else {
144            ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
145            Replayer replayer = reliableTransport.getReplayer();
146            reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
147
148            // Joiner must be on outside as the inbound messages must be
149            // processed by the reliable transport first
150            return new CommandJoiner(reliableTransport, openWireFormat);
151        }
152    }
153
154    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
155        if (replayer != null) {
156            return new DefaultReplayStrategy(5);
157        }
158        return new ExceptionIfDroppedReplayStrategy(1);
159    }
160
161    protected ReplayStrategy createReplayStrategy() {
162        return new DefaultReplayStrategy(5);
163    }
164
165    protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
166        return new ResponseRedirectInterceptor(transport, udpTransport);
167    }
168
169    protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
170        OpenWireFormat answer = (OpenWireFormat)wf;
171        return answer;
172    }
173}