001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq;
019
020import java.util.List;
021import javax.jms.JMSException;
022import org.apache.activemq.command.ConsumerId;
023import org.apache.activemq.command.MessageDispatch;
024import org.apache.activemq.thread.Task;
025import org.apache.activemq.thread.TaskRunner;
026import org.apache.activemq.util.JMSExceptionSupport;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A utility class used by the Session for dispatching messages asynchronously
032 * to consumers
033 * 
034 * 
035 * @see javax.jms.Session
036 */
037public class ActiveMQSessionExecutor implements Task {
038    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class);
039
040    private final ActiveMQSession session;
041    private final MessageDispatchChannel messageQueue;
042    private boolean dispatchedBySessionPool;
043    private volatile TaskRunner taskRunner;
044    private boolean startedOrWarnedThatNotStarted;
045
046    ActiveMQSessionExecutor(ActiveMQSession session) {
047        this.session = session;
048        if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
049           this.messageQueue = new SimplePriorityMessageDispatchChannel();
050        }else {
051            this.messageQueue = new FifoMessageDispatchChannel();
052        }
053    }
054
055    void setDispatchedBySessionPool(boolean value) {
056        dispatchedBySessionPool = value;
057        wakeup();
058    }
059
060    void execute(MessageDispatch message) throws InterruptedException {
061
062        if (!startedOrWarnedThatNotStarted) {
063
064            ActiveMQConnection connection = session.connection;
065            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
066            if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
067                startedOrWarnedThatNotStarted = true;
068            } else {
069                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
070
071                // lets only warn when a significant amount of time has passed
072                // just in case its normal operation
073                if (elapsedTime > aboutUnstartedConnectionTimeout) {
074                    LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
075                             + " Received: " + message);
076                    startedOrWarnedThatNotStarted = true;
077                }
078            }
079        }
080
081        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
082            dispatch(message);
083        } else {
084            messageQueue.enqueue(message);
085            wakeup();
086        }
087    }
088
089    public void wakeup() {
090        if (!dispatchedBySessionPool) {
091            if (session.isSessionAsyncDispatch()) {
092                try {
093                    TaskRunner taskRunner = this.taskRunner;
094                    if (taskRunner == null) {
095                        synchronized (this) {
096                            if (this.taskRunner == null) {
097                                if (!isRunning()) {
098                                    // stop has been called
099                                    return;
100                                }
101                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
102                                        "ActiveMQ Session: " + session.getSessionId());
103                            }
104                            taskRunner = this.taskRunner;
105                        }
106                    }
107                    taskRunner.wakeup();
108                } catch (InterruptedException e) {
109                    Thread.currentThread().interrupt();
110                }
111            } else {
112                while (iterate()) {
113                }
114            }
115        }
116    }
117
118    void executeFirst(MessageDispatch message) {
119        messageQueue.enqueueFirst(message);
120        wakeup();
121    }
122
123    public boolean hasUncomsumedMessages() {
124        return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
125    }
126
127    void dispatch(MessageDispatch message) {
128
129        // TODO - we should use a Map for this indexed by consumerId
130
131        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
132            ConsumerId consumerId = message.getConsumerId();
133            if (consumerId.equals(consumer.getConsumerId())) {
134                consumer.dispatch(message);
135                break;
136            }
137        }
138    }
139
140    synchronized void start() {
141        if (!messageQueue.isRunning()) {
142            messageQueue.start();
143            if (hasUncomsumedMessages()) {
144                wakeup();
145            }
146        }
147    }
148
149    void stop() throws JMSException {
150        try {
151            if (messageQueue.isRunning()) {
152                synchronized(this) {
153                    messageQueue.stop();
154                    if (this.taskRunner != null) {
155                        this.taskRunner.shutdown();
156                        this.taskRunner = null;
157                    }
158                }
159            }
160        } catch (InterruptedException e) {
161            Thread.currentThread().interrupt();
162            throw JMSExceptionSupport.create(e);
163        }
164    }
165
166    boolean isRunning() {
167        return messageQueue.isRunning();
168    }
169
170    void close() {
171        messageQueue.close();
172    }
173
174    void clear() {
175        messageQueue.clear();
176    }
177
178    MessageDispatch dequeueNoWait() {
179        return messageQueue.dequeueNoWait();
180    }
181
182    protected void clearMessagesInProgress() {
183        messageQueue.clear();
184    }
185
186    public boolean isEmpty() {
187        return messageQueue.isEmpty();
188    }
189
190    public boolean iterate() {
191
192        // Deliver any messages queued on the consumer to their listeners.
193        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
194            if (consumer.iterate()) {
195                return true;
196            }
197        }
198
199        // No messages left queued on the listeners.. so now dispatch messages
200        // queued on the session
201        MessageDispatch message = messageQueue.dequeueNoWait();
202        if (message == null) {
203            return false;
204        } else {
205            dispatch(message);
206            return !messageQueue.isEmpty();
207        }
208    }
209
210    List getUnconsumedMessages() {
211        return messageQueue.removeAll();
212    }
213
214}