/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.transport;

import java.io.IOException;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
import org.apache.activemq.transport.tcp.TimeStampStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This filter implements write timeouts for socket write operations.
 * When using blocking IO, the Java implementation doesn't have an explicit flag
 * to set a timeout, and can cause operations to block forever (or until the TCP
 * stack implementation times out the retransmissions, which is usually around
 * 13-30 minutes).
 * <p>
 * To enable this transport, in the transport URI, simply add
 * {@code transport.soWriteTimeout=<value in millis>}.
 * <p>
 * For example (15 second timeout on write operations to the socket):
 * <pre>
 * &lt;transportConnector
 *     name=&quot;tcp1&quot;
 *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000&quot;
 * /&gt;
 * </pre>
 * <p>
 * The write timeout of a filter defaults to {@code -1}; any value less than or
 * equal to zero means that no write timeout is enforced for that filter.
 * <p>
 * A single daemon thread, shared by all filters, wakes up every
 * {@link #getSleep()} milliseconds, inspects every filter that currently has a
 * write in progress, and closes the socket of any filter whose write has been
 * running for longer than its configured timeout.
 *
 * @author Filip Hanik
 */
public class WriteTimeoutFilter extends TransportFilter {

    private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);

    /** Filters that are currently inside {@link #oneway(Object)}; scanned by the timeout thread. */
    protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();

    /** Not used by this class; kept because it is visible to subclasses. */
    protected static AtomicInteger messageCounter = new AtomicInteger(0);

    /** The shared monitor thread; it starts itself from its constructor. */
    protected static TimeoutThread timeoutThread = new TimeoutThread();

    /** Pause, in milliseconds, between two scans of {@link #writers}. */
    protected static long sleep = 5000L;

    /** Write timeout in milliseconds; a value less than or equal to zero disables the check. */
    protected long writeTimeout = -1;

    /**
     * Creates a filter that wraps the given transport.
     *
     * @param next the transport that write operations are delegated to
     */
    public WriteTimeoutFilter(Transport next) {
        super(next);
    }

    /**
     * Sends the command through the wrapped transport while this filter is
     * registered as an active writer, so that the timeout thread can observe
     * the write for its whole duration.
     *
     * @param command the command to send
     * @throws IOException if the underlying transport fails to send the command
     */
    @Override
    public void oneway(Object command) throws IOException {
        registerWrite(this);
        try {
            super.oneway(command);
        } finally {
            // Always deregister, whether the write succeeded or failed.
            deRegisterWrite(this, false, null);
        }
    }

    /**
     * @return the write timeout in milliseconds; less than or equal to zero means disabled
     */
    public long getWriteTimeout() {
        return writeTimeout;
    }

    /**
     * @param writeTimeout the write timeout in milliseconds; less than or equal to zero disables the check
     */
    public void setWriteTimeout(long writeTimeout) {
        this.writeTimeout = writeTimeout;
    }

    /**
     * @return the pause, in milliseconds, between two scans performed by the timeout thread
     */
    public static long getSleep() {
        return sleep;
    }

    /**
     * @param sleep the pause, in milliseconds, between two scans performed by the timeout thread;
     *              negative values are treated as zero by the timeout thread
     */
    public static void setSleep(long sleep) {
        WriteTimeoutFilter.sleep = sleep;
    }

    /**
     * @return the {@link TimeStampStream} obtained by narrowing the wrapped transport;
     *         callers must be prepared for {@code null}
     */
    protected TimeStampStream getWriter() {
        return next.narrow(TimeStampStream.class);
    }

    /**
     * @return the {@link Socket} obtained by narrowing the wrapped transport;
     *         callers must be prepared for {@code null}
     */
    protected Socket getSocket() {
        return next.narrow(Socket.class);
    }

    /**
     * Adds the filter to the set of active writers.
     *
     * @param filter the filter that is about to start a write
     */
    protected static void registerWrite(WriteTimeoutFilter filter) {
        writers.add(filter);
    }

    /**
     * Removes the filter from the set of active writers and, when {@code fail}
     * is set and the filter was still registered, closes its socket to abort
     * the write in progress.
     *
     * @param filter the filter to deregister
     * @param fail   {@code true} to force-close the filter's socket (write timeout)
     * @param iox    not used by this implementation
     * @return {@code true} if the filter was registered and has been removed
     */
    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
        boolean removed = writers.remove(filter);
        if (removed && fail) {
            String message = "Forced write timeout for:" + filter.getNext().getRemoteAddress();
            LOG.warn(message);
            Socket sock = filter.getSocket();
            if (sock == null) {
                LOG.error("Destination socket is null, unable to close socket.(" + message + ")");
            } else {
                try {
                    sock.close();
                } catch (IOException e) {
                    // Best effort: the socket is being abandoned anyway, so only leave a trace.
                    LOG.debug("Failed to close socket after forced write timeout.", e);
                }
            }
        }
        return removed;
    }

    @Override
    public void start() throws Exception {
        super.start();
    }

    @Override
    public void stop() throws Exception {
        super.stop();
    }

    /**
     * Daemon thread that periodically scans {@link WriteTimeoutFilter#writers}
     * and force-closes the socket of every filter whose current write has
     * exceeded that filter's write timeout.
     */
    protected static class TimeoutThread extends Thread {
        static AtomicInteger instance = new AtomicInteger(0);

        /** Loop flag; volatile so that a change made by another thread is seen by the loop. */
        volatile boolean run = true;

        public TimeoutThread() {
            setName("WriteTimeoutFilter-Timeout-" + instance.incrementAndGet());
            setDaemon(true);
            setPriority(Thread.MIN_PRIORITY);
            start();
        }

        /**
         * Scans the active writers, then pauses, until {@link #run} is cleared.
         * Failures during a scan are logged once per streak of consecutive
         * failures, and the pause is taken after a failed scan as well, so a
         * persistent failure can neither flood the log nor spin the CPU.
         */
        @Override
        public void run() {
            // Declared outside the loop so it survives across iterations;
            // otherwise every failing pass would be logged again.
            boolean error = false;
            while (run) {
                try {
                    if (!interrupted()) {
                        checkWriters();
                    }
                    error = false;
                } catch (Throwable t) { //make sure this thread never dies
                    if (!error) { //use error flag to avoid filling up the logs
                        LOG.error("WriteTimeout thread unable validate existing sockets.", t);
                        error = true;
                    }
                }
                pause();
            }
        }

        /** Walks the active writers once and aborts every write that has exceeded its timeout. */
        private void checkWriters() {
            Iterator<WriteTimeoutFilter> filters = writers.iterator();
            while (run && filters.hasNext()) {
                WriteTimeoutFilter filter = filters.next();
                long timeout = filter.getWriteTimeout();
                if (timeout <= 0) {
                    continue; //no timeout set
                }
                TimeStampStream writer = filter.getWriter();
                if (writer == null) {
                    continue; //nothing to measure for this transport
                }
                long writeStart = writer.getWriteTimestamp();
                long delta = (writer.isWriting() && writeStart > 0)
                        ? System.currentTimeMillis() - writeStart
                        : -1;
                if (delta > timeout) {
                    WriteTimeoutFilter.deRegisterWrite(filter, true, null);
                }
            }
        }

        /** Sleeps for the configured interval; a negative interval is treated as zero. */
        private void pause() {
            try {
                Thread.sleep(Math.max(0L, getSleep()));
            } catch (InterruptedException ignored) {
                // An interrupt only cuts the pause short; the loop keeps running
                // until the run flag is cleared, so the flag is not restored here.
            }
        }
    }

}