001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.discovery.simple; 018 019import java.io.IOException; 020import java.net.URI; 021import java.util.Map; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027 028import org.apache.activemq.command.DiscoveryEvent; 029import org.apache.activemq.thread.DefaultThreadPools; 030import org.apache.activemq.transport.discovery.DiscoveryAgent; 031import org.apache.activemq.transport.discovery.DiscoveryListener; 032import org.apache.activemq.util.MDCHelper; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A simple DiscoveryAgent that allows static configuration of the discovered 038 * services. 039 * 040 * 041 */ 042public class SimpleDiscoveryAgent implements DiscoveryAgent { 043 044 private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class); 045 private long initialReconnectDelay = 1000; 046 private long maxReconnectDelay = 1000 * 30; 047 private long backOffMultiplier = 2; 048 private boolean useExponentialBackOff=true; 049 private int maxReconnectAttempts; 050 private final Object sleepMutex = new Object(); 051 private long minConnectTime = 5000; 052 private DiscoveryListener listener; 053 private String services[] = new String[] {}; 054 private final AtomicBoolean running = new AtomicBoolean(false); 055 056 class SimpleDiscoveryEvent extends DiscoveryEvent { 057 058 private int connectFailures; 059 private long reconnectDelay = initialReconnectDelay; 060 private long connectTime = System.currentTimeMillis(); 061 private AtomicBoolean failed = new AtomicBoolean(false); 062 063 public SimpleDiscoveryEvent(String service) { 064 super(service); 065 } 066 067 } 068 069 public void setDiscoveryListener(DiscoveryListener listener) { 070 this.listener = listener; 071 } 072 073 public void registerService(String name) throws IOException { 074 } 075 076 public void start() throws Exception { 077 running.set(true); 078 for (int i = 0; i < services.length; i++) { 079 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); 080 } 081 } 082 083 public void stop() throws Exception { 084 running.set(false); 085 synchronized (sleepMutex) { 086 sleepMutex.notifyAll(); 087 } 088 } 089 090 public String[] getServices() { 091 return services; 092 } 093 094 public void setServices(String services) { 095 this.services = services.split(","); 096 } 097 098 public void setServices(String services[]) { 099 this.services = services; 100 } 101 102 public void setServices(URI services[]) { 103 this.services = new String[services.length]; 104 for (int i = 0; i < services.length; i++) { 105 this.services[i] = services[i].toString(); 106 } 107 } 108 109 public void serviceFailed(DiscoveryEvent devent) throws IOException { 110 111 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent; 112 if (event.failed.compareAndSet(false, true)) { 113 114 listener.onServiceRemove(event); 115 final Map context = MDCHelper.getCopyOfContextMap(); 116 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 117 public void run() { 118 119 MDCHelper.setContextMap(context); 120 121 // We detect a failed connection attempt because the service 122 // fails right 123 // away. 124 if (event.connectTime + minConnectTime > System.currentTimeMillis()) { 125 LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event); 126 127 event.connectFailures++; 128 129 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { 130 LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); 131 return; 132 } 133 134 synchronized (sleepMutex) { 135 try { 136 if (!running.get()) { 137 return; 138 } 139 140 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect."); 141 sleepMutex.wait(event.reconnectDelay); 142 } catch (InterruptedException ie) { 143 Thread.currentThread().interrupt(); 144 return; 145 } 146 } 147 148 if (!useExponentialBackOff) { 149 event.reconnectDelay = initialReconnectDelay; 150 } else { 151 // Exponential increment of reconnect delay. 152 event.reconnectDelay *= backOffMultiplier; 153 if (event.reconnectDelay > maxReconnectDelay) { 154 event.reconnectDelay = maxReconnectDelay; 155 } 156 } 157 158 } else { 159 event.connectFailures = 0; 160 event.reconnectDelay = initialReconnectDelay; 161 } 162 163 if (!running.get()) { 164 return; 165 } 166 167 event.connectTime = System.currentTimeMillis(); 168 event.failed.set(false); 169 listener.onServiceAdd(event); 170 } 171 }, "Simple Discovery Agent"); 172 } 173 } 174 175 public long getBackOffMultiplier() { 176 return backOffMultiplier; 177 } 178 179 public void setBackOffMultiplier(long backOffMultiplier) { 180 this.backOffMultiplier = backOffMultiplier; 181 } 182 183 public long getInitialReconnectDelay() { 184 return initialReconnectDelay; 185 } 186 187 public void setInitialReconnectDelay(long initialReconnectDelay) { 188 this.initialReconnectDelay = initialReconnectDelay; 189 } 190 191 public int getMaxReconnectAttempts() { 192 return maxReconnectAttempts; 193 } 194 195 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 196 this.maxReconnectAttempts = maxReconnectAttempts; 197 } 198 199 public long getMaxReconnectDelay() { 200 return maxReconnectDelay; 201 } 202 203 public void setMaxReconnectDelay(long maxReconnectDelay) { 204 this.maxReconnectDelay = maxReconnectDelay; 205 } 206 207 public long getMinConnectTime() { 208 return minConnectTime; 209 } 210 211 public void setMinConnectTime(long minConnectTime) { 212 this.minConnectTime = minConnectTime; 213 } 214 215 public boolean isUseExponentialBackOff() { 216 return useExponentialBackOff; 217 } 218 219 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 220 this.useExponentialBackOff = useExponentialBackOff; 221 } 222}