001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.failover;
019
020import java.io.BufferedReader;
021import java.io.FileNotFoundException;
022import java.io.FileReader;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.io.InterruptedIOException;
026import java.net.InetAddress;
027import java.net.MalformedURLException;
028import java.net.URI;
029import java.net.URL;
030import java.util.ArrayList;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedHashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.StringTokenizer;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.atomic.AtomicReference;
040import org.apache.activemq.broker.SslContext;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionControl;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.RemoveInfo;
045import org.apache.activemq.command.Response;
046import org.apache.activemq.state.ConnectionStateTracker;
047import org.apache.activemq.state.Tracked;
048import org.apache.activemq.thread.DefaultThreadPools;
049import org.apache.activemq.thread.Task;
050import org.apache.activemq.thread.TaskRunner;
051import org.apache.activemq.transport.CompositeTransport;
052import org.apache.activemq.transport.DefaultTransportListener;
053import org.apache.activemq.transport.FutureResponse;
054import org.apache.activemq.transport.ResponseCallback;
055import org.apache.activemq.transport.Transport;
056import org.apache.activemq.transport.TransportFactory;
057import org.apache.activemq.transport.TransportListener;
058import org.apache.activemq.util.IOExceptionSupport;
059import org.apache.activemq.util.ServiceSupport;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063
064/**
065 * A Transport that is made reliable by being able to fail over to another
066 * transport when a transport failure is detected.
067 * 
068 * 
069 */
070public class FailoverTransport implements CompositeTransport {
071
072    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
073    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
074    private TransportListener transportListener;
075    private boolean disposed;
076    private boolean connected;
077    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
078    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
079
080    private final Object reconnectMutex = new Object();
081    private final Object backupMutex = new Object();
082    private final Object sleepMutex = new Object();
083    private final Object listenerMutex = new Object();
084    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
085    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();
086
087    private URI connectedTransportURI;
088    private URI failedConnectTransportURI;
089    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
090    private final TaskRunner reconnectTask;
091    private boolean started;
092    private boolean initialized;
093    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
094    private long maxReconnectDelay = 1000 * 30;
095    private double backOffMultiplier = 2d;
096    private long timeout = -1;
097    private boolean useExponentialBackOff = true;
098    private boolean randomize = true;
099    private int maxReconnectAttempts;
100    private int startupMaxReconnectAttempts;
101    private int connectFailures;
102    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
103    private Exception connectionFailure;
104    private boolean firstConnection = true;
105    // optionally always have a backup created
106    private boolean backup = false;
107    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
108    private int backupPoolSize = 1;
109    private boolean trackMessages = false;
110    private boolean trackTransactionProducers = true;
111    private int maxCacheSize = 128 * 1024;
112    private final TransportListener disposedListener = new DefaultTransportListener() {
113    };
114    //private boolean connectionInterruptProcessingComplete;
115
116    private final TransportListener myTransportListener = createTransportListener();
117    private boolean updateURIsSupported=true;
118    private boolean reconnectSupported=true;
119    // remember for reconnect thread
120    private SslContext brokerSslContext;
121    private String updateURIsURL = null;
122    private boolean rebalanceUpdateURIs=true;
123    private boolean doRebalance = false;
124
125    public FailoverTransport() throws InterruptedIOException {
126        brokerSslContext = SslContext.getCurrentSslContext();
127        stateTracker.setTrackTransactions(true);
128        // Setup a task that is used to reconnect the a connection async.
129        reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
130            public boolean iterate() {
131                boolean result = false;
132                boolean buildBackup = true;
133                boolean doReconnect = !disposed;
134                synchronized (backupMutex) {
135                    if ((connectedTransport.get() == null || doRebalance) && !disposed) {
136                        result = doReconnect();
137                        buildBackup = false;
138                    }
139                }
140                if (buildBackup) {
141                    buildBackups();
142                } else {
143                    // build backups on the next iteration
144                    buildBackup = true;
145                    try {
146                        reconnectTask.wakeup();
147                    } catch (InterruptedException e) {
148                        LOG.debug("Reconnect task has been interrupted.", e);
149                    }
150                }
151                return result;
152            }
153
154        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
155    }
156
157    TransportListener createTransportListener() {
158        return new TransportListener() {
159            public void onCommand(Object o) {
160                Command command = (Command) o;
161                if (command == null) {
162                    return;
163                }
164                if (command.isResponse()) {
165                    Object object = null;
166                    synchronized (requestMap) {
167                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
168                    }
169                    if (object != null && object.getClass() == Tracked.class) {
170                        ((Tracked) object).onResponses(command);
171                    }
172                }
173                if (!initialized) {      
174                    initialized = true;
175                }
176                
177                if(command.isConnectionControl()) {
178                    handleConnectionControl((ConnectionControl) command);
179                }
180                if (transportListener != null) {
181                    transportListener.onCommand(command);
182                }
183            }
184
185            public void onException(IOException error) {
186                try {
187                    handleTransportFailure(error);
188                } catch (InterruptedException e) {
189                    Thread.currentThread().interrupt();
190                    transportListener.onException(new InterruptedIOException());
191                }
192            }
193
194            public void transportInterupted() {
195                if (transportListener != null) {
196                    transportListener.transportInterupted();
197                }
198            }
199
200            public void transportResumed() {
201                if (transportListener != null) {
202                    transportListener.transportResumed();
203                }
204            }
205        };
206    }
207
208    public final void disposeTransport(Transport transport) {
209        transport.setTransportListener(disposedListener);
210        ServiceSupport.dispose(transport);
211    }
212
213    public final void handleTransportFailure(IOException e) throws InterruptedException {
214        if (LOG.isTraceEnabled()) {
215            LOG.trace(this + " handleTransportFailure: " + e);
216        }
217        Transport transport = connectedTransport.getAndSet(null);
218        if (transport == null) {
219            // sync with possible in progress reconnect
220            synchronized (reconnectMutex) {
221                transport = connectedTransport.getAndSet(null);
222            }
223        }
224        if (transport != null) {
225
226            disposeTransport(transport);
227
228            boolean reconnectOk = false;
229            synchronized (reconnectMutex) {
230                if (started) {
231                    LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI
232                            + " , attempting to automatically reconnect due to: " + e);
233                    LOG.debug("Transport failed with the following exception:", e);
234                    reconnectOk = true;
235                }
236                initialized = false;
237                failedConnectTransportURI = connectedTransportURI;
238                connectedTransportURI = null;
239                connected = false;
240
241                // notify before any reconnect attempt so ack state can be
242                // whacked
243                if (transportListener != null) {
244                    transportListener.transportInterupted();
245                }
246
247                if (reconnectOk) {
248                    reconnectTask.wakeup();
249                }
250            }
251        }
252    }
253
254    public final void handleConnectionControl(ConnectionControl control) {
255        String reconnectStr = control.getReconnectTo();
256        if (reconnectStr != null) {
257            reconnectStr = reconnectStr.trim();
258            if (reconnectStr.length() > 0) {
259                try {
260                    URI uri = new URI(reconnectStr);
261                    if (isReconnectSupported()) {
262                        reconnect(uri);
263                        LOG.info("Reconnected to: " + uri);
264                    }
265                } catch (Exception e) {
266                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
267                }
268            }
269        }
270        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
271    }
272
273    private final void processNewTransports(boolean rebalance, String newTransports) {
274        if (newTransports != null) {
275            newTransports = newTransports.trim();
276            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
277                List<URI> list = new ArrayList<URI>();
278                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
279                while (tokenizer.hasMoreTokens()) {
280                    String str = tokenizer.nextToken();
281                    try {
282                        URI uri = new URI(str);
283                        list.add(uri);
284                    } catch (Exception e) {
285                        LOG.error("Failed to parse broker address: " + str, e);
286                    }
287                }
288                if (list.isEmpty() == false) {
289                    try {
290                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
291                    } catch (IOException e) {
292                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
293                    }
294                }
295
296            }
297        }
298    }
299
300    public void start() throws Exception {
301        synchronized (reconnectMutex) {
302            LOG.debug("Started.");
303            if (started) {
304                return;
305            }
306            started = true;
307            stateTracker.setMaxCacheSize(getMaxCacheSize());
308            stateTracker.setTrackMessages(isTrackMessages());
309            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
310            if (connectedTransport.get() != null) {
311                stateTracker.restore(connectedTransport.get());
312            } else {
313                reconnect(false);
314            }
315        }
316    }
317
318    public void stop() throws Exception {
319        Transport transportToStop = null;
320        synchronized (reconnectMutex) {
321            LOG.debug("Stopped.");
322            if (!started) {
323                return;
324            }
325            started = false;
326            disposed = true;
327            connected = false;
328            for (BackupTransport t : backups) {
329                t.setDisposed(true);
330            }
331            backups.clear();
332
333            if (connectedTransport.get() != null) {
334                transportToStop = connectedTransport.getAndSet(null);
335            }
336            reconnectMutex.notifyAll();
337        }
338        synchronized (sleepMutex) {
339            sleepMutex.notifyAll();
340        }
341        reconnectTask.shutdown();
342        if (transportToStop != null) {
343            transportToStop.stop();
344        }
345    }
346
347    public long getInitialReconnectDelay() {
348        return initialReconnectDelay;
349    }
350
351    public void setInitialReconnectDelay(long initialReconnectDelay) {
352        this.initialReconnectDelay = initialReconnectDelay;
353    }
354
355    public long getMaxReconnectDelay() {
356        return maxReconnectDelay;
357    }
358
359    public void setMaxReconnectDelay(long maxReconnectDelay) {
360        this.maxReconnectDelay = maxReconnectDelay;
361    }
362
363    public long getReconnectDelay() {
364        return reconnectDelay;
365    }
366
367    public void setReconnectDelay(long reconnectDelay) {
368        this.reconnectDelay = reconnectDelay;
369    }
370
371    public double getReconnectDelayExponent() {
372        return backOffMultiplier;
373    }
374
375    public void setReconnectDelayExponent(double reconnectDelayExponent) {
376        this.backOffMultiplier = reconnectDelayExponent;
377    }
378
379    public Transport getConnectedTransport() {
380        return connectedTransport.get();
381    }
382
383    public URI getConnectedTransportURI() {
384        return connectedTransportURI;
385    }
386
387    public int getMaxReconnectAttempts() {
388        return maxReconnectAttempts;
389    }
390
391    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
392        this.maxReconnectAttempts = maxReconnectAttempts;
393    }
394
395    public int getStartupMaxReconnectAttempts() {
396        return this.startupMaxReconnectAttempts;
397    }
398
399    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
400        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
401    }
402
403    public long getTimeout() {
404        return timeout;
405    }
406
407    public void setTimeout(long timeout) {
408        this.timeout = timeout;
409    }
410
411    /**
412     * @return Returns the randomize.
413     */
414    public boolean isRandomize() {
415        return randomize;
416    }
417
418    /**
419     * @param randomize
420     *            The randomize to set.
421     */
422    public void setRandomize(boolean randomize) {
423        this.randomize = randomize;
424    }
425
426    public boolean isBackup() {
427        return backup;
428    }
429
430    public void setBackup(boolean backup) {
431        this.backup = backup;
432    }
433
434    public int getBackupPoolSize() {
435        return backupPoolSize;
436    }
437
438    public void setBackupPoolSize(int backupPoolSize) {
439        this.backupPoolSize = backupPoolSize;
440    }
441
442    public boolean isTrackMessages() {
443        return trackMessages;
444    }
445
446    public void setTrackMessages(boolean trackMessages) {
447        this.trackMessages = trackMessages;
448    }
449
450    public boolean isTrackTransactionProducers() {
451        return this.trackTransactionProducers;
452    }
453
454    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
455        this.trackTransactionProducers = trackTransactionProducers;
456    }
457
458    public int getMaxCacheSize() {
459        return maxCacheSize;
460    }
461
462    public void setMaxCacheSize(int maxCacheSize) {
463        this.maxCacheSize = maxCacheSize;
464    }
465
466    /**
467     * @return Returns true if the command is one sent when a connection is
468     *         being closed.
469     */
470    private boolean isShutdownCommand(Command command) {
471        return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo));
472    }
473
474    public void oneway(Object o) throws IOException {
475
476        Command command = (Command) o;
477        Exception error = null;
478        try {
479
480            synchronized (reconnectMutex) {
481
482                if (isShutdownCommand(command) && connectedTransport.get() == null) {
483                    if (command.isShutdownInfo()) {
484                        // Skipping send of ShutdownInfo command when not
485                        // connected.
486                        return;
487                    }
488                    if (command instanceof RemoveInfo || command.isMessageAck()) {
489                        // Simulate response to RemoveInfo command or ack (as it
490                        // will be stale)
491                        stateTracker.track(command);
492                        Response response = new Response();
493                        response.setCorrelationId(command.getCommandId());
494                        myTransportListener.onCommand(response);
495                        return;
496                    }
497                }
498                // Keep trying until the message is sent.
499                for (int i = 0; !disposed; i++) {
500                    try {
501
502                        // Wait for transport to be connected.
503                        Transport transport = connectedTransport.get();
504                        long start = System.currentTimeMillis();
505                        boolean timedout = false;
506                        while (transport == null && !disposed && connectionFailure == null
507                                && !Thread.currentThread().isInterrupted()) {
508                            LOG.trace("Waiting for transport to reconnect..: " + command);
509                            long end = System.currentTimeMillis();
510                            if (timeout > 0 && (end - start > timeout)) {
511                                timedout = true;
512                                LOG.info("Failover timed out after " + (end - start) + "ms");
513                                break;
514                            }
515                            try {
516                                reconnectMutex.wait(100);
517                            } catch (InterruptedException e) {
518                                Thread.currentThread().interrupt();
519                                LOG.debug("Interupted: " + e, e);
520                            }
521                            transport = connectedTransport.get();
522                        }
523
524                        if (transport == null) {
525                            // Previous loop may have exited due to use being
526                            // disposed.
527                            if (disposed) {
528                                error = new IOException("Transport disposed.");
529                            } else if (connectionFailure != null) {
530                                error = connectionFailure;
531                            } else if (timedout == true) {
532                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
533                            } else {
534                                error = new IOException("Unexpected failure.");
535                            }
536                            break;
537                        }
538
539                        // If it was a request and it was not being tracked by
540                        // the state tracker,
541                        // then hold it in the requestMap so that we can replay
542                        // it later.
543                        Tracked tracked = stateTracker.track(command);
544                        synchronized (requestMap) {
545                            if (tracked != null && tracked.isWaitingForResponse()) {
546                                requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
547                            } else if (tracked == null && command.isResponseRequired()) {
548                                requestMap.put(Integer.valueOf(command.getCommandId()), command);
549                            }
550                        }
551
552                        // Send the message.
553                        try {
554                            transport.oneway(command);
555                            stateTracker.trackBack(command);
556                        } catch (IOException e) {
557
558                            // If the command was not tracked.. we will retry in
559                            // this method
560                            if (tracked == null) {
561
562                                // since we will retry in this method.. take it
563                                // out of the request
564                                // map so that it is not sent 2 times on
565                                // recovery
566                                if (command.isResponseRequired()) {
567                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
568                                }
569
570                                // Rethrow the exception so it will handled by
571                                // the outer catch
572                                throw e;
573                            }
574
575                        }
576
577                        return;
578
579                    } catch (IOException e) {
580                        if (LOG.isDebugEnabled()) {
581                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
582                        }
583                        handleTransportFailure(e);
584                    }
585                }
586            }
587        } catch (InterruptedException e) {
588            // Some one may be trying to stop our thread.
589            Thread.currentThread().interrupt();
590            throw new InterruptedIOException();
591        }
592        if (!disposed) {
593            if (error != null) {
594                if (error instanceof IOException) {
595                    throw (IOException) error;
596                }
597                throw IOExceptionSupport.create(error);
598            }
599        }
600    }
601
602    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
603        throw new AssertionError("Unsupported Method");
604    }
605
606    public Object request(Object command) throws IOException {
607        throw new AssertionError("Unsupported Method");
608    }
609
610    public Object request(Object command, int timeout) throws IOException {
611        throw new AssertionError("Unsupported Method");
612    }
613
614    public void add(boolean rebalance, URI u[]) {
615        boolean newURI = false;
616        for (int i = 0; i < u.length; i++) {
617            if (!contains(u[i])) {
618                uris.add(u[i]);
619                newURI = true;
620            }
621        }
622        if (newURI) {
623            reconnect(rebalance);
624        }
625    }
626
627    public void remove(boolean rebalance, URI u[]) {
628        for (int i = 0; i < u.length; i++) {
629            uris.remove(u[i]);
630        }
631        // rebalance is automatic if any connected to removed/stopped broker
632    }
633
634    public void add(boolean rebalance, String u) {
635        try {
636            URI newURI = new URI(u);
637            if (contains(newURI)==false) {
638                uris.add(newURI);
639                reconnect(rebalance);
640            }
641       
642        } catch (Exception e) {
643            LOG.error("Failed to parse URI: " + u);
644        }
645    }
646
647    public void reconnect(boolean rebalance) {
648        synchronized (reconnectMutex) {
649            if (started) {
650                if (rebalance) {
651                    doRebalance = true;
652                }
653                LOG.debug("Waking up reconnect task");
654                try {
655                    reconnectTask.wakeup();
656                } catch (InterruptedException e) {
657                    Thread.currentThread().interrupt();
658                }
659            } else {
660                LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
661            }
662        }
663    }
664
665    private List<URI> getConnectList() {
666        ArrayList<URI> l = new ArrayList<URI>(uris);
667        boolean removed = false;
668        if (failedConnectTransportURI != null) {
669            removed = l.remove(failedConnectTransportURI);
670        }
671        if (randomize) {
672            // Randomly, reorder the list by random swapping
673            for (int i = 0; i < l.size(); i++) {
674                int p = (int) (Math.random() * 100 % l.size());
675                URI t = l.get(p);
676                l.set(p, l.get(i));
677                l.set(i, t);
678            }
679        }
680        if (removed) {
681            l.add(failedConnectTransportURI);
682        }
683        LOG.debug("urlList connectionList:" + l + ", from: " + uris);
684        return l;
685    }
686
687    public TransportListener getTransportListener() {
688        return transportListener;
689    }
690
691    public void setTransportListener(TransportListener commandListener) {
692        synchronized (listenerMutex) {
693            this.transportListener = commandListener;
694            listenerMutex.notifyAll();
695        }
696    }
697
698    public <T> T narrow(Class<T> target) {
699
700        if (target.isAssignableFrom(getClass())) {
701            return target.cast(this);
702        }
703        Transport transport = connectedTransport.get();
704        if (transport != null) {
705            return transport.narrow(target);
706        }
707        return null;
708
709    }
710
711    protected void restoreTransport(Transport t) throws Exception, IOException {
712        t.start();
713        // send information to the broker - informing it we are an ft client
714        ConnectionControl cc = new ConnectionControl();
715        cc.setFaultTolerant(true);
716        t.oneway(cc);
717        stateTracker.restore(t);
718        Map tmpMap = null;
719        synchronized (requestMap) {
720            tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
721        }
722        for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
723            Command command = iter2.next();
724            if (LOG.isTraceEnabled()) {
725                LOG.trace("restore requestMap, replay: " + command);
726            }
727            t.oneway(command);
728        }
729    }
730
731    public boolean isUseExponentialBackOff() {
732        return useExponentialBackOff;
733    }
734
735    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
736        this.useExponentialBackOff = useExponentialBackOff;
737    }
738
739    @Override
740    public String toString() {
741        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
742    }
743
744    public String getRemoteAddress() {
745        Transport transport = connectedTransport.get();
746        if (transport != null) {
747            return transport.getRemoteAddress();
748        }
749        return null;
750    }
751
752    public boolean isFaultTolerant() {
753        return true;
754    }
755
756    final boolean doReconnect() {
757        Exception failure = null;
758        synchronized (reconnectMutex) {
759
760            // If updateURIsURL is specified, read the file and add any new
761            // transport URI's to this FailOverTransport. 
762            // Note: Could track file timestamp to avoid unnecessary reading.
763            String fileURL = getUpdateURIsURL();
764            if (fileURL != null) {
765                BufferedReader in = null;
766                String newUris = null;
767                StringBuffer buffer = new StringBuffer();
768
769                try {
770                    in = new BufferedReader(getURLStream(fileURL));
771                    while (true) {
772                        String line = in.readLine();
773                        if (line == null) {
774                            break;
775                        }
776                        buffer.append(line);
777                    }
778                    newUris = buffer.toString();
779                } catch (IOException ioe) {
780                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
781                } finally {
782                    if (in != null) {
783                        try {
784                            in.close();
785                        } catch (IOException ioe) {
786                            // ignore
787                        }
788                    }
789                }
790                
791                processNewTransports(isRebalanceUpdateURIs(), newUris);
792            }
793
794            if (disposed || connectionFailure != null) {
795                reconnectMutex.notifyAll();
796            }
797
798            if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
799                return false;
800            } else {
801                List<URI> connectList = getConnectList();
802                if (connectList.isEmpty()) {
803                    failure = new IOException("No uris available to connect to.");
804                } else {
805                    if (doRebalance) {
806                        if (connectList.get(0).equals(connectedTransportURI)) {
807                            // already connected to first in the list, no need to rebalance
808                            doRebalance = false;
809                            return false;
810                        } else {
811                            LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
812                            try {
813                                Transport transport = this.connectedTransport.getAndSet(null);
814                                if (transport != null) {
815                                    disposeTransport(transport);
816                                }
817                            } catch (Exception e) {
818                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
819                            }
820                        }
821                        doRebalance = false;
822                    }
823                    if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
824                        reconnectDelay = initialReconnectDelay;
825                    }
826                    synchronized (backupMutex) {
827                        if (backup && !backups.isEmpty()) {
828                            BackupTransport bt = backups.remove(0);
829                            Transport t = bt.getTransport();
830                            URI uri = bt.getUri();
831                            t.setTransportListener(myTransportListener);
832                            try {
833                                if (started) {
834                                    restoreTransport(t);
835                                }
836                                reconnectDelay = initialReconnectDelay;
837                                failedConnectTransportURI = null;
838                                connectedTransportURI = uri;
839                                connectedTransport.set(t);
840                                reconnectMutex.notifyAll();
841                                connectFailures = 0;
842                                LOG.info("Successfully reconnected to backup " + uri);
843                                return false;
844                            } catch (Exception e) {
845                                LOG.debug("Backup transport failed", e);
846                            }
847                        }
848                    }
849
850                    Iterator<URI> iter = connectList.iterator();
851                    while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
852                        URI uri = iter.next();
853                        Transport t = null;
854                        try {
855                            LOG.debug("Attempting connect to: " + uri);
856                            SslContext.setCurrentSslContext(brokerSslContext);
857                            t = TransportFactory.compositeConnect(uri);
858                            t.setTransportListener(myTransportListener);
859                            t.start();
860
861                            if (started) {
862                                restoreTransport(t);
863                            }
864
865                            LOG.debug("Connection established");
866                            reconnectDelay = initialReconnectDelay;
867                            connectedTransportURI = uri;
868                            connectedTransport.set(t);
869                            reconnectMutex.notifyAll();
870                            connectFailures = 0;
871                            // Make sure on initial startup, that the
872                            // transportListener
873                            // has been initialized for this instance.
874                            synchronized (listenerMutex) {
875                                if (transportListener == null) {
876                                    try {
877                                        // if it isn't set after 2secs - it
878                                        // probably never will be
879                                        listenerMutex.wait(2000);
880                                    } catch (InterruptedException ex) {
881                                    }
882                                }
883                            }
884                            if (transportListener != null) {
885                                transportListener.transportResumed();
886                            } else {
887                                LOG.debug("transport resumed by transport listener not set");
888                            }
889                            if (firstConnection) {
890                                firstConnection = false;
891                                LOG.info("Successfully connected to " + uri);
892                            } else {
893                                LOG.info("Successfully reconnected to " + uri);
894                            }
895                            connected = true;
896                            return false;
897                        } catch (Exception e) {
898                            failure = e;
899                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
900                            if (t != null) {
901                                try {
902                                    t.stop();
903                                } catch (Exception ee) {
904                                    LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
905                                }
906                            }
907                        } finally {
908                            SslContext.setCurrentSslContext(null);
909                        }
910                    }
911                }
912            }
913            int reconnectAttempts = 0;
914            if (firstConnection) {
915                if (this.startupMaxReconnectAttempts != 0) {
916                    reconnectAttempts = this.startupMaxReconnectAttempts;
917                }
918            }
919            if (reconnectAttempts == 0) {
920                reconnectAttempts = this.maxReconnectAttempts;
921            }
922            if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
923                LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
924                connectionFailure = failure;
925
926                // Make sure on initial startup, that the transportListener has
927                // been initialized
928                // for this instance.
929                synchronized (listenerMutex) {
930                    if (transportListener == null) {
931                        try {
932                            listenerMutex.wait(2000);
933                        } catch (InterruptedException ex) {
934                        }
935                    }
936                }
937
938                if (transportListener != null) {
939                    if (connectionFailure instanceof IOException) {
940                        transportListener.onException((IOException) connectionFailure);
941                    } else {
942                        transportListener.onException(IOExceptionSupport.create(connectionFailure));
943                    }
944                }
945                reconnectMutex.notifyAll();
946                return false;
947            }
948        }
949        if (!disposed) {
950
951            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
952            synchronized (sleepMutex) {
953                try {
954                    sleepMutex.wait(reconnectDelay);
955                } catch (InterruptedException e) {
956                    Thread.currentThread().interrupt();
957                }
958            }
959
960            if (useExponentialBackOff) {
961                // Exponential increment of reconnect delay.
962                reconnectDelay *= backOffMultiplier;
963                if (reconnectDelay > maxReconnectDelay) {
964                    reconnectDelay = maxReconnectDelay;
965                }
966            }
967        }
968        return !disposed;
969    }
970
971    final boolean buildBackups() {
972        synchronized (backupMutex) {
973            if (!disposed && backup && backups.size() < backupPoolSize) {
974                List<URI> connectList = getConnectList();
975                // removed disposed backups
976                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
977                for (BackupTransport bt : backups) {
978                    if (bt.isDisposed()) {
979                        disposedList.add(bt);
980                    }
981                }
982                backups.removeAll(disposedList);
983                disposedList.clear();
984                for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
985                    URI uri = iter.next();
986                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
987                        try {
988                            SslContext.setCurrentSslContext(brokerSslContext);
989                            BackupTransport bt = new BackupTransport(this);
990                            bt.setUri(uri);
991                            if (!backups.contains(bt)) {
992                                Transport t = TransportFactory.compositeConnect(uri);
993                                t.setTransportListener(bt);
994                                t.start();
995                                bt.setTransport(t);
996                                backups.add(bt);
997                            }
998                        } catch (Exception e) {
999                            LOG.debug("Failed to build backup ", e);
1000                        } finally {
1001                            SslContext.setCurrentSslContext(null);
1002                        }
1003                    }
1004                }
1005            }
1006        }
1007        return false;
1008    }
1009
1010    public boolean isDisposed() {
1011        return disposed;
1012    }
1013
1014    public boolean isConnected() {
1015        return connected;
1016    }
1017
1018    public void reconnect(URI uri) throws IOException {
1019        add(true, new URI[] { uri });
1020    }
1021
1022    public boolean isReconnectSupported() {
1023        return this.reconnectSupported;
1024    }
1025    
1026    public void setReconnectSupported(boolean value) {
1027        this.reconnectSupported=value;
1028    }
1029   
1030    public boolean isUpdateURIsSupported() {
1031        return this.updateURIsSupported;
1032    }
1033    
1034    public void setUpdateURIsSupported(boolean value) {
1035        this.updateURIsSupported=value;
1036    }
1037
1038    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1039        if (isUpdateURIsSupported()) {
1040            List<URI> copy = new ArrayList<URI>(this.updated);
1041            List<URI> add = new ArrayList<URI>();
1042            if (updatedURIs != null && updatedURIs.length > 0) {
1043                Set<URI> set = new HashSet<URI>();
1044                for (int i = 0; i < updatedURIs.length; i++) {
1045                    URI uri = updatedURIs[i];
1046                    if (uri != null) {
1047                        set.add(uri);
1048                    }
1049                }
1050                for (URI uri : set) {
1051                    if (copy.remove(uri) == false) {
1052                        add.add(uri);
1053                    }
1054                }
1055                synchronized (reconnectMutex) {
1056                    this.updated.clear();
1057                    this.updated.addAll(add);
1058                    for (URI uri : copy) {
1059                        this.uris.remove(uri);
1060                    }
1061                    add(rebalance, add.toArray(new URI[add.size()]));
1062                }
1063            }
1064        }
1065    }
1066    
1067    /**
1068     * @return the updateURIsURL
1069     */
1070    public String getUpdateURIsURL() {
1071        return this.updateURIsURL;
1072    }
1073
1074    /**
1075     * @param updateURIsURL the updateURIsURL to set
1076     */
1077    public void setUpdateURIsURL(String updateURIsURL) {
1078        this.updateURIsURL = updateURIsURL;
1079    }
1080    
1081    /**
1082     * @return the rebalanceUpdateURIs
1083     */
1084    public boolean isRebalanceUpdateURIs() {
1085        return this.rebalanceUpdateURIs;
1086    }
1087
1088    /**
1089     * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1090     */
1091    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1092        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1093    }
1094
1095    public int getReceiveCounter() {
1096        Transport transport = connectedTransport.get();
1097        if (transport == null) {
1098            return 0;
1099        }
1100        return transport.getReceiveCounter();
1101    }
1102
1103    public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1104        synchronized (reconnectMutex) {
1105            stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1106        }
1107    }
1108    
1109    public ConnectionStateTracker getStateTracker() {
1110        return stateTracker;
1111    }
1112    
1113    private boolean contains(URI newURI) {
1114
1115        boolean result = false;
1116        try {
1117        for (URI uri:uris) {
1118            if (newURI.getPort()==uri.getPort()) {
1119                InetAddress newAddr = InetAddress.getByName(newURI.getHost());
1120                InetAddress addr = InetAddress.getByName(uri.getHost());
1121                if (addr.equals(newAddr)) {
1122                    result = true;
1123                    break;
1124                }
1125            }
1126        }
1127        }catch(IOException e) {
1128            result = true;
1129            LOG.error("Failed to verify URI " + newURI + " already known: " + e);
1130        }
1131        return result;
1132    }
1133    
1134    private InputStreamReader getURLStream(String path) throws IOException {
1135        InputStreamReader result = null;
1136        URL url = null;
1137        try {
1138            url = new URL(path);
1139            result = new InputStreamReader(url.openStream());
1140        } catch (MalformedURLException e) {
1141            // ignore - it could be a path to a a local file
1142        }
1143        if (result == null) {
1144            result = new FileReader(path);
1145        }
1146        return result;
1147    }
1148}