001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import org.apache.activemq.broker.region.Destination;
020import org.apache.activemq.broker.region.Region;
021import org.apache.activemq.command.Message;
022import org.apache.activemq.state.ProducerState;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026/**
027 * Holds internal state in the broker for a MessageProducer
028 * 
029 * 
030 */
031public class ProducerBrokerExchange {
032
033    private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
034    private ConnectionContext connectionContext;
035    private Destination regionDestination;
036    private Region region;
037    private ProducerState producerState;
038    private boolean mutable = true;
039    private long lastSendSequenceNumber = -1;
040    
041    public ProducerBrokerExchange() {
042    }
043
044    public ProducerBrokerExchange copy() {
045        ProducerBrokerExchange rc = new ProducerBrokerExchange();
046        rc.connectionContext = connectionContext.copy();
047        rc.regionDestination = regionDestination;
048        rc.region = region;
049        rc.producerState = producerState;
050        rc.mutable = mutable;
051        return rc;
052    }
053
054    
055    /**
056     * @return the connectionContext
057     */
058    public ConnectionContext getConnectionContext() {
059        return this.connectionContext;
060    }
061
062    /**
063     * @param connectionContext the connectionContext to set
064     */
065    public void setConnectionContext(ConnectionContext connectionContext) {
066        this.connectionContext = connectionContext;
067    }
068
069    /**
070     * @return the mutable
071     */
072    public boolean isMutable() {
073        return this.mutable;
074    }
075
076    /**
077     * @param mutable the mutable to set
078     */
079    public void setMutable(boolean mutable) {
080        this.mutable = mutable;
081    }
082
083    /**
084     * @return the regionDestination
085     */
086    public Destination getRegionDestination() {
087        return this.regionDestination;
088    }
089
090    /**
091     * @param regionDestination the regionDestination to set
092     */
093    public void setRegionDestination(Destination regionDestination) {
094        this.regionDestination = regionDestination;
095    }
096
097    /**
098     * @return the region
099     */
100    public Region getRegion() {
101        return this.region;
102    }
103
104    /**
105     * @param region the region to set
106     */
107    public void setRegion(Region region) {
108        this.region = region;
109    }
110
111    /**
112     * @return the producerState
113     */
114    public ProducerState getProducerState() {
115        return this.producerState;
116    }
117
118    /**
119     * @param producerState the producerState to set
120     */
121    public void setProducerState(ProducerState producerState) {
122        this.producerState = producerState;
123    }
124
125    /**
126     * Enforce duplicate suppression using info from persistence adapter
127     * @param messageSend
128     * @return false if message should be ignored as a duplicate
129     */
130    public boolean canDispatch(Message messageSend) {
131        boolean canDispatch = true;
132        if (lastSendSequenceNumber > 0) {
133            if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
134                canDispatch = false;
135                LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" 
136                        + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: "  + lastSendSequenceNumber);
137            }
138        }
139        return canDispatch;
140    }
141
142    public void setLastStoredSequenceId(long l) {
143        lastSendSequenceNumber = l;
144        LOG.debug("last stored sequence id set: " + l);
145    }
146}