001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.activemq.command.BrokerInfo;
025import org.apache.activemq.transport.MutexTransport;
026import org.apache.activemq.transport.ResponseCorrelator;
027import org.apache.activemq.transport.Transport;
028import org.apache.activemq.transport.TransportAcceptListener;
029import org.apache.activemq.transport.TransportServer;
030
031/**
032 * Broker side of the VMTransport
033 */
034public class VMTransportServer implements TransportServer {
035
036    private TransportAcceptListener acceptListener;
037    private final URI location;
038    private boolean disposed;
039
040    private final AtomicInteger connectionCount = new AtomicInteger(0);
041    private final boolean disposeOnDisconnect;
042
043    /**
044     * @param location
045     * @param disposeOnDisconnect
046     */
047    public VMTransportServer(URI location, boolean disposeOnDisconnect) {
048        this.location = location;
049        this.disposeOnDisconnect = disposeOnDisconnect;
050    }
051
052    /**
053     * @return a pretty print of this
054     */
055    public String toString() {
056        return "VMTransportServer(" + location + ")";
057    }
058
059    /**
060     * @return new VMTransport
061     * @throws IOException
062     */
063    public VMTransport connect() throws IOException {
064        TransportAcceptListener al;
065        synchronized (this) {
066            if (disposed) {
067                throw new IOException("Server has been disposed.");
068            }
069            al = acceptListener;
070        }
071        if (al == null) {
072            throw new IOException("Server TransportAcceptListener is null.");
073        }
074
075        connectionCount.incrementAndGet();
076        VMTransport client = new VMTransport(location) {
077            public void stop() throws Exception {
078                if (stopping.compareAndSet(false, true) && !disposed) {
079                                        super.stop();
080                                        if (connectionCount.decrementAndGet() == 0
081                                                        && disposeOnDisconnect) {
082                                                VMTransportServer.this.stop();
083                                        }
084                                }
085            };
086        };
087
088        VMTransport server = new VMTransport(location);
089        client.setPeer(server);
090        server.setPeer(client);
091        al.onAccept(configure(server));
092        return client;
093    }
094
095    /**
096     * Configure transport
097     * 
098     * @param transport
099     * @return the Transport
100     */
101    public static Transport configure(Transport transport) {
102        transport = new MutexTransport(transport);
103        transport = new ResponseCorrelator(transport);
104        return transport;
105    }
106
107    /**
108     * Set the Transport accept listener for new Connections
109     * 
110     * @param acceptListener
111     */
112    public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
113        this.acceptListener = acceptListener;
114    }
115
116    public void start() throws IOException {
117    }
118
119    public void stop() throws IOException {
120        VMTransportFactory.stopped(this);
121    }
122
123    public URI getConnectURI() {
124        return location;
125    }
126
127    public URI getBindURI() {
128        return location;
129    }
130
131    public void setBrokerInfo(BrokerInfo brokerInfo) {
132    }
133
134    public InetSocketAddress getSocketAddress() {
135        return null;
136    }
137    
138    public int getConnectionCount() {
139        return connectionCount.intValue();
140    }
141}