001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.util.concurrent.LinkedBlockingQueue;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicLong;
025import org.apache.activemq.command.ShutdownInfo;
026import org.apache.activemq.thread.DefaultThreadPools;
027import org.apache.activemq.thread.Task;
028import org.apache.activemq.thread.TaskRunner;
029import org.apache.activemq.thread.TaskRunnerFactory;
030import org.apache.activemq.thread.Valve;
031import org.apache.activemq.transport.FutureResponse;
032import org.apache.activemq.transport.ResponseCallback;
033import org.apache.activemq.transport.Transport;
034import org.apache.activemq.transport.TransportDisposedIOException;
035import org.apache.activemq.transport.TransportListener;
036import org.apache.activemq.util.IOExceptionSupport;
037
038
039/**
040 * A Transport implementation that uses direct method invocations.
041 * 
042 * 
043 */
044public class VMTransport implements Transport, Task {
045
046    private static final Object DISCONNECT = new Object();
047    private static final AtomicLong NEXT_ID = new AtomicLong(0);
048    protected VMTransport peer;
049    protected TransportListener transportListener;
050    protected boolean disposed;
051    protected boolean marshal;
052    protected boolean network;
053    protected boolean async = true;
054    protected int asyncQueueDepth = 2000;
055    protected LinkedBlockingQueue<Object> messageQueue;
056    protected boolean started;
057    protected final URI location;
058    protected final long id;
059    private TaskRunner taskRunner;
060    private final Object lazyInitMutext = new Object();
061    private final Valve enqueueValve = new Valve(true);
062    protected final AtomicBoolean stopping = new AtomicBoolean();
063    private volatile int receiveCounter;
064    
065    public VMTransport(URI location) {
066        this.location = location;
067        this.id = NEXT_ID.getAndIncrement();
068    }
069
070    public void setPeer(VMTransport peer) {
071        this.peer = peer;
072    }
073
074    public void oneway(Object command) throws IOException {
075        if (disposed) {
076            throw new TransportDisposedIOException("Transport disposed.");
077        }
078        if (peer == null) {
079            throw new IOException("Peer not connected.");
080        }
081
082        
083        TransportListener transportListener=null;
084        try {
085            // Disable the peer from changing his state while we try to enqueue onto him.
086            peer.enqueueValve.increment();
087        
088            if (peer.disposed || peer.stopping.get()) {
089                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
090            }
091            
092            if (peer.started) {
093                if (peer.async) {
094                    peer.getMessageQueue().put(command);
095                    peer.wakeup();
096                } else {
097                    transportListener = peer.transportListener;
098                }
099            } else {
100                peer.getMessageQueue().put(command);
101            }
102            
103        } catch (InterruptedException e) {
104            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
105            iioe.initCause(e);
106            throw iioe;
107        } finally {
108            // Allow the peer to change state again...
109            peer.enqueueValve.decrement();
110        }
111
112        dispatch(peer, transportListener, command);
113    }
114    
115    public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
116        if( transportListener!=null ) {
117            if( command == DISCONNECT ) {
118                transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
119            } else {
120                transport.receiveCounter++;
121                transportListener.onCommand(command);
122            }
123        }
124    }
125
126    public void start() throws Exception {
127        if (transportListener == null) {
128            throw new IOException("TransportListener not set.");
129        }
130        try {
131            enqueueValve.turnOff();
132            if (messageQueue != null && !async) {
133                Object command;
134                while ((command = messageQueue.poll()) != null && !stopping.get() ) {
135                    receiveCounter++;
136                    dispatch(this, transportListener, command);
137                }
138            }
139            started = true;
140            wakeup();
141        } finally {
142            enqueueValve.turnOn();
143        }
144        // If we get stopped while starting up, then do the actual stop now 
145        // that the enqueueValve is back on.
146        if( stopping.get() ) {
147            stop();
148        }
149    }
150
151    public void stop() throws Exception {
152        stopping.set(true);
153        
154        // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
155        if( enqueueValve.isOn() ) {
156                
157            // let the peer know that we are disconnecting..
158            try {
159                peer.transportListener.onCommand(new ShutdownInfo());
160            } catch (Exception ignore) {
161            }
162                
163                
164            TaskRunner tr = null;
165            try {
166                enqueueValve.turnOff();
167                if (!disposed) {
168                    started = false;
169                    disposed = true;
170                    if (taskRunner != null) {
171                        tr = taskRunner;
172                        taskRunner = null;
173                    }
174                }
175            } finally {
176                stopping.set(false);
177                enqueueValve.turnOn();
178            }
179            if (tr != null) {
180                tr.shutdown(1000);
181            }
182            
183
184        }
185        
186    }
187    
188    /**
189     * @see org.apache.activemq.thread.Task#iterate()
190     */
191    public boolean iterate() {
192        
193        final TransportListener tl;
194        try {
195            // Disable changing the state variables while we are running... 
196            enqueueValve.increment();
197            tl = transportListener;
198            if (!started || disposed || tl == null || stopping.get()) {
199                if( stopping.get() ) {
200                    // drain the queue it since folks could be blocked putting on to
201                    // it and that would not allow the stop() method for finishing up.
202                    getMessageQueue().clear();  
203                }
204                return false;
205            }
206        } catch (InterruptedException e) {
207            return false;
208        } finally {
209            enqueueValve.decrement();
210        }
211
212        LinkedBlockingQueue<Object> mq = getMessageQueue();
213        Object command = mq.poll();
214        if (command != null) {
215            if( command == DISCONNECT ) {
216                tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
217            } else {
218                tl.onCommand(command);
219            }
220            return !mq.isEmpty();
221        } else {
222            return false;
223        }
224        
225    }
226
227    public void setTransportListener(TransportListener commandListener) {
228        try {
229            try {
230                enqueueValve.turnOff();
231                this.transportListener = commandListener;
232                wakeup();
233            } finally {
234                enqueueValve.turnOn();
235            }
236        } catch (InterruptedException e) {
237            throw new RuntimeException(e);
238        }
239    }
240
241    private LinkedBlockingQueue<Object> getMessageQueue() {
242        synchronized (lazyInitMutext) {
243            if (messageQueue == null) {
244                messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
245            }
246            return messageQueue;
247        }
248    }
249
250    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
251        throw new AssertionError("Unsupported Method");
252    }
253
254    public Object request(Object command) throws IOException {
255        throw new AssertionError("Unsupported Method");
256    }
257
258    public Object request(Object command, int timeout) throws IOException {
259        throw new AssertionError("Unsupported Method");
260    }
261
262    public TransportListener getTransportListener() {
263        return transportListener;
264    }
265
266    public <T> T narrow(Class<T> target) {
267        if (target.isAssignableFrom(getClass())) {
268            return target.cast(this);
269        }
270        return null;
271    }
272
273    public boolean isMarshal() {
274        return marshal;
275    }
276
277    public void setMarshal(boolean marshal) {
278        this.marshal = marshal;
279    }
280
281    public boolean isNetwork() {
282        return network;
283    }
284
285    public void setNetwork(boolean network) {
286        this.network = network;
287    }
288
289    @Override
290    public String toString() {
291        return location + "#" + id;
292    }
293
294    public String getRemoteAddress() {
295        if (peer != null) {
296            return peer.toString();
297        }
298        return null;
299    }
300
301    /**
302     * @return the async
303     */
304    public boolean isAsync() {
305        return async;
306    }
307
308    /**
309     * @param async the async to set
310     */
311    public void setAsync(boolean async) {
312        this.async = async;
313    }
314
315    /**
316     * @return the asyncQueueDepth
317     */
318    public int getAsyncQueueDepth() {
319        return asyncQueueDepth;
320    }
321
322    /**
323     * @param asyncQueueDepth the asyncQueueDepth to set
324     */
325    public void setAsyncQueueDepth(int asyncQueueDepth) {
326        this.asyncQueueDepth = asyncQueueDepth;
327    }
328
329    protected void wakeup() {
330        if (async) {
331            synchronized (lazyInitMutext) {
332                if (taskRunner == null) {
333                    taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
334                }
335            }
336            try {
337                taskRunner.wakeup();
338            } catch (InterruptedException e) {
339                Thread.currentThread().interrupt();
340            }
341        }
342    }
343
344    public boolean isFaultTolerant() {
345        return false;
346    }
347
348        public boolean isDisposed() {
349                return disposed;
350        }
351        
352        public boolean isConnected() {
353            return started;
354        }
355
356        public void reconnect(URI uri) throws IOException {
357        throw new IOException("Not supported");
358    }
359
360    public boolean isReconnectSupported() {
361        return false;
362    }
363
364    public boolean isUpdateURIsSupported() {
365        return false;
366    }
367    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
368        throw new IOException("Not supported");
369    }
370
371    public int getReceiveCounter() {
372        return receiveCounter;
373    }
374}