001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport;
019
020import java.io.IOException;
021import java.net.Socket;
022import java.util.Iterator;
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.atomic.AtomicInteger;
025import java.util.concurrent.locks.Condition;
026import java.util.concurrent.locks.ReentrantLock;
027
028import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
029import org.apache.activemq.transport.tcp.TimeStampStream;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
/**
 * This filter implements write timeouts for socket write operations.
 * When using blocking IO, the Java implementation doesn't have an explicit flag
 * to set a write timeout, so a write operation can block forever (or until the
 * TCP stack implementation times out the retransmissions, which is usually
 * around 13-30 minutes).<br/>
 * To enable this filter, simply add the following to the transport URI:<br/>
 * <code>transport.soWriteTimeout=&lt;value in millis&gt;</code>.<br/>
 * For example (15 second timeout on write operations to the socket):<br/>
 * <pre><code>
 * &lt;transportConnector
 *     name=&quot;tcp1&quot;
 *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000&quot;
 * /&gt;
 * </code></pre><br/>
 * For example (enable default timeout on the socket):<br/>
 * <pre><code>
 * &lt;transportConnector
 *     name=&quot;tcp1&quot;
 *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000&quot;
 * /&gt;
 * </code></pre>
 * @author Filip Hanik
 *
 */
057public class WriteTimeoutFilter extends TransportFilter {
058
059    private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
060    protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
061    protected static AtomicInteger messageCounter = new AtomicInteger(0);
062    protected static TimeoutThread timeoutThread = new TimeoutThread(); 
063    
064    protected static long sleep = 5000l;
065
066    protected long writeTimeout = -1;
067    
068    public WriteTimeoutFilter(Transport next) {
069        super(next);
070    }
071
072    @Override
073    public void oneway(Object command) throws IOException {
074        try {
075            registerWrite(this);
076            super.oneway(command);
077        } catch (IOException x) {
078            throw x;
079        } finally {
080            deRegisterWrite(this,false,null);
081        }
082    }
083    
084    public long getWriteTimeout() {
085        return writeTimeout;
086    }
087
088    public void setWriteTimeout(long writeTimeout) {
089        this.writeTimeout = writeTimeout;
090    }
091    
092    public static long getSleep() {
093        return sleep;
094    }
095
096    public static void setSleep(long sleep) {
097        WriteTimeoutFilter.sleep = sleep;
098    }
099
100    
101    protected TimeStampStream getWriter() {
102        return next.narrow(TimeStampStream.class);
103    }
104    
105    protected Socket getSocket() {
106        return next.narrow(Socket.class);
107    }
108    
109    protected static void registerWrite(WriteTimeoutFilter filter) {
110        writers.add(filter);
111    }
112    
113    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
114        boolean result = writers.remove(filter); 
115        if (result) {
116            if (fail) {
117                String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
118                LOG.warn(message);
119                Socket sock = filter.getSocket();
120                if (sock==null) {
121                    LOG.error("Destination socket is null, unable to close socket.("+message+")");
122                } else {
123                    try {
124                        sock.close();
125                    }catch (IOException ignore) {
126                    }
127                }
128            }
129        }
130        return result;
131    }
132    
133    @Override
134    public void start() throws Exception {
135        super.start();
136    }
137    
138    @Override
139    public void stop() throws Exception {
140        super.stop();
141    }
142    
143    protected static class TimeoutThread extends Thread {
144        static AtomicInteger instance = new AtomicInteger(0);
145        boolean run = true;
146        public TimeoutThread() {
147            setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
148            setDaemon(true);
149            setPriority(Thread.MIN_PRIORITY);
150            start();
151        }
152
153        
154        public void run() {
155            while (run) {
156                boolean error = false;
157                try {
158                        if (!interrupted()) {
159                                Iterator<WriteTimeoutFilter> filters = writers.iterator();
160                            while (run && filters.hasNext()) { 
161                            WriteTimeoutFilter filter = filters.next();
162                            if (filter.getWriteTimeout()<=0) continue; //no timeout set
163                            long writeStart = filter.getWriter().getWriteTimestamp();
164                            long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
165                            if (delta>filter.getWriteTimeout()) {
166                                WriteTimeoutFilter.deRegisterWrite(filter, true,null);
167                            }//if timeout
168                        }//while
169                    }//if interrupted
170                    try {
171                        Thread.sleep(getSleep());
172                        error = false;
173                    } catch (InterruptedException x) {
174                        //do nothing
175                    }
176                }catch (Throwable t) { //make sure this thread never dies
177                    if (!error) { //use error flag to avoid filling up the logs
178                        LOG.error("WriteTimeout thread unable validate existing sockets.",t);
179                        error = true;
180                    }
181                }
182            }
183        }
184    }
185
186}