001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.util.Set;
020import java.util.concurrent.CopyOnWriteArraySet;
021import java.util.concurrent.atomic.AtomicBoolean;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.apache.activemq.command.ConsumerId;
025import org.apache.activemq.command.ConsumerInfo;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * Represents a network bridge interface
031 * 
032 * 
033 */
034public class DemandSubscription {
035    private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
036
037    private final ConsumerInfo remoteInfo;
038    private final ConsumerInfo localInfo;
039    private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
040
041    private AtomicInteger dispatched = new AtomicInteger(0);
042    private AtomicBoolean activeWaiter = new AtomicBoolean();
043
044    DemandSubscription(ConsumerInfo info) {
045        remoteInfo = info;
046        localInfo = info.copy();
047        localInfo.setNetworkSubscription(true);
048        remoteSubsIds.add(info.getConsumerId());
049    }
050
051    /**
052     * Increment the consumers associated with this subscription
053     * 
054     * @param id
055     * @return true if added
056     */
057    public boolean add(ConsumerId id) {
058        return remoteSubsIds.add(id);
059    }
060
061    /**
062     * Increment the consumers associated with this subscription
063     * 
064     * @param id
065     * @return true if removed
066     */
067    public boolean remove(ConsumerId id) {
068        return remoteSubsIds.remove(id);
069    }
070
071    /**
072     * @return true if there are no interested consumers
073     */
074    public boolean isEmpty() {
075        return remoteSubsIds.isEmpty();
076    }
077
078    /**
079     * @return Returns the localInfo.
080     */
081    public ConsumerInfo getLocalInfo() {
082        return localInfo;
083    }
084
085    /**
086     * @return Returns the remoteInfo.
087     */
088    public ConsumerInfo getRemoteInfo() {
089        return remoteInfo;
090    }
091
092    public void waitForCompletion() {
093        if (dispatched.get() > 0) {
094            if (LOG.isDebugEnabled()) {
095                LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
096            }
097            activeWaiter.set(true);
098            if (dispatched.get() > 0) {
099                synchronized (activeWaiter) {
100                    try {
101                        activeWaiter.wait();
102                    } catch (InterruptedException ignored) {
103                    }
104                }
105                if (this.dispatched.get() > 0) {
106                    LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried");
107                }
108            }
109        }
110    }
111
112    public void decrementOutstandingResponses() {
113        if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
114            synchronized (activeWaiter) {
115                activeWaiter.notifyAll();
116            }
117        }
118    }
119
120    public boolean incrementOutstandingResponses() {
121        dispatched.incrementAndGet();
122        if (activeWaiter.get()) {
123            decrementOutstandingResponses();
124            return false;
125        }
126        return true;
127    }
128}