001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.util.Timer; 021import java.util.concurrent.SynchronousQueue; 022import java.util.concurrent.ThreadFactory; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.activemq.command.KeepAliveInfo; 029import org.apache.activemq.command.WireFormatInfo; 030import org.apache.activemq.thread.SchedulerTimerTask; 031import org.apache.activemq.wireformat.WireFormat; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Used to make sure that commands are arriving periodically from the peer of 037 * the transport. 038 * 039 * 040 */ 041public class InactivityMonitor extends TransportFilter { 042 043 private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class); 044 private static ThreadPoolExecutor ASYNC_TASKS; 045 private static int CHECKER_COUNTER; 046 private static long DEFAULT_CHECK_TIME_MILLS = 30000; 047 private static Timer READ_CHECK_TIMER; 048 private static Timer WRITE_CHECK_TIMER; 049 050 private WireFormatInfo localWireFormatInfo; 051 private WireFormatInfo remoteWireFormatInfo; 052 private final AtomicBoolean monitorStarted = new AtomicBoolean(false); 053 054 private final AtomicBoolean commandSent = new AtomicBoolean(false); 055 private final AtomicBoolean inSend = new AtomicBoolean(false); 056 private final AtomicBoolean failed = new AtomicBoolean(false); 057 058 private final AtomicBoolean commandReceived = new AtomicBoolean(true); 059 private final AtomicBoolean inReceive = new AtomicBoolean(false); 060 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 061 062 private SchedulerTimerTask writeCheckerTask; 063 private SchedulerTimerTask readCheckerTask; 064 065 private boolean ignoreRemoteWireFormat = false; 066 private boolean ignoreAllWireFormatInfo = false; 067 068 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; 069 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS; 070 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; 071 private boolean useKeepAlive = true; 072 private boolean keepAliveResponseRequired; 073 private WireFormat wireFormat; 074 075 private final Runnable readChecker = new Runnable() { 076 long lastRunTime; 077 public void run() { 078 long now = System.currentTimeMillis(); 079 long elapsed = (now-lastRunTime); 080 081 if( lastRunTime != 0 && LOG.isDebugEnabled() ) { 082 LOG.debug(""+elapsed+" ms elapsed since last read check."); 083 } 084 085 // Perhaps the timer executed a read check late.. and then executes 086 // the next read check on time which causes the time elapsed between 087 // read checks to be small.. 088 089 // If less than 90% of the read check Time elapsed then abort this readcheck. 090 if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression. 091 LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); 092 return; 093 } 094 095 lastRunTime = now; 096 readCheck(); 097 } 098 }; 099 100 private boolean allowReadCheck(long elapsed) { 101 return elapsed > (readCheckTime * 9 / 10); 102 } 103 104 private final Runnable writeChecker = new Runnable() { 105 long lastRunTime; 106 public void run() { 107 long now = System.currentTimeMillis(); 108 if( lastRunTime != 0 && LOG.isDebugEnabled() ) { 109 LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check."); 110 111 } 112 lastRunTime = now; 113 writeCheck(); 114 } 115 }; 116 117 public InactivityMonitor(Transport next, WireFormat wireFormat) { 118 super(next); 119 this.wireFormat = wireFormat; 120 if (this.wireFormat == null) { 121 this.ignoreAllWireFormatInfo = true; 122 } 123 } 124 125 public void start() throws Exception { 126 next.start(); 127 startMonitorThreads(); 128 } 129 130 public void stop() throws Exception { 131 stopMonitorThreads(); 132 next.stop(); 133 } 134 135 final void writeCheck() { 136 if (inSend.get()) { 137 if (LOG.isTraceEnabled()) { 138 LOG.trace("A send is in progress"); 139 } 140 return; 141 } 142 143 if (!commandSent.get() && useKeepAlive) { 144 if (LOG.isTraceEnabled()) { 145 LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo"); 146 } 147 ASYNC_TASKS.execute(new Runnable() { 148 public void run() { 149 if (monitorStarted.get()) { 150 try { 151 152 KeepAliveInfo info = new KeepAliveInfo(); 153 info.setResponseRequired(keepAliveResponseRequired); 154 oneway(info); 155 } catch (IOException e) { 156 onException(e); 157 } 158 } 159 }; 160 }); 161 } else { 162 if (LOG.isTraceEnabled()) { 163 LOG.trace(this + " message sent since last write check, resetting flag"); 164 } 165 } 166 167 commandSent.set(false); 168 } 169 170 final void readCheck() { 171 int currentCounter = next.getReceiveCounter(); 172 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 173 if (inReceive.get() || currentCounter!=previousCounter ) { 174 if (LOG.isTraceEnabled()) { 175 LOG.trace("A receive is in progress"); 176 } 177 return; 178 } 179 if (!commandReceived.get()) { 180 if (LOG.isDebugEnabled()) { 181 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); 182 } 183 ASYNC_TASKS.execute(new Runnable() { 184 public void run() { 185 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress())); 186 }; 187 188 }); 189 } else { 190 if (LOG.isTraceEnabled()) { 191 LOG.trace("Message received since last read check, resetting flag: "); 192 } 193 } 194 commandReceived.set(false); 195 } 196 197 public void onCommand(Object command) { 198 commandReceived.set(true); 199 inReceive.set(true); 200 try { 201 if (command.getClass() == KeepAliveInfo.class) { 202 KeepAliveInfo info = (KeepAliveInfo) command; 203 if (info.isResponseRequired()) { 204 try { 205 info.setResponseRequired(false); 206 oneway(info); 207 } catch (IOException e) { 208 onException(e); 209 } 210 } 211 } else { 212 if (command.getClass() == WireFormatInfo.class) { 213 synchronized (this) { 214 IOException error = null; 215 remoteWireFormatInfo = (WireFormatInfo) command; 216 try { 217 startMonitorThreads(); 218 } catch (IOException e) { 219 error = e; 220 } 221 if (error != null) { 222 onException(error); 223 } 224 } 225 } 226 synchronized (readChecker) { 227 transportListener.onCommand(command); 228 } 229 } 230 } finally { 231 232 inReceive.set(false); 233 } 234 } 235 236 public void oneway(Object o) throws IOException { 237 // Disable inactivity monitoring while processing a command. 238 //synchronize this method - its not synchronized 239 //further down the transport stack and gets called by more 240 //than one thread by this class 241 synchronized(inSend) { 242 inSend.set(true); 243 try { 244 245 if( failed.get() ) { 246 throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress()); 247 } 248 if (o.getClass() == WireFormatInfo.class) { 249 synchronized (this) { 250 localWireFormatInfo = (WireFormatInfo)o; 251 startMonitorThreads(); 252 } 253 } 254 next.oneway(o); 255 } finally { 256 commandSent.set(true); 257 inSend.set(false); 258 } 259 } 260 } 261 262 public void onException(IOException error) { 263 if (failed.compareAndSet(false, true)) { 264 stopMonitorThreads(); 265 transportListener.onException(error); 266 } 267 } 268 269 public void setKeepAliveResponseRequired(boolean val) { 270 keepAliveResponseRequired = val; 271 } 272 273 public void setUseKeepAlive(boolean val) { 274 useKeepAlive = val; 275 } 276 277 public void setIgnoreRemoteWireFormat(boolean val) { 278 ignoreRemoteWireFormat = val; 279 } 280 281 public long getReadCheckTime() { 282 return readCheckTime; 283 } 284 285 public void setReadCheckTime(long readCheckTime) { 286 this.readCheckTime = readCheckTime; 287 } 288 289 public long getInitialDelayTime() { 290 return initialDelayTime; 291 } 292 293 public void setInitialDelayTime(long initialDelayTime) { 294 this.initialDelayTime = initialDelayTime; 295 } 296 297 private synchronized void startMonitorThreads() throws IOException { 298 if (monitorStarted.get()) { 299 return; 300 } 301 302 if (!configuredOk()) { 303 return; 304 } 305 306 if (readCheckTime > 0) { 307 monitorStarted.set(true); 308 writeCheckerTask = new SchedulerTimerTask(writeChecker); 309 readCheckerTask = new SchedulerTimerTask(readChecker); 310 writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; 311 synchronized( InactivityMonitor.class ) { 312 if( CHECKER_COUNTER == 0 ) { 313 ASYNC_TASKS = createExecutor(); 314 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); 315 WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); 316 } 317 CHECKER_COUNTER++; 318 WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime); 319 READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime); 320 } 321 } 322 } 323 324 private boolean configuredOk() throws IOException { 325 boolean configured = false; 326 if (ignoreAllWireFormatInfo) { 327 configured = true; 328 } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) { 329 if (!ignoreRemoteWireFormat) { 330 if (LOG.isDebugEnabled()) { 331 LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo); 332 } 333 readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); 334 initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); 335 } else { 336 if (LOG.isDebugEnabled()) { 337 LOG.debug("Using local: " + localWireFormatInfo); 338 } 339 readCheckTime = localWireFormatInfo.getMaxInactivityDuration(); 340 initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay(); 341 } 342 configured = true; 343 } 344 return configured; 345 } 346 347 /** 348 * 349 */ 350 private synchronized void stopMonitorThreads() { 351 if (monitorStarted.compareAndSet(true, false)) { 352 readCheckerTask.cancel(); 353 writeCheckerTask.cancel(); 354 synchronized( InactivityMonitor.class ) { 355 WRITE_CHECK_TIMER.purge(); 356 READ_CHECK_TIMER.purge(); 357 CHECKER_COUNTER--; 358 if(CHECKER_COUNTER==0) { 359 WRITE_CHECK_TIMER.cancel(); 360 READ_CHECK_TIMER.cancel(); 361 WRITE_CHECK_TIMER = null; 362 READ_CHECK_TIMER = null; 363 ASYNC_TASKS.shutdownNow(); 364 ASYNC_TASKS = null; 365 } 366 } 367 } 368 } 369 370 private ThreadFactory factory = new ThreadFactory() { 371 public Thread newThread(Runnable runnable) { 372 Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable); 373 thread.setDaemon(true); 374 return thread; 375 } 376 }; 377 378 private ThreadPoolExecutor createExecutor() { 379 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 380 } 381}