001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.Map;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.CopyOnWriteArrayList;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicReference;
034import java.util.concurrent.locks.ReentrantReadWriteLock;
035
036import javax.management.ObjectName;
037import javax.transaction.xa.XAResource;
038
039import org.apache.activemq.broker.ft.MasterBroker;
040import org.apache.activemq.broker.region.ConnectionStatistics;
041import org.apache.activemq.broker.region.RegionBroker;
042import org.apache.activemq.command.BrokerId;
043import org.apache.activemq.command.BrokerInfo;
044import org.apache.activemq.command.Command;
045import org.apache.activemq.command.CommandTypes;
046import org.apache.activemq.command.ConnectionControl;
047import org.apache.activemq.command.ConnectionError;
048import org.apache.activemq.command.ConnectionId;
049import org.apache.activemq.command.ConnectionInfo;
050import org.apache.activemq.command.ConsumerControl;
051import org.apache.activemq.command.ConsumerId;
052import org.apache.activemq.command.ConsumerInfo;
053import org.apache.activemq.command.ControlCommand;
054import org.apache.activemq.command.DataArrayResponse;
055import org.apache.activemq.command.DestinationInfo;
056import org.apache.activemq.command.ExceptionResponse;
057import org.apache.activemq.command.FlushCommand;
058import org.apache.activemq.command.IntegerResponse;
059import org.apache.activemq.command.KeepAliveInfo;
060import org.apache.activemq.command.Message;
061import org.apache.activemq.command.MessageAck;
062import org.apache.activemq.command.MessageDispatch;
063import org.apache.activemq.command.MessageDispatchNotification;
064import org.apache.activemq.command.MessagePull;
065import org.apache.activemq.command.ProducerAck;
066import org.apache.activemq.command.ProducerId;
067import org.apache.activemq.command.ProducerInfo;
068import org.apache.activemq.command.RemoveSubscriptionInfo;
069import org.apache.activemq.command.Response;
070import org.apache.activemq.command.SessionId;
071import org.apache.activemq.command.SessionInfo;
072import org.apache.activemq.command.ShutdownInfo;
073import org.apache.activemq.command.TransactionId;
074import org.apache.activemq.command.TransactionInfo;
075import org.apache.activemq.command.WireFormatInfo;
076import org.apache.activemq.network.*;
077import org.apache.activemq.security.MessageAuthorizationPolicy;
078import org.apache.activemq.state.CommandVisitor;
079import org.apache.activemq.state.ConnectionState;
080import org.apache.activemq.state.ConsumerState;
081import org.apache.activemq.state.ProducerState;
082import org.apache.activemq.state.SessionState;
083import org.apache.activemq.state.TransactionState;
084import org.apache.activemq.thread.DefaultThreadPools;
085import org.apache.activemq.thread.Task;
086import org.apache.activemq.thread.TaskRunner;
087import org.apache.activemq.thread.TaskRunnerFactory;
088import org.apache.activemq.transaction.Transaction;
089import org.apache.activemq.transport.DefaultTransportListener;
090import org.apache.activemq.transport.ResponseCorrelator;
091import org.apache.activemq.transport.Transport;
092import org.apache.activemq.transport.TransportDisposedIOException;
093import org.apache.activemq.transport.TransportFactory;
094import org.apache.activemq.util.*;
095import org.slf4j.Logger;
096import org.slf4j.LoggerFactory;
097import org.slf4j.MDC;
098
099public class TransportConnection implements Connection, Task, CommandVisitor {
100    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
101    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
102    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
103    // Keeps track of the broker and connector that created this connection.
104    protected final Broker broker;
105    protected final TransportConnector connector;
106    // Keeps track of the state of the connections.
107    // protected final ConcurrentHashMap localConnectionStates=new
108    // ConcurrentHashMap();
109    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
110    // The broker and wireformat info that was exchanged.
111    protected BrokerInfo brokerInfo;
112    protected final List<Command> dispatchQueue = new LinkedList<Command>();
113    protected TaskRunner taskRunner;
114    protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
115    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
116    private MasterBroker masterBroker;
117    private final Transport transport;
118    private MessageAuthorizationPolicy messageAuthorizationPolicy;
119    private WireFormatInfo wireFormatInfo;
120    // Used to do async dispatch.. this should perhaps be pushed down into the
121    // transport layer..
122    private boolean inServiceException;
123    private final ConnectionStatistics statistics = new ConnectionStatistics();
124    private boolean manageable;
125    private boolean slow;
126    private boolean markedCandidate;
127    private boolean blockedCandidate;
128    private boolean blocked;
129    private boolean connected;
130    private boolean active;
131    private boolean starting;
132    private boolean pendingStop;
133    private long timeStamp;
134    private final AtomicBoolean stopping = new AtomicBoolean(false);
135    private final CountDownLatch stopped = new CountDownLatch(1);
136    private final AtomicBoolean asyncException = new AtomicBoolean(false);
137    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
138    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
139    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
140    private ConnectionContext context;
141    private boolean networkConnection;
142    private boolean faultTolerantConnection;
143    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
144    private DemandForwardingBridge duplexBridge;
145    private final TaskRunnerFactory taskRunnerFactory;
146    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
147    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
148    private String duplexNetworkConnectorId;
149
150    /**
151     * @param connector
152     * @param transport
153     * @param broker
154     * @param taskRunnerFactory
155     *            - can be null if you want direct dispatch to the transport
156     *            else commands are sent async.
157     */
158    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
159            TaskRunnerFactory taskRunnerFactory) {
160        this.connector = connector;
161        this.broker = broker;
162        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
163        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
164        brokerConnectionStates = rb.getConnectionStates();
165        if (connector != null) {
166            this.statistics.setParent(connector.getStatistics());
167        }
168        this.taskRunnerFactory = taskRunnerFactory;
169        this.transport = transport;
170        this.transport.setTransportListener(new DefaultTransportListener() {
171            @Override
172            public void onCommand(Object o) {
173                serviceLock.readLock().lock();
174                try {
175                    if (!(o instanceof Command)) {
176                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
177                    }
178                    Command command = (Command) o;
179                    Response response = service(command);
180                    if (response != null) {
181                        dispatchSync(response);
182                    }
183                } finally {
184                    serviceLock.readLock().unlock();
185                }
186            }
187
188            @Override
189            public void onException(IOException exception) {
190                serviceLock.readLock().lock();
191                try {
192                    serviceTransportException(exception);
193                } finally {
194                    serviceLock.readLock().unlock();
195                }
196            }
197        });
198        connected = true;
199    }
200
201    /**
202     * Returns the number of messages to be dispatched to this connection
203     * 
204     * @return size of dispatch queue
205     */
206    public int getDispatchQueueSize() {
207        synchronized (dispatchQueue) {
208            return dispatchQueue.size();
209        }
210    }
211
212    public void serviceTransportException(IOException e) {
213        BrokerService bService = connector.getBrokerService();
214        if (bService.isShutdownOnSlaveFailure()) {
215            if (brokerInfo != null) {
216                if (brokerInfo.isSlaveBroker()) {
217                    LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
218                    try {
219                        doStop();
220                        bService.stop();
221                    } catch (Exception ex) {
222                        LOG.warn("Failed to stop the master", ex);
223                    }
224                }
225            }
226        }
227        if (!stopping.get()) {
228            transportException.set(e);
229            if (TRANSPORTLOG.isDebugEnabled()) {
230                TRANSPORTLOG.debug("Transport failed: " + e, e);
231            } else if (TRANSPORTLOG.isInfoEnabled()) {
232                TRANSPORTLOG.info("Transport failed: " + e);
233            }
234            stopAsync();
235        }
236    }
237
238    /**
239     * Calls the serviceException method in an async thread. Since handling a
240     * service exception closes a socket, we should not tie up broker threads
241     * since client sockets may hang or cause deadlocks.
242     * 
243     * @param e
244     */
245    public void serviceExceptionAsync(final IOException e) {
246        if (asyncException.compareAndSet(false, true)) {
247            new Thread("Async Exception Handler") {
248                @Override
249                public void run() {
250                    serviceException(e);
251                }
252            }.start();
253        }
254    }
255
    /**
     * Closes a client's connection due to a detected error. Errors are ignored
     * if the client is closing or the broker is closing. Otherwise, the
     * connection error is transmitted to the client before stopping its
     * transport.
     */
261    public void serviceException(Throwable e) {
262        // are we a transport exception such as not being able to dispatch
263        // synchronously to a transport
264        if (e instanceof IOException) {
265            serviceTransportException((IOException) e);
266        } else if (e.getClass() == BrokerStoppedException.class) {
267            // Handle the case where the broker is stopped
268            // But the client is still connected.
269            if (!stopping.get()) {
270                if (SERVICELOG.isDebugEnabled()) {
271                    SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
272                }
273                ConnectionError ce = new ConnectionError();
274                ce.setException(e);
275                dispatchSync(ce);
276                // Wait a little bit to try to get the output buffer to flush
277                // the exption notification to the client.
278                try {
279                    Thread.sleep(500);
280                } catch (InterruptedException ie) {
281                    Thread.currentThread().interrupt();
282                }
283                // Worst case is we just kill the connection before the
284                // notification gets to him.
285                stopAsync();
286            }
287        } else if (!stopping.get() && !inServiceException) {
288            inServiceException = true;
289            try {
290                SERVICELOG.warn("Async error occurred: " + e, e);
291                ConnectionError ce = new ConnectionError();
292                ce.setException(e);
293                dispatchAsync(ce);
294            } finally {
295                inServiceException = false;
296            }
297        }
298    }
299
300    public Response service(Command command) {
301        MDC.put("activemq.connector", connector.getUri().toString());
302        Response response = null;
303        boolean responseRequired = command.isResponseRequired();
304        int commandId = command.getCommandId();
305        try {
306            response = command.visit(this);
307        } catch (Throwable e) {
308            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
309                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
310                        + " command: " + command + ", exception: " + e, e);
311            }
312            if (responseRequired) {
313                response = new ExceptionResponse(e);
314                if(e instanceof java.lang.SecurityException){
315                    //still need to close this down - incase the peer of this transport doesn't play nice
316                    delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
317                }
318            } else {
319                serviceException(e);
320            }
321        }
322        if (responseRequired) {
323            if (response == null) {
324                response = new Response();
325            }
326            response.setCorrelationId(commandId);
327        }
328        // The context may have been flagged so that the response is not
329        // sent.
330        if (context != null) {
331            if (context.isDontSendReponse()) {
332                context.setDontSendReponse(false);
333                response = null;
334            }
335            context = null;
336        }
337        MDC.remove("activemq.connector");
338        return response;
339    }
340
341    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
342        return null;
343    }
344
345    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
346        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
347        return null;
348    }
349
350    public Response processWireFormat(WireFormatInfo info) throws Exception {
351        wireFormatInfo = info;
352        protocolVersion.set(info.getVersion());
353        return null;
354    }
355
356    public Response processShutdown(ShutdownInfo info) throws Exception {
357        stopAsync();
358        return null;
359    }
360
361    public Response processFlush(FlushCommand command) throws Exception {
362        return null;
363    }
364
365    public Response processBeginTransaction(TransactionInfo info) throws Exception {
366        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
367        context = null;
368        if (cs != null) {
369            context = cs.getContext();
370        }
371        if (cs == null) {
372            throw new NullPointerException("Context is null");
373        }
374        // Avoid replaying dup commands
375        if (cs.getTransactionState(info.getTransactionId()) == null) {
376            cs.addTransactionState(info.getTransactionId());
377            broker.beginTransaction(context, info.getTransactionId());
378        }
379        return null;
380    }
381
382    public Response processEndTransaction(TransactionInfo info) throws Exception {
383        // No need to do anything. This packet is just sent by the client
384        // make sure he is synced with the server as commit command could
385        // come from a different connection.
386        return null;
387    }
388
389    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
390        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
391        context = null;
392        if (cs != null) {
393            context = cs.getContext();
394        }
395        if (cs == null) {
396            throw new NullPointerException("Context is null");
397        }
398        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
399        if (transactionState == null) {
400            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
401                    + info.getTransactionId());
402        }
403        // Avoid dups.
404        if (!transactionState.isPrepared()) {
405            transactionState.setPrepared(true);
406            int result = broker.prepareTransaction(context, info.getTransactionId());
407            transactionState.setPreparedResult(result);
408            if (result == XAResource.XA_RDONLY) {
409                // we are done, no further rollback or commit from TM
410                cs.removeTransactionState(info.getTransactionId());
411            }
412            IntegerResponse response = new IntegerResponse(result);
413            return response;
414        } else {
415            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
416            return response;
417        }
418    }
419
420    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
421        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
422        context = cs.getContext();
423        cs.removeTransactionState(info.getTransactionId());
424        broker.commitTransaction(context, info.getTransactionId(), true);
425        return null;
426    }
427
428    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
429        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
430        context = cs.getContext();
431        cs.removeTransactionState(info.getTransactionId());
432        broker.commitTransaction(context, info.getTransactionId(), false);
433        return null;
434    }
435
436    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
437        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
438        context = cs.getContext();
439        cs.removeTransactionState(info.getTransactionId());
440        broker.rollbackTransaction(context, info.getTransactionId());
441        return null;
442    }
443
444    public Response processForgetTransaction(TransactionInfo info) throws Exception {
445        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
446        context = cs.getContext();
447        broker.forgetTransaction(context, info.getTransactionId());
448        return null;
449    }
450
451    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
452        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
453        context = cs.getContext();
454        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
455        return new DataArrayResponse(preparedTransactions);
456    }
457
458    public Response processMessage(Message messageSend) throws Exception {
459        ProducerId producerId = messageSend.getProducerId();
460        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
461        if (producerExchange.canDispatch(messageSend)) {
462            broker.send(producerExchange, messageSend);
463        }
464        return null;
465    }
466
467    public Response processMessageAck(MessageAck ack) throws Exception {
468        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
469        broker.acknowledge(consumerExchange, ack);
470        return null;
471    }
472
473    public Response processMessagePull(MessagePull pull) throws Exception {
474        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
475    }
476
477    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
478        broker.processDispatchNotification(notification);
479        return null;
480    }
481
482    public Response processAddDestination(DestinationInfo info) throws Exception {
483        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
484        broker.addDestinationInfo(cs.getContext(), info);
485        if (info.getDestination().isTemporary()) {
486            cs.addTempDestination(info);
487        }
488        return null;
489    }
490
491    public Response processRemoveDestination(DestinationInfo info) throws Exception {
492        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
493        broker.removeDestinationInfo(cs.getContext(), info);
494        if (info.getDestination().isTemporary()) {
495            cs.removeTempDestination(info.getDestination());
496        }
497        return null;
498    }
499
500    public Response processAddProducer(ProducerInfo info) throws Exception {
501        SessionId sessionId = info.getProducerId().getParentId();
502        ConnectionId connectionId = sessionId.getParentId();
503        TransportConnectionState cs = lookupConnectionState(connectionId);
504        SessionState ss = cs.getSessionState(sessionId);
505        if (ss == null) {
506            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
507                    + sessionId);
508        }
509        // Avoid replaying dup commands
510        if (!ss.getProducerIds().contains(info.getProducerId())) {
511            broker.addProducer(cs.getContext(), info);
512            try {
513                ss.addProducer(info);
514            } catch (IllegalStateException e) {
515                broker.removeProducer(cs.getContext(), info);
516            }
517        }
518        return null;
519    }
520
521    public Response processRemoveProducer(ProducerId id) throws Exception {
522        SessionId sessionId = id.getParentId();
523        ConnectionId connectionId = sessionId.getParentId();
524        TransportConnectionState cs = lookupConnectionState(connectionId);
525        SessionState ss = cs.getSessionState(sessionId);
526        if (ss == null) {
527            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
528                    + sessionId);
529        }
530        ProducerState ps = ss.removeProducer(id);
531        if (ps == null) {
532            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
533        }
534        removeProducerBrokerExchange(id);
535        broker.removeProducer(cs.getContext(), ps.getInfo());
536        return null;
537    }
538
539    public Response processAddConsumer(ConsumerInfo info) throws Exception {
540        SessionId sessionId = info.getConsumerId().getParentId();
541        ConnectionId connectionId = sessionId.getParentId();
542        TransportConnectionState cs = lookupConnectionState(connectionId);
543        SessionState ss = cs.getSessionState(sessionId);
544        if (ss == null) {
545            throw new IllegalStateException(broker.getBrokerName()
546                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
547        }
548        // Avoid replaying dup commands
549        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
550            broker.addConsumer(cs.getContext(), info);
551            try {
552                ss.addConsumer(info);
553            } catch (IllegalStateException e) {
554                broker.removeConsumer(cs.getContext(), info);
555            }
556        }
557        return null;
558    }
559
560    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
561        SessionId sessionId = id.getParentId();
562        ConnectionId connectionId = sessionId.getParentId();
563        TransportConnectionState cs = lookupConnectionState(connectionId);
564        if (cs == null) {
565            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
566                    + connectionId);
567        }
568        SessionState ss = cs.getSessionState(sessionId);
569        if (ss == null) {
570            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
571                    + sessionId);
572        }
573        ConsumerState consumerState = ss.removeConsumer(id);
574        if (consumerState == null) {
575            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
576        }
577        ConsumerInfo info = consumerState.getInfo();
578        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
579        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
580        removeConsumerBrokerExchange(id);
581        return null;
582    }
583
584    public Response processAddSession(SessionInfo info) throws Exception {
585        ConnectionId connectionId = info.getSessionId().getParentId();
586        TransportConnectionState cs = lookupConnectionState(connectionId);
587        // Avoid replaying dup commands
588        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
589            broker.addSession(cs.getContext(), info);
590            try {
591                cs.addSession(info);
592            } catch (IllegalStateException e) {
593                e.printStackTrace();
594                broker.removeSession(cs.getContext(), info);
595            }
596        }
597        return null;
598    }
599
600    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
601        ConnectionId connectionId = id.getParentId();
602        TransportConnectionState cs = lookupConnectionState(connectionId);
603        if (cs == null) {
604            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
605        }
606        SessionState session = cs.getSessionState(id);
607        if (session == null) {
608            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
609        }
610        // Don't let new consumers or producers get added while we are closing
611        // this down.
612        session.shutdown();
613        // Cascade the connection stop to the consumers and producers.
614        for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
615            ConsumerId consumerId = (ConsumerId) iter.next();
616            try {
617                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
618            } catch (Throwable e) {
619                LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
620            }
621        }
622        for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
623            ProducerId producerId = (ProducerId) iter.next();
624            try {
625                processRemoveProducer(producerId);
626            } catch (Throwable e) {
627                LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
628            }
629        }
630        cs.removeSession(id);
631        broker.removeSession(cs.getContext(), session.getInfo());
632        return null;
633    }
634
635    public Response processAddConnection(ConnectionInfo info) throws Exception {
636        // if the broker service has slave attached, wait for the slave to be
637        // attached to allow client connection. slave connection is fine
638        if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
639                && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
640            ServiceSupport.dispose(transport);
641            return new ExceptionResponse(new Exception("Master's slave not attached yet."));
642        }
643        // Older clients should have been defaulting this field to true.. but
644        // they were not.
645        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
646            info.setClientMaster(true);
647        }
648        TransportConnectionState state;
649        // Make sure 2 concurrent connections by the same ID only generate 1
650        // TransportConnectionState object.
651        synchronized (brokerConnectionStates) {
652            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
653            if (state == null) {
654                state = new TransportConnectionState(info, this);
655                brokerConnectionStates.put(info.getConnectionId(), state);
656            }
657            state.incrementReference();
658        }
659        // If there are 2 concurrent connections for the same connection id,
660        // then last one in wins, we need to sync here
661        // to figure out the winner.
662        synchronized (state.getConnectionMutex()) {
663            if (state.getConnection() != this) {
664                LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
665                state.getConnection().stop();
666                LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
667                        + state.getConnection().getRemoteAddress());
668                state.setConnection(this);
669                state.reset(info);
670            }
671        }
672        registerConnectionState(info.getConnectionId(), state);
673        LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
674        this.faultTolerantConnection=info.isFaultTolerant();
675        // Setup the context.
676        String clientId = info.getClientId();
677        context = new ConnectionContext();
678        context.setBroker(broker);
679        context.setClientId(clientId);
680        context.setClientMaster(info.isClientMaster());
681        context.setConnection(this);
682        context.setConnectionId(info.getConnectionId());
683        context.setConnector(connector);
684        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
685        context.setNetworkConnection(networkConnection);
686        context.setFaultTolerant(faultTolerantConnection);
687        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
688        context.setUserName(info.getUserName());
689        context.setWireFormatInfo(wireFormatInfo);
690        context.setReconnect(info.isFailoverReconnect());
691        this.manageable = info.isManageable();
692        state.setContext(context);
693        state.setConnection(this);
694       
695        try {
696            broker.addConnection(context, info);
697        } catch (Exception e) {
698            synchronized (brokerConnectionStates) {
699                brokerConnectionStates.remove(info.getConnectionId());
700            }
701            unregisterConnectionState(info.getConnectionId());
702            LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " +  e.toString());
703            if (LOG.isDebugEnabled()) {
704                LOG.debug("Exception detail:", e);
705            }
706            throw e;
707        }
708        if (info.isManageable()) {
709            // send ConnectionCommand
710            ConnectionControl command = this.connector.getConnectionControl();
711            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
712            dispatchAsync(command);
713        }
714        return null;
715    }
716
717    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
718            throws InterruptedException {
719        LOG.debug("remove connection id: " + id);
720        TransportConnectionState cs = lookupConnectionState(id);
721        if (cs != null) {
722            // Don't allow things to be added to the connection state while we
723            // are
724            // shutting down.
725            cs.shutdown();
726            // Cascade the connection stop to the sessions.
727            for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
728                SessionId sessionId = (SessionId) iter.next();
729                try {
730                    processRemoveSession(sessionId, lastDeliveredSequenceId);
731                } catch (Throwable e) {
732                    SERVICELOG.warn("Failed to remove session " + sessionId, e);
733                }
734            }
735            // Cascade the connection stop to temp destinations.
736            for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
737                DestinationInfo di = (DestinationInfo) iter.next();
738                try {
739                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
740                } catch (Throwable e) {
741                    SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
742                }
743                iter.remove();
744            }
745            try {
746                broker.removeConnection(cs.getContext(), cs.getInfo(), null);
747            } catch (Throwable e) {
748                SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
749                if (LOG.isDebugEnabled()) {
750                    SERVICELOG.debug("Exception detail:", e);
751                }
752            }
753            TransportConnectionState state = unregisterConnectionState(id);
754            if (state != null) {
755                synchronized (brokerConnectionStates) {
756                    // If we are the last reference, we should remove the state
757                    // from the broker.
758                    if (state.decrementReference() == 0) {
759                        brokerConnectionStates.remove(id);
760                    }
761                }
762            }
763        }
764        return null;
765    }
766
767    public Response processProducerAck(ProducerAck ack) throws Exception {
768        // A broker should not get ProducerAck messages.
769        return null;
770    }
771
772    public Connector getConnector() {
773        return connector;
774    }
775
776    public void dispatchSync(Command message) {
777        // getStatistics().getEnqueues().increment();
778        try {
779            processDispatch(message);
780        } catch (IOException e) {
781            serviceExceptionAsync(e);
782        }
783    }
784
785    public void dispatchAsync(Command message) {
786        if (!stopping.get()) {
787            // getStatistics().getEnqueues().increment();
788            if (taskRunner == null) {
789                dispatchSync(message);
790            } else {
791                synchronized (dispatchQueue) {
792                    dispatchQueue.add(message);
793                }
794                try {
795                    taskRunner.wakeup();
796                } catch (InterruptedException e) {
797                    Thread.currentThread().interrupt();
798                }
799            }
800        } else {
801            if (message.isMessageDispatch()) {
802                MessageDispatch md = (MessageDispatch) message;
803                Runnable sub = md.getTransmitCallback();
804                broker.postProcessDispatch(md);
805                if (sub != null) {
806                    sub.run();
807                }
808            }
809        }
810    }
811
812    protected void processDispatch(Command command) throws IOException {
813        final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
814        try {
815            if (!stopping.get()) {
816                if (messageDispatch != null) {
817                    broker.preProcessDispatch(messageDispatch);
818                }
819                dispatch(command);
820            }
821        } finally {
822            if (messageDispatch != null) {
823                Runnable sub = messageDispatch.getTransmitCallback();
824                broker.postProcessDispatch(messageDispatch);
825                if (sub != null) {
826                    sub.run();
827                }
828            }
829            // getStatistics().getDequeues().increment();
830        }
831    }
832
833    public boolean iterate() {
834        try {
835            if (stopping.get()) {
836                if (dispatchStopped.compareAndSet(false, true)) {
837                    if (transportException.get() == null) {
838                        try {
839                            dispatch(new ShutdownInfo());
840                        } catch (Throwable ignore) {
841                        }
842                    }
843                    dispatchStoppedLatch.countDown();
844                }
845                return false;
846            }
847            if (!dispatchStopped.get()) {
848                Command command = null;
849                synchronized (dispatchQueue) {
850                    if (dispatchQueue.isEmpty()) {
851                        return false;
852                    }
853                    command = dispatchQueue.remove(0);
854                }
855                processDispatch(command);
856                return true;
857            }
858            return false;
859        } catch (IOException e) {
860            if (dispatchStopped.compareAndSet(false, true)) {
861                dispatchStoppedLatch.countDown();
862            }
863            serviceExceptionAsync(e);
864            return false;
865        }
866    }
867
868    /**
869     * Returns the statistics for this connection
870     */
871    public ConnectionStatistics getStatistics() {
872        return statistics;
873    }
874
875    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
876        return messageAuthorizationPolicy;
877    }
878
879    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
880        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
881    }
882
883    public boolean isManageable() {
884        return manageable;
885    }
886
    /**
     * Starts this connection.
     * <p>
     * While holding this object's monitor it creates the dispatch task runner
     * (only if a task runner factory is set), starts the transport, marks the
     * connection active, sends a copy of the connector's {@link BrokerInfo}
     * to the peer and notifies the connector through {@code onStarted}.
     * <p>
     * The {@code starting} flag is raised for the duration of the call; a
     * {@code stop()} issued meanwhile only sets {@code pendingStop} and is
     * executed from the {@code finally} block once start-up has finished.
     *
     * @throws Exception any failure raised during start-up, rethrown after
     *             {@code stop()} has been requested
     */
    public void start() throws Exception {
        starting = true;
        try {
            synchronized (this) {
                if (taskRunnerFactory != null) {
                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                            + getRemoteAddress());
                } else {
                    taskRunner = null;
                }
                transport.start();
                active = true;
                BrokerInfo info = connector.getBrokerInfo().copy();
                // Peer broker infos are only included when the connector
                // is configured to update cluster clients.
                if (connector.isUpdateClusterClients()) {
                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
                } else {
                    info.setPeerBrokerInfos(null);
                }
                dispatchAsync(info);
                
                connector.onStarted(this);
            }
        } catch (Exception e) {
            // Force clean up on an error starting up.
            // (starting is still true here, so this stop() is deferred to
            // the finally block below.)
            stop();
            throw e;
        } finally {
            // stop() can be called from within the above block,
            // but we want to be sure start() completes before
            // stop() runs, so queue the stop until right now:
            starting = false;
            if (pendingStop) {
                LOG.debug("Calling the delayed stop()");
                stop();
            }
        }
    }
924
    /**
     * Stops this connection and waits for the asynchronous shutdown to
     * finish.
     * <p>
     * If {@code start()} is still in progress the request is only recorded
     * in {@code pendingStop} and the method returns immediately. Otherwise
     * {@code stopAsync()} is invoked and the call blocks on the
     * {@code stopped} latch, logging an informational message every five
     * seconds for as long as the shutdown has not completed.
     *
     * @throws Exception declared by the signature; covers an interrupted wait
     *             on the latch
     */
    public void stop() throws Exception {
        synchronized (this) {
            // Record the request first so a concurrent start() can see it.
            pendingStop = true;
            if (starting) {
                LOG.debug("stop() called in the middle of start(). Delaying...");
                return;
            }
        }
        stopAsync();
        // Wait without an overall timeout; each 5 second lapse is only reported.
        while (!stopped.await(5, TimeUnit.SECONDS)) {
            LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
        }
    }
938
939    public void delayedStop(final int waitTime, final String reason) {
940        if (waitTime > 0) {
941            try {
942                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
943                    public void run() {
944                        try {
945                            Thread.sleep(waitTime);
946                            stopAsync();
947                            LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
948                        } catch (InterruptedException e) {
949                        }
950                    }
951                }, "delayedStop:" + transport.getRemoteAddress());
952            } catch (Throwable t) {
953                LOG.warn("cannot create stopAsync :", t);
954            }
955        }
956    }
957
958    public void stopAsync() {
959        // If we're in the middle of starting
960        // then go no further... for now.
961        if (stopping.compareAndSet(false, true)) {
962            // Let all the connection contexts know we are shutting down
963            // so that in progress operations can notice and unblock.
964            List<TransportConnectionState> connectionStates = listConnectionStates();
965            for (TransportConnectionState cs : connectionStates) {
966                cs.getContext().getStopping().set(true);
967            }
968            try {
969                final Map context = MDCHelper.getCopyOfContextMap();
970                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
971                    public void run() {
972                        serviceLock.writeLock().lock();
973                        try {
974                            MDCHelper.setContextMap(context);
975                            doStop();
976                        } catch (Throwable e) {
977                            LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
978                                    + "': ", e);
979                        } finally {
980                            stopped.countDown();
981                            serviceLock.writeLock().unlock();
982                        }
983                    }
984                }, "StopAsync:" + transport.getRemoteAddress());
985            } catch (Throwable t) {
986                LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
987                stopped.countDown();
988            }
989        }
990    }
991
992    @Override
993    public String toString() {
994        return "Transport Connection to: " + transport.getRemoteAddress();
995    }
996
997    protected void doStop() throws Exception, InterruptedException {
998        LOG.debug("Stopping connection: " + transport.getRemoteAddress());
999        connector.onStopped(this);
1000        try {
1001            synchronized (this) {
1002                if (masterBroker != null) {
1003                    masterBroker.stop();
1004                }
1005                if (duplexBridge != null) {
1006                    duplexBridge.stop();
1007                }
1008            }
1009        } catch (Exception ignore) {
1010            LOG.trace("Exception caught stopping", ignore);
1011        }
1012        try {
1013            transport.stop();
1014            LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1015        } catch (Exception e) {
1016            LOG.debug("Could not stop transport: " + e, e);
1017        }
1018        if (taskRunner != null) {
1019            taskRunner.shutdown(1);
1020        }
1021        active = false;
1022        // Run the MessageDispatch callbacks so that message references get
1023        // cleaned up.
1024        synchronized (dispatchQueue) {
1025            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
1026                Command command = iter.next();
1027                if (command.isMessageDispatch()) {
1028                    MessageDispatch md = (MessageDispatch) command;
1029                    Runnable sub = md.getTransmitCallback();
1030                    broker.postProcessDispatch(md);
1031                    if (sub != null) {
1032                        sub.run();
1033                    }
1034                }
1035            }
1036            dispatchQueue.clear();
1037        }
1038        //
1039        // Remove all logical connection associated with this connection
1040        // from the broker.
1041        if (!broker.isStopped()) {
1042            List<TransportConnectionState> connectionStates = listConnectionStates();
1043            connectionStates = listConnectionStates();
1044            for (TransportConnectionState cs : connectionStates) {
1045                cs.getContext().getStopping().set(true);
1046                try {
1047                    LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1048                    processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1049                } catch (Throwable ignore) {
1050                    ignore.printStackTrace();
1051                }
1052            }
1053        }
1054        LOG.debug("Connection Stopped: " + getRemoteAddress());
1055    }
1056
1057    /**
1058     * @return Returns the blockedCandidate.
1059     */
1060    public boolean isBlockedCandidate() {
1061        return blockedCandidate;
1062    }
1063
1064    /**
1065     * @param blockedCandidate
1066     *            The blockedCandidate to set.
1067     */
1068    public void setBlockedCandidate(boolean blockedCandidate) {
1069        this.blockedCandidate = blockedCandidate;
1070    }
1071
1072    /**
1073     * @return Returns the markedCandidate.
1074     */
1075    public boolean isMarkedCandidate() {
1076        return markedCandidate;
1077    }
1078
1079    /**
1080     * @param markedCandidate
1081     *            The markedCandidate to set.
1082     */
1083    public void setMarkedCandidate(boolean markedCandidate) {
1084        this.markedCandidate = markedCandidate;
1085        if (!markedCandidate) {
1086            timeStamp = 0;
1087            blockedCandidate = false;
1088        }
1089    }
1090
1091    /**
1092     * @param slow
1093     *            The slow to set.
1094     */
1095    public void setSlow(boolean slow) {
1096        this.slow = slow;
1097    }
1098
1099    /**
1100     * @return true if the Connection is slow
1101     */
1102    public boolean isSlow() {
1103        return slow;
1104    }
1105
1106    /**
1107     * @return true if the Connection is potentially blocked
1108     */
1109    public boolean isMarkedBlockedCandidate() {
1110        return markedCandidate;
1111    }
1112
1113    /**
1114     * Mark the Connection, so we can deem if it's collectable on the next sweep
1115     */
1116    public void doMark() {
1117        if (timeStamp == 0) {
1118            timeStamp = System.currentTimeMillis();
1119        }
1120    }
1121
1122    /**
1123     * @return if after being marked, the Connection is still writing
1124     */
1125    public boolean isBlocked() {
1126        return blocked;
1127    }
1128
1129    /**
1130     * @return true if the Connection is connected
1131     */
1132    public boolean isConnected() {
1133        return connected;
1134    }
1135
1136    /**
1137     * @param blocked
1138     *            The blocked to set.
1139     */
1140    public void setBlocked(boolean blocked) {
1141        this.blocked = blocked;
1142    }
1143
1144    /**
1145     * @param connected
1146     *            The connected to set.
1147     */
1148    public void setConnected(boolean connected) {
1149        this.connected = connected;
1150    }
1151
1152    /**
1153     * @return true if the Connection is active
1154     */
1155    public boolean isActive() {
1156        return active;
1157    }
1158
1159    /**
1160     * @param active
1161     *            The active to set.
1162     */
1163    public void setActive(boolean active) {
1164        this.active = active;
1165    }
1166
1167    /**
1168     * @return true if the Connection is starting
1169     */
1170    public synchronized boolean isStarting() {
1171        return starting;
1172    }
1173
1174    public synchronized boolean isNetworkConnection() {
1175        return networkConnection;
1176    }
1177    
1178    public boolean isFaultTolerantConnection() {
1179       return this.faultTolerantConnection;
1180    }
1181
1182    protected synchronized void setStarting(boolean starting) {
1183        this.starting = starting;
1184    }
1185
1186    /**
1187     * @return true if the Connection needs to stop
1188     */
1189    public synchronized boolean isPendingStop() {
1190        return pendingStop;
1191    }
1192
1193    protected synchronized void setPendingStop(boolean pendingStop) {
1194        this.pendingStop = pendingStop;
1195    }
1196
1197    public Response processBrokerInfo(BrokerInfo info) {
1198        if (info.isSlaveBroker()) {
1199            BrokerService bService = connector.getBrokerService();
1200            // Do we only support passive slaves - or does the slave want to be
1201            // passive ?
1202            boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1203            if (passive == false) {
1204                
1205                // stream messages from this broker (the master) to
1206                // the slave
1207                MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1208                masterBroker = new MasterBroker(parent, transport);
1209                masterBroker.startProcessing();
1210            }
1211            LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
1212            bService.slaveConnectionEstablished();
1213        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1214            // so this TransportConnection is the rear end of a network bridge
1215            // We have been requested to create a two way pipe ...
1216            try {
1217                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1218                Map<String, String> props = createMap(properties);
1219                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1220                IntrospectionSupport.setProperties(config, props, "");
1221                config.setBrokerName(broker.getBrokerName());
1222
1223                // check for existing duplex connection hanging about
1224
1225                // We first look if existing network connection already exists for the same broker Id and network connector name
1226                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1227                // and the duplex network connector side wanting to open a new one
1228                // In this case, the old connection must be broken
1229                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 
1230                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1231                synchronized (connections) {
1232                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
1233                        TransportConnection c = iter.next();
1234                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1235                            LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1236                            c.stopAsync();
1237                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1238                            c.getStopped().await(1, TimeUnit.SECONDS);
1239                        }
1240                    }
1241                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1242                }
1243                URI uri = broker.getVmConnectorURI();
1244                HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
1245                map.put("network", "true");
1246                map.put("async", "false");
1247                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1248                Transport localTransport = TransportFactory.connect(uri);
1249                Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1250                String duplexName = localTransport.toString();
1251                if (duplexName.contains("#")) {
1252                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1253                }
1254                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1255                listener.setCreatedByDuplex(true);
1256                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1257                duplexBridge.setBrokerService(broker.getBrokerService());
1258                // now turn duplex off this side
1259                info.setDuplexConnection(false);
1260                duplexBridge.setCreatedByDuplex(true);
1261                duplexBridge.duplexStart(this, brokerInfo, info);
1262                LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1263                return null;
1264            } catch (TransportDisposedIOException e) {
1265                LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1266                return null;
1267            } catch (Exception e) {
1268                LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
1269                return null;
1270            }
1271        }
1272        // We only expect to get one broker info command per connection
1273        if (this.brokerInfo != null) {
1274            LOG.warn("Unexpected extra broker info command received: " + info);
1275        }
1276        this.brokerInfo = info;
1277        networkConnection = true;
1278        List<TransportConnectionState> connectionStates = listConnectionStates();
1279        for (TransportConnectionState cs : connectionStates) {
1280            cs.getContext().setNetworkConnection(true);
1281        }
1282        return null;
1283    }
1284
1285    @SuppressWarnings("unchecked")
1286    private HashMap<String, String> createMap(Properties properties) {
1287        return new HashMap(properties);
1288    }
1289
1290    protected void dispatch(Command command) throws IOException {
1291        try {
1292            setMarkedCandidate(true);
1293            transport.oneway(command);
1294        } finally {
1295            setMarkedCandidate(false);
1296        }
1297    }
1298
1299    public String getRemoteAddress() {
1300        return transport.getRemoteAddress();
1301    }
1302
1303    public String getConnectionId() {
1304        List<TransportConnectionState> connectionStates = listConnectionStates();
1305        for (TransportConnectionState cs : connectionStates) {
1306            if (cs.getInfo().getClientId() != null) {
1307                return cs.getInfo().getClientId();
1308            }
1309            return cs.getInfo().getConnectionId().toString();
1310        }
1311        return null;
1312    }
1313        
1314    public void updateClient(ConnectionControl control) {
1315        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1316                && this.wireFormatInfo.getVersion() >= 6) {
1317            dispatchAsync(control);
1318        }
1319    }
1320
1321    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1322        ProducerBrokerExchange result = producerExchanges.get(id);
1323        if (result == null) {
1324            synchronized (producerExchanges) {
1325                result = new ProducerBrokerExchange();
1326                TransportConnectionState state = lookupConnectionState(id);              
1327                context = state.getContext();
1328                if (context.isReconnect()) {
1329                    result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1330                }
1331                result.setConnectionContext(context);
1332                SessionState ss = state.getSessionState(id.getParentId());
1333                if (ss != null) {
1334                    result.setProducerState(ss.getProducerState(id));
1335                    ProducerState producerState = ss.getProducerState(id);
1336                    if (producerState != null && producerState.getInfo() != null) {
1337                        ProducerInfo info = producerState.getInfo();
1338                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1339                    }
1340                }
1341                producerExchanges.put(id, result);
1342            }
1343        } else {
1344            context = result.getConnectionContext();
1345        }
1346        return result;
1347    }
1348
1349    private void removeProducerBrokerExchange(ProducerId id) {
1350        synchronized (producerExchanges) {
1351            producerExchanges.remove(id);
1352        }
1353    }
1354
1355    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1356        ConsumerBrokerExchange result = consumerExchanges.get(id);
1357        if (result == null) {
1358            synchronized (consumerExchanges) {
1359                result = new ConsumerBrokerExchange();
1360                TransportConnectionState state = lookupConnectionState(id);
1361                context = state.getContext();
1362                result.setConnectionContext(context);
1363                SessionState ss = state.getSessionState(id.getParentId());
1364                if (ss != null) {
1365                    ConsumerState cs = ss.getConsumerState(id);
1366                    if (cs != null) {
1367                        ConsumerInfo info = cs.getInfo();
1368                        if (info != null) {
1369                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1370                                result.setWildcard(true);
1371                            }
1372                        }
1373                    }
1374                }
1375                consumerExchanges.put(id, result);
1376            }
1377        }
1378        return result;
1379    }
1380
1381    private void removeConsumerBrokerExchange(ConsumerId id) {
1382        synchronized (consumerExchanges) {
1383            consumerExchanges.remove(id);
1384        }
1385    }
1386
1387    public int getProtocolVersion() {
1388        return protocolVersion.get();
1389    }
1390
1391    public Response processControlCommand(ControlCommand command) throws Exception {
1392        String control = command.getCommand();
1393        if (control != null && control.equals("shutdown")) {
1394            System.exit(0);
1395        }
1396        return null;
1397    }
1398
1399    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1400        return null;
1401    }
1402
1403    public Response processConnectionControl(ConnectionControl control) throws Exception {
1404        if (control != null) {
1405            faultTolerantConnection = control.isFaultTolerant();
1406        }
1407        return null;
1408    }
1409
1410    public Response processConnectionError(ConnectionError error) throws Exception {
1411        return null;
1412    }
1413
1414    public Response processConsumerControl(ConsumerControl control) throws Exception {
1415        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1416        broker.processConsumerControl(consumerExchange, control);
1417        return null;
1418    }
1419
1420    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1421            TransportConnectionState state) {
1422        TransportConnectionState cs = null;
1423        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1424            // swap implementations
1425            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1426            newRegister.intialize(connectionStateRegister);
1427            connectionStateRegister = newRegister;
1428        }
1429        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1430        return cs;
1431    }
1432
1433    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1434        return connectionStateRegister.unregisterConnectionState(connectionId);
1435    }
1436
1437    protected synchronized List<TransportConnectionState> listConnectionStates() {
1438        return connectionStateRegister.listConnectionStates();
1439    }
1440
1441    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1442        return connectionStateRegister.lookupConnectionState(connectionId);
1443    }
1444
1445    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1446        return connectionStateRegister.lookupConnectionState(id);
1447    }
1448
1449    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1450        return connectionStateRegister.lookupConnectionState(id);
1451    }
1452
1453    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1454        return connectionStateRegister.lookupConnectionState(id);
1455    }
1456
1457    protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1458        return connectionStateRegister.lookupConnectionState(connectionId);
1459    }
1460
1461    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1462        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1463    }
1464
1465    protected synchronized String getDuplexNetworkConnectorId() {
1466        return this.duplexNetworkConnectorId;
1467    }
1468    
1469    protected CountDownLatch getStopped() {
1470        return stopped;
1471    }
1472}