001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.vm; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.util.concurrent.LinkedBlockingQueue; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.concurrent.atomic.AtomicLong; 025import org.apache.activemq.command.ShutdownInfo; 026import org.apache.activemq.thread.DefaultThreadPools; 027import org.apache.activemq.thread.Task; 028import org.apache.activemq.thread.TaskRunner; 029import org.apache.activemq.thread.TaskRunnerFactory; 030import org.apache.activemq.thread.Valve; 031import org.apache.activemq.transport.FutureResponse; 032import org.apache.activemq.transport.ResponseCallback; 033import org.apache.activemq.transport.Transport; 034import org.apache.activemq.transport.TransportDisposedIOException; 035import org.apache.activemq.transport.TransportListener; 036import org.apache.activemq.util.IOExceptionSupport; 037 038 039/** 040 * A Transport implementation that uses direct method invocations. 041 * 042 * 043 */ 044public class VMTransport implements Transport, Task { 045 046 private static final Object DISCONNECT = new Object(); 047 private static final AtomicLong NEXT_ID = new AtomicLong(0); 048 protected VMTransport peer; 049 protected TransportListener transportListener; 050 protected boolean disposed; 051 protected boolean marshal; 052 protected boolean network; 053 protected boolean async = true; 054 protected int asyncQueueDepth = 2000; 055 protected LinkedBlockingQueue<Object> messageQueue; 056 protected boolean started; 057 protected final URI location; 058 protected final long id; 059 private TaskRunner taskRunner; 060 private final Object lazyInitMutext = new Object(); 061 private final Valve enqueueValve = new Valve(true); 062 protected final AtomicBoolean stopping = new AtomicBoolean(); 063 private volatile int receiveCounter; 064 065 public VMTransport(URI location) { 066 this.location = location; 067 this.id = NEXT_ID.getAndIncrement(); 068 } 069 070 public void setPeer(VMTransport peer) { 071 this.peer = peer; 072 } 073 074 public void oneway(Object command) throws IOException { 075 if (disposed) { 076 throw new TransportDisposedIOException("Transport disposed."); 077 } 078 if (peer == null) { 079 throw new IOException("Peer not connected."); 080 } 081 082 083 TransportListener transportListener=null; 084 try { 085 // Disable the peer from changing his state while we try to enqueue onto him. 086 peer.enqueueValve.increment(); 087 088 if (peer.disposed || peer.stopping.get()) { 089 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); 090 } 091 092 if (peer.started) { 093 if (peer.async) { 094 peer.getMessageQueue().put(command); 095 peer.wakeup(); 096 } else { 097 transportListener = peer.transportListener; 098 } 099 } else { 100 peer.getMessageQueue().put(command); 101 } 102 103 } catch (InterruptedException e) { 104 InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); 105 iioe.initCause(e); 106 throw iioe; 107 } finally { 108 // Allow the peer to change state again... 109 peer.enqueueValve.decrement(); 110 } 111 112 dispatch(peer, transportListener, command); 113 } 114 115 public void dispatch(VMTransport transport, TransportListener transportListener, Object command) { 116 if( transportListener!=null ) { 117 if( command == DISCONNECT ) { 118 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); 119 } else { 120 transport.receiveCounter++; 121 transportListener.onCommand(command); 122 } 123 } 124 } 125 126 public void start() throws Exception { 127 if (transportListener == null) { 128 throw new IOException("TransportListener not set."); 129 } 130 try { 131 enqueueValve.turnOff(); 132 if (messageQueue != null && !async) { 133 Object command; 134 while ((command = messageQueue.poll()) != null && !stopping.get() ) { 135 receiveCounter++; 136 dispatch(this, transportListener, command); 137 } 138 } 139 started = true; 140 wakeup(); 141 } finally { 142 enqueueValve.turnOn(); 143 } 144 // If we get stopped while starting up, then do the actual stop now 145 // that the enqueueValve is back on. 146 if( stopping.get() ) { 147 stop(); 148 } 149 } 150 151 public void stop() throws Exception { 152 stopping.set(true); 153 154 // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. 155 if( enqueueValve.isOn() ) { 156 157 // let the peer know that we are disconnecting.. 158 try { 159 peer.transportListener.onCommand(new ShutdownInfo()); 160 } catch (Exception ignore) { 161 } 162 163 164 TaskRunner tr = null; 165 try { 166 enqueueValve.turnOff(); 167 if (!disposed) { 168 started = false; 169 disposed = true; 170 if (taskRunner != null) { 171 tr = taskRunner; 172 taskRunner = null; 173 } 174 } 175 } finally { 176 stopping.set(false); 177 enqueueValve.turnOn(); 178 } 179 if (tr != null) { 180 tr.shutdown(1000); 181 } 182 183 184 } 185 186 } 187 188 /** 189 * @see org.apache.activemq.thread.Task#iterate() 190 */ 191 public boolean iterate() { 192 193 final TransportListener tl; 194 try { 195 // Disable changing the state variables while we are running... 196 enqueueValve.increment(); 197 tl = transportListener; 198 if (!started || disposed || tl == null || stopping.get()) { 199 if( stopping.get() ) { 200 // drain the queue it since folks could be blocked putting on to 201 // it and that would not allow the stop() method for finishing up. 202 getMessageQueue().clear(); 203 } 204 return false; 205 } 206 } catch (InterruptedException e) { 207 return false; 208 } finally { 209 enqueueValve.decrement(); 210 } 211 212 LinkedBlockingQueue<Object> mq = getMessageQueue(); 213 Object command = mq.poll(); 214 if (command != null) { 215 if( command == DISCONNECT ) { 216 tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); 217 } else { 218 tl.onCommand(command); 219 } 220 return !mq.isEmpty(); 221 } else { 222 return false; 223 } 224 225 } 226 227 public void setTransportListener(TransportListener commandListener) { 228 try { 229 try { 230 enqueueValve.turnOff(); 231 this.transportListener = commandListener; 232 wakeup(); 233 } finally { 234 enqueueValve.turnOn(); 235 } 236 } catch (InterruptedException e) { 237 throw new RuntimeException(e); 238 } 239 } 240 241 private LinkedBlockingQueue<Object> getMessageQueue() { 242 synchronized (lazyInitMutext) { 243 if (messageQueue == null) { 244 messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); 245 } 246 return messageQueue; 247 } 248 } 249 250 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 251 throw new AssertionError("Unsupported Method"); 252 } 253 254 public Object request(Object command) throws IOException { 255 throw new AssertionError("Unsupported Method"); 256 } 257 258 public Object request(Object command, int timeout) throws IOException { 259 throw new AssertionError("Unsupported Method"); 260 } 261 262 public TransportListener getTransportListener() { 263 return transportListener; 264 } 265 266 public <T> T narrow(Class<T> target) { 267 if (target.isAssignableFrom(getClass())) { 268 return target.cast(this); 269 } 270 return null; 271 } 272 273 public boolean isMarshal() { 274 return marshal; 275 } 276 277 public void setMarshal(boolean marshal) { 278 this.marshal = marshal; 279 } 280 281 public boolean isNetwork() { 282 return network; 283 } 284 285 public void setNetwork(boolean network) { 286 this.network = network; 287 } 288 289 @Override 290 public String toString() { 291 return location + "#" + id; 292 } 293 294 public String getRemoteAddress() { 295 if (peer != null) { 296 return peer.toString(); 297 } 298 return null; 299 } 300 301 /** 302 * @return the async 303 */ 304 public boolean isAsync() { 305 return async; 306 } 307 308 /** 309 * @param async the async to set 310 */ 311 public void setAsync(boolean async) { 312 this.async = async; 313 } 314 315 /** 316 * @return the asyncQueueDepth 317 */ 318 public int getAsyncQueueDepth() { 319 return asyncQueueDepth; 320 } 321 322 /** 323 * @param asyncQueueDepth the asyncQueueDepth to set 324 */ 325 public void setAsyncQueueDepth(int asyncQueueDepth) { 326 this.asyncQueueDepth = asyncQueueDepth; 327 } 328 329 protected void wakeup() { 330 if (async) { 331 synchronized (lazyInitMutext) { 332 if (taskRunner == null) { 333 taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); 334 } 335 } 336 try { 337 taskRunner.wakeup(); 338 } catch (InterruptedException e) { 339 Thread.currentThread().interrupt(); 340 } 341 } 342 } 343 344 public boolean isFaultTolerant() { 345 return false; 346 } 347 348 public boolean isDisposed() { 349 return disposed; 350 } 351 352 public boolean isConnected() { 353 return started; 354 } 355 356 public void reconnect(URI uri) throws IOException { 357 throw new IOException("Not supported"); 358 } 359 360 public boolean isReconnectSupported() { 361 return false; 362 } 363 364 public boolean isUpdateURIsSupported() { 365 return false; 366 } 367 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 368 throw new IOException("Not supported"); 369 } 370 371 public int getReceiveCounter() { 372 return receiveCounter; 373 } 374}