001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.*;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.activemq.Service;
030import org.apache.activemq.advisory.AdvisorySupport;
031import org.apache.activemq.broker.BrokerService;
032import org.apache.activemq.broker.BrokerServiceAware;
033import org.apache.activemq.broker.TransportConnection;
034import org.apache.activemq.broker.region.AbstractRegion;
035import org.apache.activemq.broker.region.RegionBroker;
036import org.apache.activemq.broker.region.Subscription;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMessage;
039import org.apache.activemq.command.ActiveMQTempDestination;
040import org.apache.activemq.command.ActiveMQTopic;
041import org.apache.activemq.command.BrokerId;
042import org.apache.activemq.command.BrokerInfo;
043import org.apache.activemq.command.Command;
044import org.apache.activemq.command.ConnectionError;
045import org.apache.activemq.command.ConnectionId;
046import org.apache.activemq.command.ConnectionInfo;
047import org.apache.activemq.command.ConsumerId;
048import org.apache.activemq.command.ConsumerInfo;
049import org.apache.activemq.command.DataStructure;
050import org.apache.activemq.command.DestinationInfo;
051import org.apache.activemq.command.ExceptionResponse;
052import org.apache.activemq.command.KeepAliveInfo;
053import org.apache.activemq.command.Message;
054import org.apache.activemq.command.MessageAck;
055import org.apache.activemq.command.MessageDispatch;
056import org.apache.activemq.command.NetworkBridgeFilter;
057import org.apache.activemq.command.ProducerInfo;
058import org.apache.activemq.command.RemoveInfo;
059import org.apache.activemq.command.Response;
060import org.apache.activemq.command.SessionInfo;
061import org.apache.activemq.command.ShutdownInfo;
062import org.apache.activemq.command.WireFormatInfo;
063import org.apache.activemq.filter.DestinationFilter;
064import org.apache.activemq.filter.MessageEvaluationContext;
065import org.apache.activemq.thread.DefaultThreadPools;
066import org.apache.activemq.thread.TaskRunnerFactory;
067import org.apache.activemq.transport.DefaultTransportListener;
068import org.apache.activemq.transport.FutureResponse;
069import org.apache.activemq.transport.ResponseCallback;
070import org.apache.activemq.transport.Transport;
071import org.apache.activemq.transport.TransportDisposedIOException;
072import org.apache.activemq.transport.TransportFilter;
073import org.apache.activemq.transport.TransportListener;
074import org.apache.activemq.transport.tcp.SslTransport;
075import org.apache.activemq.util.*;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078import org.slf4j.MDC;
079
080/**
081 * A useful base class for implementing demand forwarding bridges.
082 * 
083 * 
084 */
085public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
086    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
087    private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
088    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
089    protected final Transport localBroker;
090    protected final Transport remoteBroker;
091    protected final IdGenerator idGenerator = new IdGenerator();
092    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
093    protected ConnectionInfo localConnectionInfo;
094    protected ConnectionInfo remoteConnectionInfo;
095    protected SessionInfo localSessionInfo;
096    protected ProducerInfo producerInfo;
097    protected String remoteBrokerName = "Unknown";
098    protected String localClientId;
099    protected ConsumerInfo demandConsumerInfo;
100    protected int demandConsumerDispatched;
101    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
102    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
103    protected AtomicBoolean disposed = new AtomicBoolean();
104    protected BrokerId localBrokerId;
105    protected ActiveMQDestination[] excludedDestinations;
106    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
107    protected ActiveMQDestination[] staticallyIncludedDestinations;
108    protected ActiveMQDestination[] durableDestinations;
109    protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
110    protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
111    protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
112    protected CountDownLatch startedLatch = new CountDownLatch(2);
113    protected CountDownLatch localStartedLatch = new CountDownLatch(1);
114    protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
115    protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
116    protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
117    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
118    protected NetworkBridgeConfiguration configuration;
119
120    final AtomicLong enqueueCounter = new AtomicLong();
121    final AtomicLong dequeueCounter = new AtomicLong();
122
123    private NetworkBridgeListener networkBridgeListener;
124    private boolean createdByDuplex;
125    private BrokerInfo localBrokerInfo;
126    private BrokerInfo remoteBrokerInfo;
127
128    private final AtomicBoolean started = new AtomicBoolean();
129    private TransportConnection duplexInitiatingConnection;
130    private BrokerService brokerService = null;
131
132    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
133        this.configuration = configuration;
134        this.localBroker = localBroker;
135        this.remoteBroker = remoteBroker;
136    }
137
138    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
139        this.localBrokerInfo = localBrokerInfo;
140        this.remoteBrokerInfo = remoteBrokerInfo;
141        this.duplexInitiatingConnection = connection;
142        start();
143        serviceRemoteCommand(remoteBrokerInfo);
144    }
145
146    public void start() throws Exception {
147        if (started.compareAndSet(false, true)) {
148            localBroker.setTransportListener(new DefaultTransportListener() {
149
150                @Override
151                public void onCommand(Object o) {
152                    Command command = (Command) o;
153                    serviceLocalCommand(command);
154                }
155
156                @Override
157                public void onException(IOException error) {
158                    serviceLocalException(error);
159                }
160            });
161            remoteBroker.setTransportListener(new TransportListener() {
162
163                public void onCommand(Object o) {
164                    Command command = (Command) o;
165                    serviceRemoteCommand(command);
166                }
167
168                public void onException(IOException error) {
169                    serviceRemoteException(error);
170                }
171
172                public void transportInterupted() {
173                    // clear any subscriptions - to try and prevent the bridge
174                    // from stalling the broker
175                    if (remoteInterupted.compareAndSet(false, true)) {
176                        LOG.info("Outbound transport to " + remoteBrokerName + " interrupted.");
177                        if (localBridgeStarted.get()) {
178                            clearDownSubscriptions();
179                            synchronized (DemandForwardingBridgeSupport.this) {
180                                try {
181                                    localBroker.oneway(localConnectionInfo.createRemoveCommand());
182                                } catch (TransportDisposedIOException td) {
183                                    LOG.debug("local broker is now disposed", td);
184                                } catch (IOException e) {
185                                    LOG.warn("Caught exception from local start", e);
186                                }
187                            }
188                        }
189                        localBridgeStarted.set(false);
190                        remoteBridgeStarted.set(false);
191                        startedLatch = new CountDownLatch(2);
192                        localStartedLatch = new CountDownLatch(1);
193                    }
194                }
195
196                public void transportResumed() {
197                    if (remoteInterupted.compareAndSet(true, false)) {
198                        // We want to slow down false connects so that we don't
199                        // get in a busy loop.
200                        // False connects can occurr if you using SSH tunnels.
201                        if (!lastConnectSucceeded.get()) {
202                            try {
203                                LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
204                                Thread.sleep(1000);
205                            } catch (InterruptedException e) {
206                                Thread.currentThread().interrupt();
207                            }
208                        }
209                        lastConnectSucceeded.set(false);
210                        try {
211                            startLocalBridge();
212                            remoteBridgeStarted.set(true);
213                            startedLatch.countDown();
214                            LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
215                        } catch (Throwable e) {
216                            LOG.error("Caught exception  from local start in resume transport", e);
217                            serviceLocalException(e);
218                        }
219                    }
220                }
221            });
222
223            localBroker.start();
224            remoteBroker.start();
225            if (!disposed.get()) {
226                try {
227                    triggerRemoteStartBridge();
228                } catch (IOException e) {
229                    LOG.warn("Caught exception from remote start", e);
230                }
231            } else {
232                LOG.warn ("Bridge was disposed before the start() method was fully executed.");
233                throw new TransportDisposedIOException();
234            }
235        }
236    }
237
238    protected void triggerLocalStartBridge() throws IOException {
239        final Map context = MDCHelper.getCopyOfContextMap();
240        asyncTaskRunner.execute(new Runnable() {
241            public void run() {
242                MDCHelper.setContextMap(context);
243                final String originalName = Thread.currentThread().getName();
244                Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
245                try {
246                    startLocalBridge();
247                } catch (Throwable e) {
248                    serviceLocalException(e);
249                } finally {
250                    Thread.currentThread().setName(originalName);
251                }
252            }
253        });
254    }
255
256    protected void triggerRemoteStartBridge() throws IOException {
257        final Map context = MDCHelper.getCopyOfContextMap();
258        asyncTaskRunner.execute(new Runnable() {
259            public void run() {
260                MDCHelper.setContextMap(context);
261                final String originalName = Thread.currentThread().getName();
262                Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
263                try {
264                    startRemoteBridge();
265                } catch (Exception e) {
266                    serviceRemoteException(e);
267                } finally {
268                    Thread.currentThread().setName(originalName);
269                }
270            }
271        });
272    }
273
274    protected void startLocalBridge() throws Throwable {
275        if (localBridgeStarted.compareAndSet(false, true)) {
276            synchronized (this) {
277                if (LOG.isTraceEnabled()) {
278                    LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
279                }
280                remoteBrokerNameKnownLatch.await();
281
282                if (!disposed.get()) {
283                    localConnectionInfo = new ConnectionInfo();
284                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
285                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
286                    localConnectionInfo.setClientId(localClientId);
287                    localConnectionInfo.setUserName(configuration.getUserName());
288                    localConnectionInfo.setPassword(configuration.getPassword());
289                    Transport originalTransport = remoteBroker;
290                    while (originalTransport instanceof TransportFilter) {
291                        originalTransport = ((TransportFilter) originalTransport).getNext();
292                    }
293                    if (originalTransport instanceof SslTransport) {
294                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
295                        localConnectionInfo.setTransportContext(peerCerts);
296                    }
297                    // sync requests that may fail
298                    Object resp = localBroker.request(localConnectionInfo);
299                    if (resp instanceof ExceptionResponse) {
300                        throw ((ExceptionResponse)resp).getException();
301                    }
302                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
303                    localBroker.oneway(localSessionInfo);
304
305                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
306                    NetworkBridgeListener l = this.networkBridgeListener;
307                    if (l != null) {
308                        l.onStart(this);
309                    }
310                    LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
311
312                } else {
313                    LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
314                }
315                startedLatch.countDown();
316                localStartedLatch.countDown();
317                if (!disposed.get()) {
318                    setupStaticDestinations();
319                } else {
320                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
321                }
322            }
323        }
324    }
325
326    protected void startRemoteBridge() throws Exception {
327        if (remoteBridgeStarted.compareAndSet(false, true)) {
328            if (LOG.isTraceEnabled()) {
329                LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker);
330            }
331            synchronized (this) {
332                if (!isCreatedByDuplex()) {
333                    BrokerInfo brokerInfo = new BrokerInfo();
334                    brokerInfo.setBrokerName(configuration.getBrokerName());
335                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
336                    brokerInfo.setNetworkConnection(true);
337                    brokerInfo.setDuplexConnection(configuration.isDuplex());
338                    // set our properties
339                    Properties props = new Properties();
340                    IntrospectionSupport.getProperties(configuration, props, null);
341                    String str = MarshallingSupport.propertiesToString(props);
342                    brokerInfo.setNetworkProperties(str);
343                    brokerInfo.setBrokerId(this.localBrokerId);
344                    remoteBroker.oneway(brokerInfo);
345                }
346                if (remoteConnectionInfo != null) {
347                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
348                }
349                remoteConnectionInfo = new ConnectionInfo();
350                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
351                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
352                remoteConnectionInfo.setUserName(configuration.getUserName());
353                remoteConnectionInfo.setPassword(configuration.getPassword());
354                remoteBroker.oneway(remoteConnectionInfo);
355
356                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
357                remoteBroker.oneway(remoteSessionInfo);
358                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
359                producerInfo.setResponseRequired(false);
360                remoteBroker.oneway(producerInfo);
361                // Listen to consumer advisory messages on the remote broker to
362                // determine demand.
363                demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
364                demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
365                String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter();
366                if (configuration.isBridgeTempDestinations()) {
367                    advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
368                }
369                demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
370                demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
371                remoteBroker.oneway(demandConsumerInfo);
372                startedLatch.countDown();
373                if (!disposed.get()) {
374                    triggerLocalStartBridge();
375                }
376            }
377        }
378    }
379
380    public void stop() throws Exception {
381        if (started.compareAndSet(true, false)) {
382            if (disposed.compareAndSet(false, true)) {
383                LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
384                NetworkBridgeListener l = this.networkBridgeListener;
385                if (l != null) {
386                    l.onStop(this);
387                }
388                try {
389                    remoteBridgeStarted.set(false);
390                    final CountDownLatch sendShutdown = new CountDownLatch(1);
391                    final Map map = MDCHelper.getCopyOfContextMap();
392                    asyncTaskRunner.execute(new Runnable() {
393                        public void run() {
394                            try {
395                                MDCHelper.setContextMap(map);
396                                localBroker.oneway(new ShutdownInfo());
397                                sendShutdown.countDown();
398                                remoteBroker.oneway(new ShutdownInfo());
399                            } catch (Throwable e) {
400                                LOG.debug("Caught exception sending shutdown", e);
401                            } finally {
402                                sendShutdown.countDown();
403                            }
404
405                        }
406                    });
407                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
408                        LOG.info("Network Could not shutdown in a timely manner");
409                    }
410                } finally {
411                    ServiceStopper ss = new ServiceStopper();
412                    ss.stop(remoteBroker);
413                    ss.stop(localBroker);
414                    // Release the started Latch since another thread could be
415                    // stuck waiting for it to start up.
416                    startedLatch.countDown();
417                    startedLatch.countDown();
418                    localStartedLatch.countDown();
419                    ss.throwFirstException();
420                }
421            }
422            brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
423            brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
424            LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
425            remoteBrokerNameKnownLatch.countDown();
426        }
427    }
428
429    public void serviceRemoteException(Throwable error) {
430        if (!disposed.get()) {
431            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
432                LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
433            } else {
434                LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
435            }
436            LOG.debug("The remote Exception was: " + error, error);
437            final Map map = MDCHelper.getCopyOfContextMap();
438            asyncTaskRunner.execute(new Runnable() {
439                public void run() {
440                    MDCHelper.setContextMap(map);
441                    ServiceSupport.dispose(getControllingService());
442                }
443            });
444            fireBridgeFailed();
445        }
446    }
447
448    protected void serviceRemoteCommand(Command command) {
449        if (!disposed.get()) {
450            try {
451                if (command.isMessageDispatch()) {
452                    waitStarted();
453                    MessageDispatch md = (MessageDispatch) command;
454                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
455                    demandConsumerDispatched++;
456                    if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
457                        remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
458                        demandConsumerDispatched = 0;
459                    }
460                } else if (command.isBrokerInfo()) {
461                    lastConnectSucceeded.set(true);
462                    remoteBrokerInfo = (BrokerInfo) command;
463                    Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
464                    try {
465                        IntrospectionSupport.getProperties(configuration, props, null);
466                        if (configuration.getExcludedDestinations() != null) {
467                            excludedDestinations = configuration.getExcludedDestinations().toArray(
468                                    new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
469                        }
470                        if (configuration.getStaticallyIncludedDestinations() != null) {
471                            staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
472                                    new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
473                        }
474                        if (configuration.getDynamicallyIncludedDestinations() != null) {
475                            dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
476                                    .toArray(
477                                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
478                                                    .size()]);
479                        }
480                    } catch (Throwable t) {
481                        LOG.error("Error mapping remote destinations", t);
482                    }
483                    serviceRemoteBrokerInfo(command);
484                    // Let the local broker know the remote broker's ID.
485                    localBroker.oneway(command);
486                    // new peer broker (a consumer can work with remote broker also)
487                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
488                } else if (command.getClass() == ConnectionError.class) {
489                    ConnectionError ce = (ConnectionError) command;
490                    serviceRemoteException(ce.getException());
491                } else {
492                    if (isDuplex()) {
493                        if (command.isMessage()) {
494                            ActiveMQMessage message = (ActiveMQMessage) command;
495                            if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) 
496                                || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
497                                serviceRemoteConsumerAdvisory(message.getDataStructure());
498                            } else {
499                                if (!isPermissableDestination(message.getDestination(), true)) {
500                                    return;
501                                }
502                                if (message.isResponseRequired()) {
503                                    Response reply = new Response();
504                                    reply.setCorrelationId(message.getCommandId());
505                                    localBroker.oneway(message);
506                                    remoteBroker.oneway(reply);
507                                } else {
508                                    localBroker.oneway(message);
509                                }
510                            }
511                        } else {
512                            switch (command.getDataStructureType()) {
513                            case ConnectionInfo.DATA_STRUCTURE_TYPE:
514                            case SessionInfo.DATA_STRUCTURE_TYPE:
515                            case ProducerInfo.DATA_STRUCTURE_TYPE:
516                                localBroker.oneway(command);
517                                break;
518                            case ConsumerInfo.DATA_STRUCTURE_TYPE:
519                                localStartedLatch.await();
520                                if (started.get()) {
521                                    if (!addConsumerInfo((ConsumerInfo) command)) {
522                                        if (LOG.isDebugEnabled()) {
523                                            LOG.debug("Ignoring ConsumerInfo: " + command);
524                                        }
525                                    } else {
526                                        if (LOG.isTraceEnabled()) {
527                                            LOG.trace("Adding ConsumerInfo: " + command);
528                                        }
529                                    }
530                                } else {
531                                    // received a subscription whilst stopping
532                                    LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
533                                }
534                                break;
535                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
536                                // initiator is shutting down, controlled case
537                                // abortive close dealt with by inactivity monitor
538                                LOG.info("Stopping network bridge on shutdown of remote broker");
539                                serviceRemoteException(new IOException(command.toString()));
540                                break;
541                            default:
542                                if (LOG.isDebugEnabled()) {
543                                    LOG.debug("Ignoring remote command: " + command);
544                                }
545                            }
546                        }
547                    } else {
548                        switch (command.getDataStructureType()) {
549                        case KeepAliveInfo.DATA_STRUCTURE_TYPE:
550                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
551                        case ShutdownInfo.DATA_STRUCTURE_TYPE:
552                            break;
553                        default:
554                            LOG.warn("Unexpected remote command: " + command);
555                        }
556                    }
557                }
558            } catch (Throwable e) {
559                if (LOG.isDebugEnabled()) {
560                    LOG.debug("Exception processing remote command: " + command, e);
561                }
562                serviceRemoteException(e);
563            }
564        }
565    }
566
567    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
568        final int networkTTL = configuration.getNetworkTTL();
569        if (data.getClass() == ConsumerInfo.class) {
570            // Create a new local subscription
571            ConsumerInfo info = (ConsumerInfo) data;
572            BrokerId[] path = info.getBrokerPath();
573
574            if (info.isBrowser()) {
575                if (LOG.isDebugEnabled()) {
576                    LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
577                }
578                return;
579            }
580
581            if (path != null && path.length >= networkTTL) {
582                if (LOG.isDebugEnabled()) {
583                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
584                }
585                return;
586            }
587            if (contains(path, localBrokerPath[0])) {
588                // Ignore this consumer as it's a consumer we locally sent to the broker.
589                if (LOG.isDebugEnabled()) {
590                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
591                }
592                return;
593            }
594            if (!isPermissableDestination(info.getDestination())) {
595                // ignore if not in the permitted or in the excluded list
596                if (LOG.isDebugEnabled()) {
597                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
598                }
599                return;
600            }
601
602            // in a cyclic network there can be multiple bridges per broker that can propagate
603            // a network subscription so there is a need to synchronise on a shared entity
604            synchronized (brokerService.getVmConnectorURI()) {
605                if (addConsumerInfo(info)) {
606                    if (LOG.isDebugEnabled()) {
607                        LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
608                    }
609                } else {
610                    if (LOG.isDebugEnabled()) {
611                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
612                    }
613                }
614            }
615        } else if (data.getClass() == DestinationInfo.class) {
616            // It's a destination info - we want to pass up
617            // information about temporary destinations
618            DestinationInfo destInfo = (DestinationInfo) data;
619            BrokerId[] path = destInfo.getBrokerPath();
620            if (path != null && path.length >= networkTTL) {
621                if (LOG.isDebugEnabled()) {
622                    LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
623                }
624                return;
625            }
626            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
627                // Ignore this consumer as it's a consumer we locally sent to
628                // the broker.
629                if (LOG.isDebugEnabled()) {
630                    LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
631                }
632                return;
633            }
634            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
635            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
636                // re-set connection id so comes from here
637                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
638                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
639            }
640            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
641            if (LOG.isTraceEnabled()) {
642                LOG.trace("bridging destination control command: " + destInfo);
643            }
644            localBroker.oneway(destInfo);
645        } else if (data.getClass() == RemoveInfo.class) {
646            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
647            removeDemandSubscription(id);
648        }
649    }
650
651    public void serviceLocalException(Throwable error) {
652        if (!disposed.get()) {
653            LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
654            LOG.debug("The local Exception was:" + error, error);
655            final Map map = MDCHelper.getCopyOfContextMap();
656            asyncTaskRunner.execute(new Runnable() {
657                public void run() {
658                    MDCHelper.setContextMap(map);
659                    ServiceSupport.dispose(getControllingService());
660                }
661            });
662            fireBridgeFailed();
663        }
664    }
665
666    protected Service getControllingService() {
667        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
668    }
669
670    protected void addSubscription(DemandSubscription sub) throws IOException {
671        if (sub != null) {
672            localBroker.oneway(sub.getLocalInfo());
673        }
674    }
675
676    protected void removeSubscription(final DemandSubscription sub) throws IOException {
677        if (sub != null) {
678            if (LOG.isDebugEnabled()) {
679                LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
680            }
681            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
682
683            // continue removal in separate thread to free up this thread for outstanding responses
684            final Map map = MDCHelper.getCopyOfContextMap();
685            asyncTaskRunner.execute(new Runnable() {
686                public void run() {
687                    MDCHelper.setContextMap(map);
688                    sub.waitForCompletion();
689                    try {
690                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
691                    } catch (IOException e) {
692                        LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
693                    }
694                }
695            });
696        }
697    }
698
699    protected Message configureMessage(MessageDispatch md) {
700        Message message = md.getMessage().copy();
701        // Update the packet to show where it came from.
702        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
703        message.setProducerId(producerInfo.getProducerId());
704        message.setDestination(md.getDestination());
705        if (message.getOriginalTransactionId() == null) {
706            message.setOriginalTransactionId(message.getTransactionId());
707        }
708        message.setTransactionId(null);
709        return message;
710    }
711
712    protected void serviceLocalCommand(Command command) {
713        if (!disposed.get()) {
714            try {
715                if (command.isMessageDispatch()) {
716                    enqueueCounter.incrementAndGet();
717                    final MessageDispatch md = (MessageDispatch) command;
718                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
719                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
720                        
721                        if (suppressMessageDispatch(md, sub)) {
722                            if (LOG.isDebugEnabled()) {
723                                LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
724                            }
725                            // still ack as it may be durable
726                            try {
727                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
728                            } finally {
729                                sub.decrementOutstandingResponses();
730                            }
731                            return;
732                        }
733                        
734                        Message message = configureMessage(md);
735                        if (LOG.isDebugEnabled()) {
736                            LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
737                        }
738                        
739                        if (!message.isResponseRequired()) {
740                            
741                            // If the message was originally sent using async
742                            // send, we will preserve that QOS
743                            // by bridging it using an async send (small chance
744                            // of message loss).
745                            try {
746                                remoteBroker.oneway(message);
747                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
748                                dequeueCounter.incrementAndGet();
749                            } finally {
750                                sub.decrementOutstandingResponses();
751                            }
752                            
753                        } else {
754                            
755                            // The message was not sent using async send, so we
756                            // should only ack the local
757                            // broker when we get confirmation that the remote
758                            // broker has received the message.
759                            ResponseCallback callback = new ResponseCallback() {
760                                public void onCompletion(FutureResponse future) {
761                                    try {
762                                        Response response = future.getResult();
763                                        if (response.isException()) {
764                                            ExceptionResponse er = (ExceptionResponse) response;
765                                            serviceLocalException(er.getException());
766                                        } else {
767                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
768                                            dequeueCounter.incrementAndGet();
769                                        }   
770                                    } catch (IOException e) {
771                                        serviceLocalException(e);
772                                    } finally {
773                                        sub.decrementOutstandingResponses();
774                                    }
775                                }
776                            };
777                            
778                            remoteBroker.asyncRequest(message, callback);
779                            
780                        }
781                    } else {
782                        if (LOG.isDebugEnabled()) {
783                            LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
784                        }
785                    }
786                } else if (command.isBrokerInfo()) {
787                    localBrokerInfo = (BrokerInfo) command;
788                    serviceLocalBrokerInfo(command);
789                } else if (command.isShutdownInfo()) {
790                    LOG.info(configuration.getBrokerName() + " Shutting down");
791                    // Don't shut down the whole connector if the remote side
792                    // was interrupted.
793                    // the local transport is just shutting down temporarily
794                    // until the remote side
795                    // is restored.
796                    if (!remoteInterupted.get()) {
797                        stop();
798                    }
799                } else if (command.getClass() == ConnectionError.class) {
800                    ConnectionError ce = (ConnectionError) command;
801                    serviceLocalException(ce.getException());
802                } else {
803                    switch (command.getDataStructureType()) {
804                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
805                        break;
806                    default:
807                        LOG.warn("Unexpected local command: " + command);
808                    }
809                }
810            } catch (Throwable e) {
811                LOG.warn("Caught an exception processing local command", e);
812                serviceLocalException(e);
813            }
814        }
815    }
816
817    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
818        // See if this consumer's brokerPath tells us it came from the broker at the other end
819        // of the bridge. I think we should be making this decision based on the message's
820        // broker bread crumbs and not the consumer's? However, the message's broker bread
821        // crumbs are null, which is another matter.   
822        boolean suppress = false;
823        Object consumerInfo = md.getMessage().getDataStructure();
824        if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
825            suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
826        }
827        
828        // for durable subs, suppression via filter leaves dangling acks so we need to 
829        // check here and allow the ack irrespective
830        if (!suppress && sub.getLocalInfo().isDurable()) {
831            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
832            messageEvalContext.setMessageReference(md.getMessage());
833            suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
834        }  
835        return suppress;
836    }
837
838    /**
839     * @return Returns the dynamicallyIncludedDestinations.
840     */
841    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
842        return dynamicallyIncludedDestinations;
843    }
844
845    /**
846     * @param dynamicallyIncludedDestinations The
847     *            dynamicallyIncludedDestinations to set.
848     */
849    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
850        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
851    }
852
853    /**
854     * @return Returns the excludedDestinations.
855     */
856    public ActiveMQDestination[] getExcludedDestinations() {
857        return excludedDestinations;
858    }
859
860    /**
861     * @param excludedDestinations The excludedDestinations to set.
862     */
863    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
864        this.excludedDestinations = excludedDestinations;
865    }
866
867    /**
868     * @return Returns the staticallyIncludedDestinations.
869     */
870    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
871        return staticallyIncludedDestinations;
872    }
873
874    /**
875     * @param staticallyIncludedDestinations The staticallyIncludedDestinations
876     *            to set.
877     */
878    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
879        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
880    }
881
882    /**
883     * @return Returns the durableDestinations.
884     */
885    public ActiveMQDestination[] getDurableDestinations() {
886        return durableDestinations;
887    }
888
889    /**
890     * @param durableDestinations The durableDestinations to set.
891     */
892    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
893        this.durableDestinations = durableDestinations;
894    }
895
896    /**
897     * @return Returns the localBroker.
898     */
899    public Transport getLocalBroker() {
900        return localBroker;
901    }
902
903    /**
904     * @return Returns the remoteBroker.
905     */
906    public Transport getRemoteBroker() {
907        return remoteBroker;
908    }
909
910    /**
911     * @return the createdByDuplex
912     */
913    public boolean isCreatedByDuplex() {
914        return this.createdByDuplex;
915    }
916
917    /**
918     * @param createdByDuplex the createdByDuplex to set
919     */
920    public void setCreatedByDuplex(boolean createdByDuplex) {
921        this.createdByDuplex = createdByDuplex;
922    }
923
924    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
925        if (brokerPath != null) {
926            for (int i = 0; i < brokerPath.length; i++) {
927                if (brokerId.equals(brokerPath[i])) {
928                    return true;
929                }
930            }
931        }
932        return false;
933    }
934
935    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
936        if (brokerPath == null || brokerPath.length == 0) {
937            return pathsToAppend;
938        }
939        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
940        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
941        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
942        return rc;
943    }
944
945    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
946        if (brokerPath == null || brokerPath.length == 0) {
947            return new BrokerId[] { idToAppend };
948        }
949        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
950        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
951        rc[brokerPath.length] = idToAppend;
952        return rc;
953    }
954
955    protected boolean isPermissableDestination(ActiveMQDestination destination) {
956        return isPermissableDestination(destination, false);
957    }
958
959    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
960        // Are we not bridging temp destinations?
961        if (destination.isTemporary()) {
962            if (allowTemporary) {
963                return true;
964            } else {
965                return configuration.isBridgeTempDestinations();
966            }
967        }
968
969        ActiveMQDestination[] dests = excludedDestinations;
970        if (dests != null && dests.length > 0) {
971            for (int i = 0; i < dests.length; i++) {
972                ActiveMQDestination match = dests[i];
973                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
974                if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
975                    return false;
976                }
977            }
978        }
979
980        dests = dynamicallyIncludedDestinations;
981        if (dests != null && dests.length > 0) {
982            for (int i = 0; i < dests.length; i++) {
983                ActiveMQDestination match = dests[i];
984                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
985                if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
986                    return true;
987                }
988            }
989
990            return false;
991        }
992        return true;
993    }
994
995    /**
996     * Subscriptions for these destinations are always created
997     */
998    protected void setupStaticDestinations() {
999        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1000        if (dests != null) {
1001            for (int i = 0; i < dests.length; i++) {
1002                ActiveMQDestination dest = dests[i];
1003                DemandSubscription sub = createDemandSubscription(dest);
1004                try {
1005                    addSubscription(sub);
1006                } catch (IOException e) {
1007                    LOG.error("Failed to add static destination " + dest, e);
1008                }
1009                if (LOG.isTraceEnabled()) {
1010                    LOG.trace("bridging messages for static destination: " + dest);
1011                }
1012            }
1013        }
1014    }
1015
1016    protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1017        boolean consumerAdded = false;
1018        ConsumerInfo info = consumerInfo.copy();
1019        addRemoteBrokerToBrokerPath(info);
1020        DemandSubscription sub = createDemandSubscription(info);
1021        if (sub != null) {
1022            if (duplicateSuppressionIsRequired(sub)) {
1023                undoMapRegistration(sub);
1024            } else {
1025                addSubscription(sub);
1026                consumerAdded = true;
1027            }
1028        }
1029        return consumerAdded;
1030    }
1031
1032    private void undoMapRegistration(DemandSubscription sub) {
1033        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1034        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1035    }
1036
1037    /*
1038     * check our existing subs networkConsumerIds against the list of network ids in this subscription
1039     * A match means a duplicate which we suppress for topics and maybe for queues
1040     */
1041    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1042        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1043        boolean suppress = false;
1044
1045        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
1046                consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1047            return suppress;
1048        }
1049
1050        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1051        Collection<Subscription> currentSubs = 
1052            getRegionSubscriptions(consumerInfo.getDestination().isTopic());
1053        for (Subscription sub : currentSubs) {
1054            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1055            if (!networkConsumers.isEmpty()) {
1056                if (matchFound(candidateConsumers, networkConsumers)) {
1057                    suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1058                    break;
1059                }
1060            }
1061        }
1062        return suppress;
1063    }
1064
1065    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1066        boolean suppress = false;
1067
1068        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1069            if (LOG.isDebugEnabled()) {
1070                LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1071                        + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " 
1072                        + existingSub.getConsumerInfo()  + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1073            }
1074            suppress = true;
1075        } else {
1076            // remove the existing lower priority duplicate and allow this candidate
1077            try {
1078                removeDuplicateSubscription(existingSub);
1079
1080                if (LOG.isDebugEnabled()) {
1081                    LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1082                            + " with sub from " + remoteBrokerName
1083                            + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " 
1084                            + candidateInfo.getNetworkConsumerIds());
1085                }
1086            } catch (IOException e) {
1087                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1088            }
1089        }
1090        return suppress;
1091    }
1092
1093    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1094        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1095            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1096                break;
1097            }
1098        }
1099    }
1100
1101    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1102        boolean found = false;
1103        for (ConsumerId aliasConsumer : networkConsumers) {
1104            if (candidateConsumers.contains(aliasConsumer)) {
1105                found = true;
1106                break;
1107            }
1108        }
1109        return found;
1110    }
1111
1112    private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
1113        RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
1114        AbstractRegion abstractRegion = (AbstractRegion) 
1115            (isTopic ? region.getTopicRegion() : region.getQueueRegion());
1116        return abstractRegion.getSubscriptions().values();
1117    }
1118
1119    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1120        //add our original id to ourselves
1121        info.addNetworkConsumerId(info.getConsumerId());
1122        return doCreateDemandSubscription(info);
1123    }
1124
1125    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1126        DemandSubscription result = new DemandSubscription(info);
1127        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1128        if (info.getDestination().isTemporary()) {
1129            // reset the local connection Id
1130
1131            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1132            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1133        }
1134
1135        if (configuration.isDecreaseNetworkConsumerPriority()) {
1136            byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
1137            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1138                // The longer the path to the consumer, the less it's consumer priority.
1139                priority -= info.getBrokerPath().length + 1;
1140            }
1141            result.getLocalInfo().setPriority(priority);
1142            if (LOG.isDebugEnabled()) {
1143                LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1144            }
1145        }
1146        configureDemandSubscription(info, result);
1147        return result;
1148    }
1149
1150    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1151        ConsumerInfo info = new ConsumerInfo();
1152        info.setDestination(destination);
1153        // the remote info held by the DemandSubscription holds the original
1154        // consumerId,
1155        // the local info get's overwritten
1156
1157        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1158        DemandSubscription result = null;
1159        try {
1160            result = createDemandSubscription(info);
1161        } catch (IOException e) {
1162            LOG.error("Failed to create DemandSubscription ", e);
1163        }
1164        if (result != null) {
1165            result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
1166        }
1167        return result;
1168    }
1169
1170    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1171        sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1172        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1173        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1174        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1175
1176        if (!info.isDurable()) {
1177            // This works for now since we use a VM connection to the local broker.
1178            // may need to change if we ever subscribe to a remote broker.
1179            sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
1180        } else  {
1181            // need to ack this message if it is ignored as it is durable so
1182            // we check before we send. see: suppressMessageDispatch()
1183        }
1184    }
1185
1186    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1187        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1188        if (LOG.isDebugEnabled()) {
1189            LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1190        }
1191        if (sub != null) {
1192            removeSubscription(sub);
1193            if (LOG.isDebugEnabled()) {
1194                LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1195            }
1196        }
1197    }
1198
1199    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1200        boolean removeDone = false;
1201        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1202        if (sub != null) {
1203            try {
1204                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1205                removeDone = true;
1206            } catch (IOException e) {
1207                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1208            }
1209        }
1210        return removeDone;
1211    }
1212
1213    protected void waitStarted() throws InterruptedException {
1214        startedLatch.await();
1215        localBrokerIdKnownLatch.await();
1216    }
1217
1218    protected void clearDownSubscriptions() {
1219        subscriptionMapByLocalId.clear();
1220        subscriptionMapByRemoteId.clear();
1221    }
1222
1223    protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
1224
1225    protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;
1226
1227    protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
1228
1229    protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
1230
1231    protected abstract BrokerId[] getRemoteBrokerPath();
1232
1233    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1234        this.networkBridgeListener = listener;
1235    }
1236
1237    private void fireBridgeFailed() {
1238        NetworkBridgeListener l = this.networkBridgeListener;
1239        if (l != null) {
1240            l.bridgeFailed();
1241        }
1242    }
1243
1244    public String getRemoteAddress() {
1245        return remoteBroker.getRemoteAddress();
1246    }
1247
1248    public String getLocalAddress() {
1249        return localBroker.getRemoteAddress();
1250    }
1251
1252    public String getRemoteBrokerName() {
1253        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1254    }
1255
1256    public String getLocalBrokerName() {
1257        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1258    }
1259
1260    public long getDequeueCounter() {
1261        return dequeueCounter.get();
1262    }
1263
1264    public long getEnqueueCounter() {
1265        return enqueueCounter.get();
1266    }
1267
1268    protected boolean isDuplex() {
1269        return configuration.isDuplex() || createdByDuplex;
1270    }
1271
1272    public void setBrokerService(BrokerService brokerService) {
1273        this.brokerService = brokerService;
1274    }
1275}