001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.state;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedHashMap;
022import java.util.Map;
023import java.util.Vector;
024import java.util.Map.Entry;
025import java.util.concurrent.ConcurrentHashMap;
026
027import javax.jms.TransactionRolledBackException;
028import javax.transaction.xa.XAResource;
029
030import org.apache.activemq.command.Command;
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.ConnectionInfo;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerId;
035import org.apache.activemq.command.ConsumerInfo;
036import org.apache.activemq.command.DestinationInfo;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.IntegerResponse;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageId;
041import org.apache.activemq.command.MessagePull;
042import org.apache.activemq.command.ProducerId;
043import org.apache.activemq.command.ProducerInfo;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.command.SessionId;
046import org.apache.activemq.command.SessionInfo;
047import org.apache.activemq.command.TransactionInfo;
048import org.apache.activemq.transport.Transport;
049import org.apache.activemq.util.IOExceptionSupport;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Tracks the state of a connection so a newly established transport can be
055 * re-initialized to the state that was tracked.
056 * 
057 * 
058 */
059public class ConnectionStateTracker extends CommandVisitorAdapter {
060    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
061
062    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
063
064    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 
065
066    private boolean trackTransactions;
067    private boolean restoreSessions = true;
068    private boolean restoreConsumers = true;
069    private boolean restoreProducers = true;
070    private boolean restoreTransaction = true;
071    private boolean trackMessages = true;
072    private boolean trackTransactionProducers = true;
073    private int maxCacheSize = 128 * 1024;
074    private int currentCacheSize;
075    private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
076        protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
077            boolean result = currentCacheSize > maxCacheSize;
078            if (result) {
079                if (eldest.getValue() instanceof Message) {
080                    currentCacheSize -= ((Message)eldest.getValue()).getSize();
081                }
082            }
083            return result;
084        }
085    };
086    
087    private class RemoveTransactionAction implements ResponseHandler {
088        private final TransactionInfo info;
089
090        public RemoveTransactionAction(TransactionInfo info) {
091            this.info = info;
092        }
093
094        public void onResponse(Command response) {
095            ConnectionId connectionId = info.getConnectionId();
096            ConnectionState cs = connectionStates.get(connectionId);
097            cs.removeTransactionState(info.getTransactionId());
098        }
099    }
100    
101    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
102
103        public PrepareReadonlyTransactionAction(TransactionInfo info) {
104            super(info);
105        }
106
107        public void onResponse(Command command) {
108            IntegerResponse response = (IntegerResponse) command;
109            if (XAResource.XA_RDONLY == response.getResult()) {
110                // all done, no commit or rollback from TM
111                super.onResponse(command);
112            }
113        }
114    }
115
116    /**
117     * 
118     * 
119     * @param command
120     * @return null if the command is not state tracked.
121     * @throws IOException
122     */
123    public Tracked track(Command command) throws IOException {
124        try {
125            return (Tracked)command.visit(this);
126        } catch (IOException e) {
127            throw e;
128        } catch (Throwable e) {
129            throw IOExceptionSupport.create(e);
130        }
131    }
132    
133    public void trackBack(Command command) {
134        if (command != null) {
135            if (trackMessages && command.isMessage()) {
136                Message message = (Message) command;
137                if (message.getTransactionId()==null) {
138                    currentCacheSize = currentCacheSize +  message.getSize();
139                }
140            } else if (command instanceof MessagePull) {
141                // just needs to be a rough estimate of size, ~4 identifiers
142                currentCacheSize += 400;
143            }
144        }
145    }
146
147    public void restore(Transport transport) throws IOException {
148        // Restore the connections.
149        for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
150            ConnectionState connectionState = iter.next();
151            connectionState.getInfo().setFailoverReconnect(true);
152            if (LOG.isDebugEnabled()) {
153                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
154            }
155            transport.oneway(connectionState.getInfo());
156            restoreTempDestinations(transport, connectionState);
157
158            if (restoreSessions) {
159                restoreSessions(transport, connectionState);
160            }
161
162            if (restoreTransaction) {
163                restoreTransactions(transport, connectionState);
164            }
165        }
166        //now flush messages
167        for (Command msg:messageCache.values()) {
168            if (LOG.isDebugEnabled()) {
169                LOG.debug("command: " + msg.getCommandId());
170            }
171            transport.oneway(msg);
172        }
173    }
174
175    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
176        Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
177        for (TransactionState transactionState : connectionState.getTransactionStates()) {
178            if (LOG.isDebugEnabled()) {
179                LOG.debug("tx: " + transactionState.getId());
180            }
181            
182            // rollback any completed transactions - no way to know if commit got there
183            // or if reply went missing
184            //
185            if (!transactionState.getCommands().isEmpty()) {
186                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
187                if (lastCommand instanceof TransactionInfo) {
188                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
189                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
190                        if (LOG.isDebugEnabled()) {
191                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
192                        }
193                        toRollback.add(transactionInfo);
194                        continue;
195                    }
196                }
197            }
198            
199            // replay short lived producers that may have been involved in the transaction
200            for (ProducerState producerState : transactionState.getProducerStates().values()) {
201                if (LOG.isDebugEnabled()) {
202                    LOG.debug("tx replay producer :" + producerState.getInfo());
203                }
204                transport.oneway(producerState.getInfo());
205            }
206            
207            for (Command command : transactionState.getCommands()) {
208                if (LOG.isDebugEnabled()) {
209                    LOG.debug("tx replay: " + command);
210                }
211                transport.oneway(command);
212            }
213            
214            for (ProducerState producerState : transactionState.getProducerStates().values()) {
215                if (LOG.isDebugEnabled()) {
216                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
217                }
218                transport.oneway(producerState.getInfo().createRemoveCommand());
219            }
220        }
221        
222        for (TransactionInfo command: toRollback) {
223            // respond to the outstanding commit
224            ExceptionResponse response = new ExceptionResponse();
225            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
226            response.setCorrelationId(command.getCommandId());
227            transport.getTransportListener().onCommand(response);
228        }
229    }
230
231    /**
232     * @param transport
233     * @param connectionState
234     * @throws IOException
235     */
236    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
237        // Restore the connection's sessions
238        for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
239            SessionState sessionState = (SessionState)iter2.next();
240            if (LOG.isDebugEnabled()) {
241                LOG.debug("session: " + sessionState.getInfo().getSessionId());
242            }
243            transport.oneway(sessionState.getInfo());
244
245            if (restoreProducers) {
246                restoreProducers(transport, sessionState);
247            }
248
249            if (restoreConsumers) {
250                restoreConsumers(transport, sessionState);
251            }
252        }
253    }
254
255    /**
256     * @param transport
257     * @param sessionState
258     * @throws IOException
259     */
260    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
261        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
262        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
263        final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
264        for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
265            ConsumerInfo infoToSend = consumerState.getInfo();
266            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
267                infoToSend = consumerState.getInfo().copy();
268                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
269                infoToSend.setPrefetchSize(0);
270                if (LOG.isDebugEnabled()) {
271                    LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
272                }
273            }
274            if (LOG.isDebugEnabled()) {
275                LOG.debug("restore consumer: " + infoToSend.getConsumerId());
276            }
277            transport.oneway(infoToSend);
278        }
279    }
280
281    /**
282     * @param transport
283     * @param sessionState
284     * @throws IOException
285     */
286    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
287        // Restore the session's producers
288        for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
289            ProducerState producerState = (ProducerState)iter3.next();
290            if (LOG.isDebugEnabled()) {
291                LOG.debug("producer: " + producerState.getInfo().getProducerId());
292            }
293            transport.oneway(producerState.getInfo());
294        }
295    }
296
297    /**
298     * @param transport
299     * @param connectionState
300     * @throws IOException
301     */
302    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
303        throws IOException {
304        // Restore the connection's temp destinations.
305        for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
306            transport.oneway((DestinationInfo)iter2.next());
307        }
308    }
309
310    public Response processAddDestination(DestinationInfo info) {
311        if (info != null) {
312            ConnectionState cs = connectionStates.get(info.getConnectionId());
313            if (cs != null && info.getDestination().isTemporary()) {
314                cs.addTempDestination(info);
315            }
316        }
317        return TRACKED_RESPONSE_MARKER;
318    }
319
320    public Response processRemoveDestination(DestinationInfo info) {
321        if (info != null) {
322            ConnectionState cs = connectionStates.get(info.getConnectionId());
323            if (cs != null && info.getDestination().isTemporary()) {
324                cs.removeTempDestination(info.getDestination());
325            }
326        }
327        return TRACKED_RESPONSE_MARKER;
328    }
329
330    public Response processAddProducer(ProducerInfo info) {
331        if (info != null && info.getProducerId() != null) {
332            SessionId sessionId = info.getProducerId().getParentId();
333            if (sessionId != null) {
334                ConnectionId connectionId = sessionId.getParentId();
335                if (connectionId != null) {
336                    ConnectionState cs = connectionStates.get(connectionId);
337                    if (cs != null) {
338                        SessionState ss = cs.getSessionState(sessionId);
339                        if (ss != null) {
340                            ss.addProducer(info);
341                        }
342                    }
343                }
344            }
345        }
346        return TRACKED_RESPONSE_MARKER;
347    }
348
349    public Response processRemoveProducer(ProducerId id) {
350        if (id != null) {
351            SessionId sessionId = id.getParentId();
352            if (sessionId != null) {
353                ConnectionId connectionId = sessionId.getParentId();
354                if (connectionId != null) {
355                    ConnectionState cs = connectionStates.get(connectionId);
356                    if (cs != null) {
357                        SessionState ss = cs.getSessionState(sessionId);
358                        if (ss != null) {
359                            ss.removeProducer(id);
360                        }
361                    }
362                }
363            }
364        }
365        return TRACKED_RESPONSE_MARKER;
366    }
367
368    public Response processAddConsumer(ConsumerInfo info) {
369        if (info != null) {
370            SessionId sessionId = info.getConsumerId().getParentId();
371            if (sessionId != null) {
372                ConnectionId connectionId = sessionId.getParentId();
373                if (connectionId != null) {
374                    ConnectionState cs = connectionStates.get(connectionId);
375                    if (cs != null) {
376                        SessionState ss = cs.getSessionState(sessionId);
377                        if (ss != null) {
378                            ss.addConsumer(info);
379                        }
380                    }
381                }
382            }
383        }
384        return TRACKED_RESPONSE_MARKER;
385    }
386
387    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
388        if (id != null) {
389            SessionId sessionId = id.getParentId();
390            if (sessionId != null) {
391                ConnectionId connectionId = sessionId.getParentId();
392                if (connectionId != null) {
393                    ConnectionState cs = connectionStates.get(connectionId);
394                    if (cs != null) {
395                        SessionState ss = cs.getSessionState(sessionId);
396                        if (ss != null) {
397                            ss.removeConsumer(id);
398                        }
399                    }
400                }
401            }
402        }
403        return TRACKED_RESPONSE_MARKER;
404    }
405
406    public Response processAddSession(SessionInfo info) {
407        if (info != null) {
408            ConnectionId connectionId = info.getSessionId().getParentId();
409            if (connectionId != null) {
410                ConnectionState cs = connectionStates.get(connectionId);
411                if (cs != null) {
412                    cs.addSession(info);
413                }
414            }
415        }
416        return TRACKED_RESPONSE_MARKER;
417    }
418
419    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
420        if (id != null) {
421            ConnectionId connectionId = id.getParentId();
422            if (connectionId != null) {
423                ConnectionState cs = connectionStates.get(connectionId);
424                if (cs != null) {
425                    cs.removeSession(id);
426                }
427            }
428        }
429        return TRACKED_RESPONSE_MARKER;
430    }
431
432    public Response processAddConnection(ConnectionInfo info) {
433        if (info != null) {
434            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
435        }
436        return TRACKED_RESPONSE_MARKER;
437    }
438
439    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
440        if (id != null) {
441            connectionStates.remove(id);
442        }
443        return TRACKED_RESPONSE_MARKER;
444    }
445
446    public Response processMessage(Message send) throws Exception {
447        if (send != null) {
448            if (trackTransactions && send.getTransactionId() != null) {
449                ProducerId producerId = send.getProducerId();
450                ConnectionId connectionId = producerId.getParentId().getParentId();
451                if (connectionId != null) {
452                    ConnectionState cs = connectionStates.get(connectionId);
453                    if (cs != null) {
454                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
455                        if (transactionState != null) {
456                            transactionState.addCommand(send);
457                            
458                            if (trackTransactionProducers) {
459                                // for jmstemplate, track the producer in case it is closed before commit
460                                // and needs to be replayed
461                                SessionState ss = cs.getSessionState(producerId.getParentId());
462                                ProducerState producerState = ss.getProducerState(producerId);
463                                producerState.setTransactionState(transactionState);            
464                            }
465                        }
466                    }
467                }
468                return TRACKED_RESPONSE_MARKER;
469            }else if (trackMessages) {
470                messageCache.put(send.getMessageId(), send.copy());
471            }
472        }
473        return null;
474    }
475
476    public Response processBeginTransaction(TransactionInfo info) {
477        if (trackTransactions && info != null && info.getTransactionId() != null) {
478            ConnectionId connectionId = info.getConnectionId();
479            if (connectionId != null) {
480                ConnectionState cs = connectionStates.get(connectionId);
481                if (cs != null) {
482                    cs.addTransactionState(info.getTransactionId());
483                    TransactionState state = cs.getTransactionState(info.getTransactionId());
484                    state.addCommand(info);
485                }
486            }
487            return TRACKED_RESPONSE_MARKER;
488        }
489        return null;
490    }
491
492    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
493        if (trackTransactions && info != null) {
494            ConnectionId connectionId = info.getConnectionId();
495            if (connectionId != null) {
496                ConnectionState cs = connectionStates.get(connectionId);
497                if (cs != null) {
498                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
499                    if (transactionState != null) {
500                        transactionState.addCommand(info);
501                        return new Tracked(new PrepareReadonlyTransactionAction(info));
502                    }
503                }
504            }
505        }
506        return null;
507    }
508
509    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
510        if (trackTransactions && info != null) {
511            ConnectionId connectionId = info.getConnectionId();
512            if (connectionId != null) {
513                ConnectionState cs = connectionStates.get(connectionId);
514                if (cs != null) {
515                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
516                    if (transactionState != null) {
517                        transactionState.addCommand(info);
518                        return new Tracked(new RemoveTransactionAction(info));
519                    }
520                }
521            }
522        }
523        return null;
524    }
525
526    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
527        if (trackTransactions && info != null) {
528            ConnectionId connectionId = info.getConnectionId();
529            if (connectionId != null) {
530                ConnectionState cs = connectionStates.get(connectionId);
531                if (cs != null) {
532                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
533                    if (transactionState != null) {
534                        transactionState.addCommand(info);
535                        return new Tracked(new RemoveTransactionAction(info));
536                    }
537                }
538            }
539        }
540        return null;
541    }
542
543    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
544        if (trackTransactions && info != null) {
545            ConnectionId connectionId = info.getConnectionId();
546            if (connectionId != null) {
547                ConnectionState cs = connectionStates.get(connectionId);
548                if (cs != null) {
549                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
550                    if (transactionState != null) {
551                        transactionState.addCommand(info);
552                        return new Tracked(new RemoveTransactionAction(info));
553                    }
554                }
555            }
556        }
557        return null;
558    }
559
560    public Response processEndTransaction(TransactionInfo info) throws Exception {
561        if (trackTransactions && info != null) {
562            ConnectionId connectionId = info.getConnectionId();
563            if (connectionId != null) {
564                ConnectionState cs = connectionStates.get(connectionId);
565                if (cs != null) {
566                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
567                    if (transactionState != null) {
568                        transactionState.addCommand(info);
569                    }
570                }
571            }
572            return TRACKED_RESPONSE_MARKER;
573        }
574        return null;
575    }
576
577    @Override
578    public Response processMessagePull(MessagePull pull) throws Exception {
579        if (pull != null) {
580            // leave a single instance in the cache
581            final String id = pull.getDestination() + "::" + pull.getConsumerId();
582            messageCache.put(id.intern(), pull);
583        }
584        return null;
585    }
586
587    public boolean isRestoreConsumers() {
588        return restoreConsumers;
589    }
590
591    public void setRestoreConsumers(boolean restoreConsumers) {
592        this.restoreConsumers = restoreConsumers;
593    }
594
595    public boolean isRestoreProducers() {
596        return restoreProducers;
597    }
598
599    public void setRestoreProducers(boolean restoreProducers) {
600        this.restoreProducers = restoreProducers;
601    }
602
603    public boolean isRestoreSessions() {
604        return restoreSessions;
605    }
606
607    public void setRestoreSessions(boolean restoreSessions) {
608        this.restoreSessions = restoreSessions;
609    }
610
611    public boolean isTrackTransactions() {
612        return trackTransactions;
613    }
614
615    public void setTrackTransactions(boolean trackTransactions) {
616        this.trackTransactions = trackTransactions;
617    }
618    
619    public boolean isTrackTransactionProducers() {
620        return this.trackTransactionProducers;
621    }
622
623    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
624        this.trackTransactionProducers = trackTransactionProducers;
625    }
626    
627    public boolean isRestoreTransaction() {
628        return restoreTransaction;
629    }
630
631    public void setRestoreTransaction(boolean restoreTransaction) {
632        this.restoreTransaction = restoreTransaction;
633    }
634
635    public boolean isTrackMessages() {
636        return trackMessages;
637    }
638
639    public void setTrackMessages(boolean trackMessages) {
640        this.trackMessages = trackMessages;
641    }
642
643    public int getMaxCacheSize() {
644        return maxCacheSize;
645    }
646
647    public void setMaxCacheSize(int maxCacheSize) {
648        this.maxCacheSize = maxCacheSize;
649    }
650
651    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
652        ConnectionState connectionState = connectionStates.get(connectionId);
653        if (connectionState != null) {
654            connectionState.setConnectionInterruptProcessingComplete(true);
655            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
656            for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
657                ConsumerControl control = new ConsumerControl();
658                control.setConsumerId(entry.getKey());
659                control.setPrefetch(entry.getValue().getPrefetchSize());
660                control.setDestination(entry.getValue().getDestination());
661                try {
662                    if (LOG.isDebugEnabled()) {
663                        LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
664                    }
665                    transport.oneway(control);  
666                } catch (Exception ex) {
667                    if (LOG.isDebugEnabled()) {
668                        LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
669                                + " with: " + control.getPrefetch(), ex);
670                    }
671                }
672            }
673            stalledConsumers.clear();
674        }
675    }
676
677    public void transportInterrupted(ConnectionId connectionId) {
678        ConnectionState connectionState = connectionStates.get(connectionId);
679        if (connectionState != null) {
680            connectionState.setConnectionInterruptProcessingComplete(false);
681        }
682    }
683}