001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.io.OutputStreamWriter;
021import java.io.PrintWriter;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import javax.jms.JMSException;
029
030import org.apache.activemq.broker.BrokerContext;
031import org.apache.activemq.broker.BrokerContextAware;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQMessage;
034import org.apache.activemq.command.ActiveMQTempQueue;
035import org.apache.activemq.command.ActiveMQTempTopic;
036import org.apache.activemq.command.Command;
037import org.apache.activemq.command.ConnectionError;
038import org.apache.activemq.command.ConnectionId;
039import org.apache.activemq.command.ConnectionInfo;
040import org.apache.activemq.command.ConsumerId;
041import org.apache.activemq.command.ConsumerInfo;
042import org.apache.activemq.command.DestinationInfo;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.LocalTransactionId;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageDispatch;
047import org.apache.activemq.command.MessageId;
048import org.apache.activemq.command.ProducerId;
049import org.apache.activemq.command.ProducerInfo;
050import org.apache.activemq.command.RemoveSubscriptionInfo;
051import org.apache.activemq.command.Response;
052import org.apache.activemq.command.SessionId;
053import org.apache.activemq.command.SessionInfo;
054import org.apache.activemq.command.ShutdownInfo;
055import org.apache.activemq.command.TransactionId;
056import org.apache.activemq.command.TransactionInfo;
057import org.apache.activemq.util.ByteArrayOutputStream;
058import org.apache.activemq.util.FactoryFinder;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IdGenerator;
061import org.apache.activemq.util.IntrospectionSupport;
062import org.apache.activemq.util.LongSequenceGenerator;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065import org.springframework.context.ApplicationContextAware;
066
067/**
068 * @author <a href="http://hiramchirino.com">chirino</a>
069 */
070public class ProtocolConverter {
071
072    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
073    
074    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
075
076    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
077    private final SessionId sessionId = new SessionId(connectionId, -1);
078    private final ProducerId producerId = new ProducerId(sessionId, 1);
079
080    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
081    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
082    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
083    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
084
085    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
086    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
087    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
088    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
089    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
090    private final StompTransport stompTransport;
091
092    private final Object commnadIdMutex = new Object();
093    private int lastCommandId;
094    private final AtomicBoolean connected = new AtomicBoolean(false);
095    private final FrameTranslator frameTranslator;
096    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
097    private final BrokerContext brokerContext;
098
099    public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) {
100        this.stompTransport = stompTransport;
101        this.frameTranslator = translator;
102        this.brokerContext = brokerContext;
103    }
104
105    protected int generateCommandId() {
106        synchronized (commnadIdMutex) {
107            return lastCommandId++;
108        }
109    }
110
111    protected ResponseHandler createResponseHandler(final StompFrame command) {
112        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
113        if (receiptId != null) {
114            return new ResponseHandler() {
115                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
116                    if (response.isException()) {
117                        // Generally a command can fail.. but that does not invalidate the connection.
118                        // We report back the failure but we don't close the connection.
119                        Throwable exception = ((ExceptionResponse)response).getException();
120                        handleException(exception, command);
121                    } else {
122                        StompFrame sc = new StompFrame();
123                        sc.setAction(Stomp.Responses.RECEIPT);
124                        sc.setHeaders(new HashMap<String, String>(1));
125                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
126                        stompTransport.sendToStomp(sc);
127                    }
128                }
129            };
130        }
131        return null;
132    }
133
134    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
135        command.setCommandId(generateCommandId());
136        if (handler != null) {
137            command.setResponseRequired(true);
138            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
139        }
140        stompTransport.sendToActiveMQ(command);
141    }
142
143    protected void sendToStomp(StompFrame command) throws IOException {
144        stompTransport.sendToStomp(command);
145    }
146
147    protected FrameTranslator findTranslator(String header) {
148        FrameTranslator translator = frameTranslator;
149        try {
150            if (header != null) {
151                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
152                        .newInstance(header);
153                if (translator instanceof BrokerContextAware) {
154                    ((BrokerContextAware)translator).setBrokerContext(brokerContext);
155                }
156            }
157        } catch (Exception ignore) {
158            // if anything goes wrong use the default translator
159        }
160
161        return translator;
162    }
163
164    /**
165     * Convert a stomp command
166     *
167     * @param command
168     */
169    public void onStompCommand(StompFrame command) throws IOException, JMSException {
170        try {
171
172            if (command.getClass() == StompFrameError.class) {
173                throw ((StompFrameError)command).getException();
174            }
175
176            String action = command.getAction();
177            if (action.startsWith(Stomp.Commands.SEND)) {
178                onStompSend(command);
179            } else if (action.startsWith(Stomp.Commands.ACK)) {
180                onStompAck(command);
181            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
182                onStompBegin(command);
183            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
184                onStompCommit(command);
185            } else if (action.startsWith(Stomp.Commands.ABORT)) {
186                onStompAbort(command);
187            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
188                onStompSubscribe(command);
189            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
190                onStompUnsubscribe(command);
191            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
192                onStompConnect(command);
193            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
194                onStompDisconnect(command);
195            } else {
196                throw new ProtocolException("Unknown STOMP action: " + action);
197            }
198
199        } catch (ProtocolException e) {
200            handleException(e, command);
201            // Some protocol errors can cause the connection to get closed.
202            if( e.isFatal() ) {
203               getStompTransport().onException(e);
204            }
205        }
206    }
207
208    protected void handleException(Throwable exception, StompFrame command) throws IOException {
209        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
210        if (LOG.isDebugEnabled()) {
211            LOG.debug("Exception detail", exception);
212        }
213
214        // Let the stomp client know about any protocol errors.
215        ByteArrayOutputStream baos = new ByteArrayOutputStream();
216        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
217        exception.printStackTrace(stream);
218        stream.close();
219
220        HashMap<String, String> headers = new HashMap<String, String>();
221        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
222
223        if (command != null) {
224            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
225            if (receiptId != null) {
226                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
227            }
228        }
229
230        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
231        sendToStomp(errorMessage);
232    }
233
234    protected void onStompSend(StompFrame command) throws IOException, JMSException {
235        checkConnected();
236
237        Map<String, String> headers = command.getHeaders();
238        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
239        headers.remove("transaction");
240
241        ActiveMQMessage message = convertMessage(command);
242
243        message.setProducerId(producerId);
244        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
245        message.setMessageId(id);
246        message.setJMSTimestamp(System.currentTimeMillis());
247
248        if (stompTx != null) {
249            TransactionId activemqTx = transactions.get(stompTx);
250            if (activemqTx == null) {
251                throw new ProtocolException("Invalid transaction id: " + stompTx);
252            }
253            message.setTransactionId(activemqTx);
254        }
255
256        message.onSend();
257        sendToActiveMQ(message, createResponseHandler(command));
258
259    }
260
261    protected void onStompAck(StompFrame command) throws ProtocolException {
262        checkConnected();
263
264        // TODO: acking with just a message id is very bogus
265        // since the same message id could have been sent to 2 different
266        // subscriptions
267        // on the same stomp connection. For example, when 2 subs are created on
268        // the same topic.
269
270        Map<String, String> headers = command.getHeaders();
271        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
272        if (messageId == null) {
273            throw new ProtocolException("ACK received without a message-id to acknowledge!");
274        }
275
276        TransactionId activemqTx = null;
277        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
278        if (stompTx != null) {
279            activemqTx = transactions.get(stompTx);
280            if (activemqTx == null) {
281                throw new ProtocolException("Invalid transaction id: " + stompTx);
282            }
283        }
284
285        boolean acked = false;
286        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
287            StompSubscription sub = iter.next();
288            MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
289            if (ack != null) {
290                ack.setTransactionId(activemqTx);
291                sendToActiveMQ(ack, createResponseHandler(command));
292                acked = true;
293                break;
294            }
295        }
296
297        if (!acked) {
298            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
299        }
300
301    }
302
303    protected void onStompBegin(StompFrame command) throws ProtocolException {
304        checkConnected();
305
306        Map<String, String> headers = command.getHeaders();
307
308        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
309
310        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
311            throw new ProtocolException("Must specify the transaction you are beginning");
312        }
313
314        if (transactions.get(stompTx) != null) {
315            throw new ProtocolException("The transaction was allready started: " + stompTx);
316        }
317
318        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
319        transactions.put(stompTx, activemqTx);
320
321        TransactionInfo tx = new TransactionInfo();
322        tx.setConnectionId(connectionId);
323        tx.setTransactionId(activemqTx);
324        tx.setType(TransactionInfo.BEGIN);
325
326        sendToActiveMQ(tx, createResponseHandler(command));
327
328    }
329
330    protected void onStompCommit(StompFrame command) throws ProtocolException {
331        checkConnected();
332
333        Map<String, String> headers = command.getHeaders();
334
335        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
336        if (stompTx == null) {
337            throw new ProtocolException("Must specify the transaction you are committing");
338        }
339
340        TransactionId activemqTx = transactions.remove(stompTx);
341        if (activemqTx == null) {
342            throw new ProtocolException("Invalid transaction id: " + stompTx);
343        }
344
345        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
346            StompSubscription sub = iter.next();
347            sub.onStompCommit(activemqTx);
348        }
349
350        TransactionInfo tx = new TransactionInfo();
351        tx.setConnectionId(connectionId);
352        tx.setTransactionId(activemqTx);
353        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
354
355        sendToActiveMQ(tx, createResponseHandler(command));
356
357    }
358
359    protected void onStompAbort(StompFrame command) throws ProtocolException {
360        checkConnected();
361        Map<String, String> headers = command.getHeaders();
362
363        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
364        if (stompTx == null) {
365            throw new ProtocolException("Must specify the transaction you are committing");
366        }
367
368        TransactionId activemqTx = transactions.remove(stompTx);
369        if (activemqTx == null) {
370            throw new ProtocolException("Invalid transaction id: " + stompTx);
371        }
372        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
373            StompSubscription sub = iter.next();
374            try {
375                sub.onStompAbort(activemqTx);
376            } catch (Exception e) {
377                throw new ProtocolException("Transaction abort failed", false, e);
378            }
379        }
380
381        TransactionInfo tx = new TransactionInfo();
382        tx.setConnectionId(connectionId);
383        tx.setTransactionId(activemqTx);
384        tx.setType(TransactionInfo.ROLLBACK);
385
386        sendToActiveMQ(tx, createResponseHandler(command));
387
388    }
389
390    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
391        checkConnected();
392        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
393        Map<String, String> headers = command.getHeaders();
394
395        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
396        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
397
398        ActiveMQDestination actualDest = translator.convertDestination(this, destination);
399
400        if (actualDest == null) {
401            throw new ProtocolException("Invalid Destination.");
402        }
403
404        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
405        ConsumerInfo consumerInfo = new ConsumerInfo(id);
406        consumerInfo.setPrefetchSize(1000);
407        consumerInfo.setDispatchAsync(true);
408
409        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
410        consumerInfo.setSelector(selector);
411
412        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
413
414        consumerInfo.setDestination(translator.convertDestination(this, destination));
415
416        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
417        stompSubscription.setDestination(actualDest);
418
419        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
420        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
421            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
422        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
423            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
424        } else {
425            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
426        }
427
428        subscriptionsByConsumerId.put(id, stompSubscription);
429        sendToActiveMQ(consumerInfo, createResponseHandler(command));
430
431    }
432
433    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
434        checkConnected();
435        Map<String, String> headers = command.getHeaders();
436
437        ActiveMQDestination destination = null;
438        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
439        if (o != null) {
440            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
441        }
442
443        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
444
445        if (subscriptionId == null && destination == null) {
446            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
447        }
448
449        // check if it is a durable subscription
450        String durable = command.getHeaders().get("activemq.subscriptionName");
451        if (durable != null) {
452            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
453            info.setClientId(durable);
454            info.setSubscriptionName(durable);
455            info.setConnectionId(connectionId);
456            sendToActiveMQ(info, createResponseHandler(command));
457            return;
458        }
459
460        // TODO: Unsubscribing using a destination is a bit wierd if multiple
461        // subscriptions
462        // are created with the same destination. Perhaps this should be
463        // removed.
464        //
465        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
466            StompSubscription sub = iter.next();
467            if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
468                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
469                iter.remove();
470                return;
471            }
472        }
473
474        throw new ProtocolException("No subscription matched.");
475    }
476
477    ConnectionInfo connectionInfo = new ConnectionInfo();
478
479    protected void onStompConnect(final StompFrame command) throws ProtocolException {
480
481        if (connected.get()) {
482            throw new ProtocolException("Allready connected.");
483        }
484
485        final Map<String, String> headers = command.getHeaders();
486
487        // allow anyone to login for now
488        String login = headers.get(Stomp.Headers.Connect.LOGIN);
489        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
490        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
491
492
493        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
494
495        connectionInfo.setConnectionId(connectionId);
496        if (clientId != null) {
497            connectionInfo.setClientId(clientId);
498        } else {
499            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
500        }
501
502        connectionInfo.setResponseRequired(true);
503        connectionInfo.setUserName(login);
504        connectionInfo.setPassword(passcode);
505        connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
506
507        sendToActiveMQ(connectionInfo, new ResponseHandler() {
508            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
509
510                if (response.isException()) {
511                    // If the connection attempt fails we close the socket.
512                    Throwable exception = ((ExceptionResponse)response).getException();
513                    handleException(exception, command);
514                    getStompTransport().onException(IOExceptionSupport.create(exception));
515                    return;
516                }
517
518                final SessionInfo sessionInfo = new SessionInfo(sessionId);
519                sendToActiveMQ(sessionInfo, null);
520
521                final ProducerInfo producerInfo = new ProducerInfo(producerId);
522                sendToActiveMQ(producerInfo, new ResponseHandler() {
523                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
524
525                        if (response.isException()) {
526                            // If the connection attempt fails we close the socket.
527                            Throwable exception = ((ExceptionResponse)response).getException();
528                            handleException(exception, command);
529                            getStompTransport().onException(IOExceptionSupport.create(exception));
530                        }
531
532                        connected.set(true);
533                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
534
535                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
536                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
537                        if (requestId == null) {
538                            // TODO legacy
539                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
540                        }
541                        if (requestId != null) {
542                            // TODO legacy
543                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
544                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
545                        }
546
547                        StompFrame sc = new StompFrame();
548                        sc.setAction(Stomp.Responses.CONNECTED);
549                        sc.setHeaders(responseHeaders);
550                        sendToStomp(sc);
551                    }
552                });
553
554            }
555        });
556    }
557
558    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
559        checkConnected();
560        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
561        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
562        connected.set(false);
563    }
564
565    protected void checkConnected() throws ProtocolException {
566        if (!connected.get()) {
567            throw new ProtocolException("Not connected.");
568        }
569    }
570
571    /**
572     * Dispatch a ActiveMQ command
573     *
574     * @param command
575     * @throws IOException
576     */
577    public void onActiveMQCommand(Command command) throws IOException, JMSException {
578        if (command.isResponse()) {
579
580            Response response = (Response)command;
581            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
582            if (rh != null) {
583                rh.onResponse(this, response);
584            } else {
585                // Pass down any unexpected errors. Should this close the connection?
586                if (response.isException()) {
587                    Throwable exception = ((ExceptionResponse)response).getException();
588                    handleException(exception, null);
589                }
590            }
591        } else if (command.isMessageDispatch()) {
592
593            MessageDispatch md = (MessageDispatch)command;
594            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
595            if (sub != null) {
596                sub.onMessageDispatch(md);
597            }
598        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
599            // Pass down any unexpected async errors. Should this close the connection?
600            Throwable exception = ((ConnectionError)command).getException();
601            handleException(exception, null);
602        }
603    }
604
605    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
606        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
607        return msg;
608    }
609
610    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
611        if (ignoreTransformation == true) {
612            return frameTranslator.convertMessage(this, message);
613        } else {
614            return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
615        }
616    }
617
618    public StompTransport getStompTransport() {
619        return stompTransport;
620    }
621
622    public ActiveMQDestination createTempQueue(String name) {
623        ActiveMQDestination rc = tempDestinations.get(name);
624        if( rc == null ) {
625            rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
626            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
627            tempDestinations.put(name, rc);
628        }
629        return rc;
630    }
631
632    public ActiveMQDestination createTempTopic(String name) {
633        ActiveMQDestination rc = tempDestinations.get(name);
634        if( rc == null ) {
635            rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
636            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
637            tempDestinations.put(name, rc);
638            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
639        }
640        return rc;
641    }
642
643    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
644        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
645    }
646}