001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import org.apache.activemq.broker.region.Destination; 020import org.apache.activemq.broker.region.Region; 021import org.apache.activemq.command.Message; 022import org.apache.activemq.state.ProducerState; 023import org.slf4j.Logger; 024import org.slf4j.LoggerFactory; 025 026/** 027 * Holds internal state in the broker for a MessageProducer 028 * 029 * 030 */ 031public class ProducerBrokerExchange { 032 033 private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class); 034 private ConnectionContext connectionContext; 035 private Destination regionDestination; 036 private Region region; 037 private ProducerState producerState; 038 private boolean mutable = true; 039 private long lastSendSequenceNumber = -1; 040 041 public ProducerBrokerExchange() { 042 } 043 044 public ProducerBrokerExchange copy() { 045 ProducerBrokerExchange rc = new ProducerBrokerExchange(); 046 rc.connectionContext = connectionContext.copy(); 047 rc.regionDestination = regionDestination; 048 rc.region = region; 049 rc.producerState = producerState; 050 rc.mutable = mutable; 051 return rc; 052 } 053 054 055 /** 056 * @return the connectionContext 057 */ 058 public ConnectionContext getConnectionContext() { 059 return this.connectionContext; 060 } 061 062 /** 063 * @param connectionContext the connectionContext to set 064 */ 065 public void setConnectionContext(ConnectionContext connectionContext) { 066 this.connectionContext = connectionContext; 067 } 068 069 /** 070 * @return the mutable 071 */ 072 public boolean isMutable() { 073 return this.mutable; 074 } 075 076 /** 077 * @param mutable the mutable to set 078 */ 079 public void setMutable(boolean mutable) { 080 this.mutable = mutable; 081 } 082 083 /** 084 * @return the regionDestination 085 */ 086 public Destination getRegionDestination() { 087 return this.regionDestination; 088 } 089 090 /** 091 * @param regionDestination the regionDestination to set 092 */ 093 public void setRegionDestination(Destination regionDestination) { 094 this.regionDestination = regionDestination; 095 } 096 097 /** 098 * @return the region 099 */ 100 public Region getRegion() { 101 return this.region; 102 } 103 104 /** 105 * @param region the region to set 106 */ 107 public void setRegion(Region region) { 108 this.region = region; 109 } 110 111 /** 112 * @return the producerState 113 */ 114 public ProducerState getProducerState() { 115 return this.producerState; 116 } 117 118 /** 119 * @param producerState the producerState to set 120 */ 121 public void setProducerState(ProducerState producerState) { 122 this.producerState = producerState; 123 } 124 125 /** 126 * Enforce duplicate suppression using info from persistence adapter 127 * @param messageSend 128 * @return false if message should be ignored as a duplicate 129 */ 130 public boolean canDispatch(Message messageSend) { 131 boolean canDispatch = true; 132 if (lastSendSequenceNumber > 0) { 133 if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) { 134 canDispatch = false; 135 LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" 136 + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber); 137 } 138 } 139 return canDispatch; 140 } 141 142 public void setLastStoredSequenceId(long l) { 143 lastSendSequenceNumber = l; 144 LOG.debug("last stored sequence id set: " + l); 145 } 146}