001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import org.apache.activemq.command.ConsumerId; 020import org.apache.activemq.command.Message; 021import org.apache.activemq.command.MessageId; 022 023/** 024 * Keeps track of a message that is flowing through the Broker. This object may 025 * hold a hard reference to the message or only hold the id of the message if 026 * the message has been persisted on in a MessageStore. 027 * 028 * 029 */ 030public class IndirectMessageReference implements QueueMessageReference { 031 032 /** The subscription that has locked the message */ 033 private LockOwner lockOwner; 034 /** Has the message been dropped? */ 035 private boolean dropped; 036 /** Has the message been acked? */ 037 private boolean acked; 038 /** Direct reference to the message */ 039 private final Message message; 040 041 /** 042 * @param message 043 */ 044 public IndirectMessageReference(final Message message) { 045 this.message = message; 046 message.getMessageId(); 047 message.getGroupID(); 048 message.getGroupSequence(); 049 } 050 051 public Message getMessageHardRef() { 052 return message; 053 } 054 055 public int getReferenceCount() { 056 return message.getReferenceCount(); 057 } 058 059 public int incrementReferenceCount() { 060 return message.incrementReferenceCount(); 061 } 062 063 public int decrementReferenceCount() { 064 return message.decrementReferenceCount(); 065 } 066 067 public Message getMessage() { 068 return message; 069 } 070 071 public String toString() { 072 return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null); 073 } 074 075 public void incrementRedeliveryCounter() { 076 message.incrementRedeliveryCounter(); 077 } 078 079 public synchronized boolean isDropped() { 080 return dropped; 081 } 082 083 public synchronized void drop() { 084 dropped = true; 085 lockOwner = null; 086 message.decrementReferenceCount(); 087 } 088 089 public boolean lock(LockOwner subscription) { 090 synchronized (this) { 091 if (dropped || lockOwner != null) { 092 return false; 093 } 094 lockOwner = subscription; 095 return true; 096 } 097 } 098 099 public synchronized boolean unlock() { 100 boolean result = lockOwner != null; 101 lockOwner = null; 102 return result; 103 } 104 105 public synchronized LockOwner getLockOwner() { 106 return lockOwner; 107 } 108 109 public int getRedeliveryCounter() { 110 return message.getRedeliveryCounter(); 111 } 112 113 public MessageId getMessageId() { 114 return message.getMessageId(); 115 } 116 117 public Destination getRegionDestination() { 118 return message.getRegionDestination(); 119 } 120 121 public boolean isPersistent() { 122 return message.isPersistent(); 123 } 124 125 public synchronized boolean isLocked() { 126 return lockOwner != null; 127 } 128 129 public synchronized boolean isAcked() { 130 return acked; 131 } 132 133 public synchronized void setAcked(boolean b) { 134 acked = b; 135 } 136 137 public String getGroupID() { 138 return message.getGroupID(); 139 } 140 141 public int getGroupSequence() { 142 return message.getGroupSequence(); 143 } 144 145 public ConsumerId getTargetConsumerId() { 146 return message.getTargetConsumerId(); 147 } 148 149 public long getExpiration() { 150 return message.getExpiration(); 151 } 152 153 public boolean isExpired() { 154 return message.isExpired(); 155 } 156 157 public synchronized int getSize() { 158 return message.getSize(); 159 } 160 161 public boolean isAdvisory() { 162 return message.isAdvisory(); 163 } 164}