001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.discovery.simple;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.Map;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.apache.activemq.command.DiscoveryEvent;
029import org.apache.activemq.thread.DefaultThreadPools;
030import org.apache.activemq.transport.discovery.DiscoveryAgent;
031import org.apache.activemq.transport.discovery.DiscoveryListener;
032import org.apache.activemq.util.MDCHelper;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A simple DiscoveryAgent that allows static configuration of the discovered
038 * services.
039 * 
040 * 
041 */
042public class SimpleDiscoveryAgent implements DiscoveryAgent {
043
044    private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
045    private long initialReconnectDelay = 1000;
046    private long maxReconnectDelay = 1000 * 30;
047    private long backOffMultiplier = 2;
048    private boolean useExponentialBackOff=true;
049    private int maxReconnectAttempts;
050    private final Object sleepMutex = new Object();
051    private long minConnectTime = 5000;
052    private DiscoveryListener listener;
053    private String services[] = new String[] {};
054    private final AtomicBoolean running = new AtomicBoolean(false);
055
056    class SimpleDiscoveryEvent extends DiscoveryEvent {
057
058        private int connectFailures;
059        private long reconnectDelay = initialReconnectDelay;
060        private long connectTime = System.currentTimeMillis();
061        private AtomicBoolean failed = new AtomicBoolean(false);
062
063        public SimpleDiscoveryEvent(String service) {
064            super(service);
065        }
066
067    }
068
069    public void setDiscoveryListener(DiscoveryListener listener) {
070        this.listener = listener;
071    }
072
073    public void registerService(String name) throws IOException {
074    }
075
076    public void start() throws Exception {
077        running.set(true);
078        for (int i = 0; i < services.length; i++) {
079            listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
080        }
081    }
082
083    public void stop() throws Exception {
084        running.set(false);
085        synchronized (sleepMutex) {
086            sleepMutex.notifyAll();
087        }
088    }
089
090    public String[] getServices() {
091        return services;
092    }
093
094    public void setServices(String services) {
095        this.services = services.split(",");
096    }
097
098    public void setServices(String services[]) {
099        this.services = services;
100    }
101
102    public void setServices(URI services[]) {
103        this.services = new String[services.length];
104        for (int i = 0; i < services.length; i++) {
105            this.services[i] = services[i].toString();
106        }
107    }
108
109    public void serviceFailed(DiscoveryEvent devent) throws IOException {
110
111        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
112        if (event.failed.compareAndSet(false, true)) {
113
114            listener.onServiceRemove(event);
115            final Map context = MDCHelper.getCopyOfContextMap();
116            DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
117                public void run() {
118
119                    MDCHelper.setContextMap(context);
120
121                    // We detect a failed connection attempt because the service
122                    // fails right
123                    // away.
124                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
125                        LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
126
127                        event.connectFailures++;
128
129                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
130                            LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
131                            return;
132                        }
133
134                        synchronized (sleepMutex) {
135                            try {
136                                if (!running.get()) {
137                                    return;
138                                }
139
140                                LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
141                                sleepMutex.wait(event.reconnectDelay);
142                            } catch (InterruptedException ie) {
143                                Thread.currentThread().interrupt();
144                                return;
145                            }
146                        }
147
148                        if (!useExponentialBackOff) {
149                            event.reconnectDelay = initialReconnectDelay;
150                        } else {
151                            // Exponential increment of reconnect delay.
152                            event.reconnectDelay *= backOffMultiplier;
153                            if (event.reconnectDelay > maxReconnectDelay) {
154                                event.reconnectDelay = maxReconnectDelay;
155                            }
156                        }
157
158                    } else {
159                        event.connectFailures = 0;
160                        event.reconnectDelay = initialReconnectDelay;
161                    }
162
163                    if (!running.get()) {
164                        return;
165                    }
166
167                    event.connectTime = System.currentTimeMillis();
168                    event.failed.set(false);
169                    listener.onServiceAdd(event);
170                }
171            }, "Simple Discovery Agent");
172        }
173    }
174
175    public long getBackOffMultiplier() {
176        return backOffMultiplier;
177    }
178
179    public void setBackOffMultiplier(long backOffMultiplier) {
180        this.backOffMultiplier = backOffMultiplier;
181    }
182
183    public long getInitialReconnectDelay() {
184        return initialReconnectDelay;
185    }
186
187    public void setInitialReconnectDelay(long initialReconnectDelay) {
188        this.initialReconnectDelay = initialReconnectDelay;
189    }
190
191    public int getMaxReconnectAttempts() {
192        return maxReconnectAttempts;
193    }
194
195    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
196        this.maxReconnectAttempts = maxReconnectAttempts;
197    }
198
199    public long getMaxReconnectDelay() {
200        return maxReconnectDelay;
201    }
202
203    public void setMaxReconnectDelay(long maxReconnectDelay) {
204        this.maxReconnectDelay = maxReconnectDelay;
205    }
206
207    public long getMinConnectTime() {
208        return minConnectTime;
209    }
210
211    public void setMinConnectTime(long minConnectTime) {
212        this.minConnectTime = minConnectTime;
213    }
214
215    public boolean isUseExponentialBackOff() {
216        return useExponentialBackOff;
217    }
218
219    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
220        this.useExponentialBackOff = useExponentialBackOff;
221    }
222}