001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.util.Timer;
021import java.util.concurrent.SynchronousQueue;
022import java.util.concurrent.ThreadFactory;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.apache.activemq.command.KeepAliveInfo;
029import org.apache.activemq.command.WireFormatInfo;
030import org.apache.activemq.thread.SchedulerTimerTask;
031import org.apache.activemq.wireformat.WireFormat;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Used to make sure that commands are arriving periodically from the peer of
037 * the transport.
038 *
039 * 
040 */
041public class InactivityMonitor extends TransportFilter {
042
043    private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
044    private static ThreadPoolExecutor ASYNC_TASKS;
045    private static int CHECKER_COUNTER;
046    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047    private static Timer  READ_CHECK_TIMER;
048    private static Timer  WRITE_CHECK_TIMER;
049
050    private WireFormatInfo localWireFormatInfo;
051    private WireFormatInfo remoteWireFormatInfo;
052    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053
054    private final AtomicBoolean commandSent = new AtomicBoolean(false);
055    private final AtomicBoolean inSend = new AtomicBoolean(false);
056    private final AtomicBoolean failed = new AtomicBoolean(false);
057
058    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059    private final AtomicBoolean inReceive = new AtomicBoolean(false);
060    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061
062    private SchedulerTimerTask writeCheckerTask;
063    private SchedulerTimerTask readCheckerTask;
064
065    private boolean ignoreRemoteWireFormat = false;
066    private boolean ignoreAllWireFormatInfo = false;
067
068    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
069    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
070    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
071    private boolean useKeepAlive = true;
072    private boolean keepAliveResponseRequired;
073    private WireFormat wireFormat;
074
075    private final Runnable readChecker = new Runnable() {
076        long lastRunTime;
077        public void run() {
078            long now = System.currentTimeMillis();
079            long elapsed = (now-lastRunTime);
080
081            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
082                LOG.debug(""+elapsed+" ms elapsed since last read check.");
083            }
084
085            // Perhaps the timer executed a read check late.. and then executes
086            // the next read check on time which causes the time elapsed between
087            // read checks to be small..
088
089            // If less than 90% of the read check Time elapsed then abort this readcheck.
090            if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
091                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
092                return;
093            }
094
095            lastRunTime = now;
096            readCheck();
097        }
098    };
099
100    private boolean allowReadCheck(long elapsed) {
101        return elapsed > (readCheckTime * 9 / 10);
102    }
103
104    private final Runnable writeChecker = new Runnable() {
105        long lastRunTime;
106        public void run() {
107            long now = System.currentTimeMillis();
108            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
109                LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
110
111            }
112            lastRunTime = now;
113            writeCheck();
114        }
115    };
116
117    public InactivityMonitor(Transport next, WireFormat wireFormat) {
118        super(next);
119        this.wireFormat = wireFormat;
120        if (this.wireFormat == null) {
121            this.ignoreAllWireFormatInfo = true;
122        }
123    }
124
125    public void start() throws Exception {
126        next.start();
127        startMonitorThreads();
128    }
129
130    public void stop() throws Exception {
131        stopMonitorThreads();
132        next.stop();
133    }
134
135    final void writeCheck() {
136        if (inSend.get()) {
137            if (LOG.isTraceEnabled()) {
138                LOG.trace("A send is in progress");
139            }
140            return;
141        }
142
143        if (!commandSent.get() && useKeepAlive) {
144            if (LOG.isTraceEnabled()) {
145                LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
146            }
147            ASYNC_TASKS.execute(new Runnable() {
148                public void run() {
149                    if (monitorStarted.get()) {
150                        try {
151
152                            KeepAliveInfo info = new KeepAliveInfo();
153                            info.setResponseRequired(keepAliveResponseRequired);
154                            oneway(info);
155                        } catch (IOException e) {
156                            onException(e);
157                        }
158                    }
159                };
160            });
161        } else {
162            if (LOG.isTraceEnabled()) {
163                LOG.trace(this + " message sent since last write check, resetting flag");
164            }
165        }
166
167        commandSent.set(false);
168    }
169
170    final void readCheck() {
171        int currentCounter = next.getReceiveCounter();
172        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
173        if (inReceive.get() || currentCounter!=previousCounter ) {
174            if (LOG.isTraceEnabled()) {
175                LOG.trace("A receive is in progress");
176            }
177            return;
178        }
179        if (!commandReceived.get()) {
180            if (LOG.isDebugEnabled()) {
181                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
182            }
183            ASYNC_TASKS.execute(new Runnable() {
184                public void run() {
185                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
186                };
187
188            });
189        } else {
190            if (LOG.isTraceEnabled()) {
191                LOG.trace("Message received since last read check, resetting flag: ");
192            }
193        }
194        commandReceived.set(false);
195    }
196
197    public void onCommand(Object command) {
198        commandReceived.set(true);
199        inReceive.set(true);
200        try {
201            if (command.getClass() == KeepAliveInfo.class) {
202                KeepAliveInfo info = (KeepAliveInfo) command;
203                if (info.isResponseRequired()) {
204                    try {
205                        info.setResponseRequired(false);
206                        oneway(info);
207                    } catch (IOException e) {
208                        onException(e);
209                    }
210                }
211            } else {
212                if (command.getClass() == WireFormatInfo.class) {
213                    synchronized (this) {
214                        IOException error = null;
215                        remoteWireFormatInfo = (WireFormatInfo) command;
216                        try {
217                            startMonitorThreads();
218                        } catch (IOException e) {
219                            error = e;
220                        }
221                        if (error != null) {
222                            onException(error);
223                        }
224                    }
225                }
226                synchronized (readChecker) {
227                    transportListener.onCommand(command);
228                }
229            }
230        } finally {
231
232            inReceive.set(false);
233        }
234    }
235
236    public void oneway(Object o) throws IOException {
237        // Disable inactivity monitoring while processing a command.
238        //synchronize this method - its not synchronized
239        //further down the transport stack and gets called by more
240        //than one thread  by this class
241        synchronized(inSend) {
242            inSend.set(true);
243            try {
244
245                if( failed.get() ) {
246                    throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
247                }
248                if (o.getClass() == WireFormatInfo.class) {
249                    synchronized (this) {
250                        localWireFormatInfo = (WireFormatInfo)o;
251                        startMonitorThreads();
252                    }
253                }
254                next.oneway(o);
255            } finally {
256                commandSent.set(true);
257                inSend.set(false);
258            }
259        }
260    }
261
262    public void onException(IOException error) {
263        if (failed.compareAndSet(false, true)) {
264            stopMonitorThreads();
265            transportListener.onException(error);
266        }
267    }
268
269    public void setKeepAliveResponseRequired(boolean val) {
270        keepAliveResponseRequired = val;
271    }
272    
273    public void setUseKeepAlive(boolean val) {
274        useKeepAlive = val;
275    }
276
277    public void setIgnoreRemoteWireFormat(boolean val) {
278        ignoreRemoteWireFormat = val;
279    }
280
281    public long getReadCheckTime() {
282        return readCheckTime;
283    }
284
285    public void setReadCheckTime(long readCheckTime) {
286        this.readCheckTime = readCheckTime;
287    }
288
289    public long getInitialDelayTime() {
290        return initialDelayTime;
291    }
292
293    public void setInitialDelayTime(long initialDelayTime) {
294        this.initialDelayTime = initialDelayTime;
295    }
296    
297    private synchronized void startMonitorThreads() throws IOException {
298        if (monitorStarted.get()) {
299            return;
300        }
301
302        if (!configuredOk()) {
303            return;
304        }
305
306        if (readCheckTime > 0) {
307            monitorStarted.set(true);
308            writeCheckerTask = new SchedulerTimerTask(writeChecker);
309            readCheckerTask = new  SchedulerTimerTask(readChecker);
310            writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
311            synchronized( InactivityMonitor.class ) {
312                if( CHECKER_COUNTER == 0 ) {
313                    ASYNC_TASKS = createExecutor();
314                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
315                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
316                }
317                CHECKER_COUNTER++;
318                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
319                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
320            }
321        }
322    }
323
324    private boolean configuredOk() throws IOException {
325        boolean configured = false;
326        if (ignoreAllWireFormatInfo) {
327            configured = true;
328        } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
329            if (!ignoreRemoteWireFormat) {
330                if (LOG.isDebugEnabled()) {
331                    LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
332                }
333                readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
334                initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
335            } else {
336                if (LOG.isDebugEnabled()) {
337                    LOG.debug("Using local: " + localWireFormatInfo);
338                }
339                readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
340                initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
341            }
342            configured = true;
343        }
344        return configured;
345    }
346
347    /**
348     *
349     */
350    private synchronized void stopMonitorThreads() {
351        if (monitorStarted.compareAndSet(true, false)) {
352            readCheckerTask.cancel();
353            writeCheckerTask.cancel();
354            synchronized( InactivityMonitor.class ) {
355                WRITE_CHECK_TIMER.purge();
356                READ_CHECK_TIMER.purge();
357                CHECKER_COUNTER--;
358                if(CHECKER_COUNTER==0) {
359                    WRITE_CHECK_TIMER.cancel();
360                    READ_CHECK_TIMER.cancel();
361                    WRITE_CHECK_TIMER = null;
362                    READ_CHECK_TIMER = null;
363                    ASYNC_TASKS.shutdownNow();
364                    ASYNC_TASKS = null;
365                }
366            }
367        }
368    }
369
370    private ThreadFactory factory = new ThreadFactory() {
371        public Thread newThread(Runnable runnable) {
372            Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
373            thread.setDaemon(true);
374            return thread;
375        }
376    };
377
378    private ThreadPoolExecutor createExecutor() {
379        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
380    }
381}