001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.atomic.AtomicLong;
022import javax.jms.Destination;
023import javax.jms.IllegalStateException;
024import javax.jms.InvalidDestinationException;
025import javax.jms.JMSException;
026import javax.jms.Message;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.ProducerAck;
029import org.apache.activemq.command.ProducerId;
030import org.apache.activemq.command.ProducerInfo;
031import org.apache.activemq.management.JMSProducerStatsImpl;
032import org.apache.activemq.management.StatsCapable;
033import org.apache.activemq.management.StatsImpl;
034import org.apache.activemq.usage.MemoryUsage;
035import org.apache.activemq.util.IntrospectionSupport;
036
037/**
038 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
039 * destination. A <CODE>MessageProducer</CODE> object is created by passing a
040 * <CODE>Destination</CODE> object to a message-producer creation method
041 * supplied by a session.
042 * <P>
043 * <CODE>MessageProducer</CODE> is the parent interface for all message
044 * producers.
045 * <P>
046 * A client also has the option of creating a message producer without supplying
047 * a destination. In this case, a destination must be provided with every send
048 * operation. A typical use for this kind of message producer is to send replies
049 * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
050 * <P>
051 * A client can specify a default delivery mode, priority, and time to live for
052 * messages sent by a message producer. It can also specify the delivery mode,
053 * priority, and time to live for an individual message.
054 * <P>
055 * A client can specify a time-to-live value in milliseconds for each message it
056 * sends. This value defines a message expiration time that is the sum of the
057 * message's time-to-live and the GMT when it is sent (for transacted sends,
058 * this is the time the client sends the message, not the time the transaction
059 * is committed).
060 * <P>
061 * A JMS provider should do its best to expire messages accurately; however, the
062 * JMS API does not define the accuracy provided.
063 * 
064 * 
065 * @see javax.jms.TopicPublisher
066 * @see javax.jms.QueueSender
067 * @see javax.jms.Session#createProducer
068 */
069public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
070
071    protected ProducerInfo info;
072    protected boolean closed;
073
074    private final JMSProducerStatsImpl stats;
075    private AtomicLong messageSequence;
076    private final long startTime;
077    private MessageTransformer transformer;
078    private MemoryUsage producerWindow;
079
080    protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
081        super(session);
082        this.info = new ProducerInfo(producerId);
083        this.info.setWindowSize(session.connection.getProducerWindowSize());
084        if (destination != null && destination.getOptions() != null) {
085            Map<String, String> options = new HashMap<String, String>(destination.getOptions());
086            IntrospectionSupport.setProperties(this.info, options, "producer.");
087        }
088        this.info.setDestination(destination);
089
090        // Enable producer window flow control if protocol > 3 and the window
091        // size > 0
092        if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
093            producerWindow = new MemoryUsage("Producer Window: " + producerId);
094            producerWindow.setExecutor(session.getConnectionExecutor());
095            producerWindow.setLimit(this.info.getWindowSize());
096            producerWindow.start();
097        }
098
099        this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
100        this.defaultPriority = Message.DEFAULT_PRIORITY;
101        this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
102        this.startTime = System.currentTimeMillis();
103        this.messageSequence = new AtomicLong(0);
104        this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
105        this.session.addProducer(this);
106        this.session.asyncSendPacket(info);
107        this.setSendTimeout(sendTimeout);
108        setTransformer(session.getTransformer());
109    }
110
111    public StatsImpl getStats() {
112        return stats;
113    }
114
115    public JMSProducerStatsImpl getProducerStats() {
116        return stats;
117    }
118
119    /**
120     * Gets the destination associated with this <CODE>MessageProducer</CODE>.
121     * 
122     * @return this producer's <CODE>Destination/ <CODE>
123     * @throws JMSException if the JMS provider fails to close the producer due to
124     *                      some internal error.
125     * @since 1.1
126     */
127    public Destination getDestination() throws JMSException {
128        checkClosed();
129        return this.info.getDestination();
130    }
131
132    /**
133     * Closes the message producer.
134     * <P>
135     * Since a provider may allocate some resources on behalf of a <CODE>
136     * MessageProducer</CODE>
137     * outside the Java virtual machine, clients should close them when they are
138     * not needed. Relying on garbage collection to eventually reclaim these
139     * resources may not be timely enough.
140     * 
141     * @throws JMSException if the JMS provider fails to close the producer due
142     *                 to some internal error.
143     */
144    public void close() throws JMSException {
145        if (!closed) {
146            dispose();
147            this.session.asyncSendPacket(info.createRemoveCommand());
148        }
149    }
150
151    public void dispose() {
152        if (!closed) {
153            this.session.removeProducer(this);
154            if (producerWindow != null) {
155                producerWindow.stop();
156            }
157            closed = true;
158        }
159    }
160
161    /**
162     * Check if the instance of this producer has been closed.
163     * 
164     * @throws IllegalStateException
165     */
166    @Override
167    protected void checkClosed() throws IllegalStateException {
168        if (closed) {
169            throw new IllegalStateException("The producer is closed");
170        }
171    }
172
173    /**
174     * Sends a message to a destination for an unidentified message producer,
175     * specifying delivery mode, priority and time to live.
176     * <P>
177     * Typically, a message producer is assigned a destination at creation time;
178     * however, the JMS API also supports unidentified message producers, which
179     * require that the destination be supplied every time a message is sent.
180     * 
181     * @param destination the destination to send this message to
182     * @param message the message to send
183     * @param deliveryMode the delivery mode to use
184     * @param priority the priority for this message
185     * @param timeToLive the message's lifetime (in milliseconds)
186     * @throws JMSException if the JMS provider fails to send the message due to
187     *                 some internal error.
188     * @throws UnsupportedOperationException if an invalid destination is
189     *                 specified.
190     * @throws InvalidDestinationException if a client uses this method with an
191     *                 invalid destination.
192     * @see javax.jms.Session#createProducer
193     * @since 1.1
194     */
195    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
196        checkClosed();
197        if (destination == null) {
198            if (info.getDestination() == null) {
199                throw new UnsupportedOperationException("A destination must be specified.");
200            }
201            throw new InvalidDestinationException("Don't understand null destinations");
202        }
203
204        ActiveMQDestination dest;
205        if (destination == info.getDestination()) {
206            dest = (ActiveMQDestination)destination;
207        } else if (info.getDestination() == null) {
208            dest = ActiveMQDestination.transform(destination);
209        } else {
210            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
211        }
212        if (dest == null) {
213            throw new JMSException("No destination specified");
214        }
215
216        if (transformer != null) {
217            Message transformedMessage = transformer.producerTransform(session, this, message);
218            if (transformedMessage != null) {
219                message = transformedMessage;
220            }
221        }
222
223        if (producerWindow != null) {
224            try {
225                producerWindow.waitForSpace();
226            } catch (InterruptedException e) {
227                throw new JMSException("Send aborted due to thread interrupt.");
228            }
229        }
230
231        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
232
233        stats.onMessage();
234    }
235
236    public MessageTransformer getTransformer() {
237        return transformer;
238    }
239
240    /**
241     * Sets the transformer used to transform messages before they are sent on
242     * to the JMS bus
243     */
244    public void setTransformer(MessageTransformer transformer) {
245        this.transformer = transformer;
246    }
247
248    /**
249     * @return the time in milli second when this object was created.
250     */
251    protected long getStartTime() {
252        return this.startTime;
253    }
254
255    /**
256     * @return Returns the messageSequence.
257     */
258    protected long getMessageSequence() {
259        return messageSequence.incrementAndGet();
260    }
261
262    /**
263     * @param messageSequence The messageSequence to set.
264     */
265    protected void setMessageSequence(AtomicLong messageSequence) {
266        this.messageSequence = messageSequence;
267    }
268
269    /**
270     * @return Returns the info.
271     */
272    protected ProducerInfo getProducerInfo() {
273        return this.info != null ? this.info : null;
274    }
275
276    /**
277     * @param info The info to set
278     */
279    protected void setProducerInfo(ProducerInfo info) {
280        this.info = info;
281    }
282
283    @Override
284    public String toString() {
285        return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
286    }
287
288    public void onProducerAck(ProducerAck pa) {
289        if (this.producerWindow != null) {
290            this.producerWindow.decreaseUsage(pa.getSize());
291        }
292    }
293
294}