001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.io.Serializable;
021import java.io.StringReader;
022import java.io.StringWriter;
023import java.util.HashMap;
024import java.util.Map;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.advisory.AdvisorySupport;
029import org.apache.activemq.broker.BrokerContext;
030import org.apache.activemq.broker.BrokerContextAware;
031import org.apache.activemq.command.ActiveMQMapMessage;
032import org.apache.activemq.command.ActiveMQMessage;
033import org.apache.activemq.command.ActiveMQObjectMessage;
034import org.apache.activemq.command.DataStructure;
035import org.apache.activemq.util.JettisonMappedXmlDriver;
036import org.codehaus.jettison.mapped.Configuration;
037import org.springframework.beans.BeansException;
038import org.springframework.context.ApplicationContext;
039
040import com.thoughtworks.xstream.XStream;
041import com.thoughtworks.xstream.io.HierarchicalStreamReader;
042import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
043import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
044import com.thoughtworks.xstream.io.xml.XppReader;
045
046/**
047 * Frame translator implementation that uses XStream to convert messages to and
048 * from XML and JSON
049 *
050 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
051 */
052public class JmsFrameTranslator extends LegacyFrameTranslator implements
053                BrokerContextAware {
054
055        XStream xStream = null;
056        BrokerContext brokerContext;
057
058        public ActiveMQMessage convertFrame(ProtocolConverter converter,
059                        StompFrame command) throws JMSException, ProtocolException {
060                Map headers = command.getHeaders();
061                ActiveMQMessage msg;
062                String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
063                if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
064                        msg = super.convertFrame(converter, command);
065                } else {
066                        HierarchicalStreamReader in;
067
068                        try {
069                                String text = new String(command.getContent(), "UTF-8");
070                                switch (Stomp.Transformations.getValue(transformation)) {
071                                case JMS_OBJECT_XML:
072                                        in = new XppReader(new StringReader(text));
073                                        msg = createObjectMessage(in);
074                                        break;
075                                case JMS_OBJECT_JSON:
076                                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
077                                        msg = createObjectMessage(in);
078                                        break;
079                                case JMS_MAP_XML:
080                                        in = new XppReader(new StringReader(text));
081                                        msg = createMapMessage(in);
082                                        break;
083                                case JMS_MAP_JSON:
084                                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
085                                        msg = createMapMessage(in);
086                                        break;
087                                default:
088                                        throw new Exception("Unkown transformation: " + transformation);
089                                }
090                        } catch (Throwable e) {
091                                command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
092                                msg = super.convertFrame(converter, command);
093                        }
094                }
095                FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
096                return msg;
097        }
098
099        public StompFrame convertMessage(ProtocolConverter converter,
100                        ActiveMQMessage message) throws IOException, JMSException {
101                if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
102                        StompFrame command = new StompFrame();
103                        command.setAction(Stomp.Responses.MESSAGE);
104                        Map<String, String> headers = new HashMap<String, String>(25);
105                        command.setHeaders(headers);
106
107                        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
108                                        converter, message, command, this);
109
110            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
111                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
112            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
113                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
114            }
115
116            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
117                        command.setContent(marshall(msg.getObject(),
118                                        headers.get(Stomp.Headers.TRANSFORMATION))
119                                        .getBytes("UTF-8"));
120                        return command;
121
122                } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
123                        StompFrame command = new StompFrame();
124                        command.setAction(Stomp.Responses.MESSAGE);
125                        Map<String, String> headers = new HashMap<String, String>(25);
126                        command.setHeaders(headers);
127
128                        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
129                                        converter, message, command, this);
130
131            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
132                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
133            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
134                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
135            }
136
137                        ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
138                        command.setContent(marshall((Serializable)msg.getContentMap(),
139                                        headers.get(Stomp.Headers.TRANSFORMATION))
140                                        .getBytes("UTF-8"));
141                        return command;
142        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
143                AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
144
145                        StompFrame command = new StompFrame();
146                        command.setAction(Stomp.Responses.MESSAGE);
147                        Map<String, String> headers = new HashMap<String, String>(25);
148                        command.setHeaders(headers);
149
150            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
151                                        converter, message, command, this);
152
153            if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
154                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
155            } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
156                headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
157            }
158
159            String body = marshallAdvisory(message.getDataStructure(),
160                        headers.get(Stomp.Headers.TRANSFORMATION));
161            command.setContent(body.getBytes("UTF-8"));
162            return command;
163                } else {
164                        return super.convertMessage(converter, message);
165                }
166        }
167
168        /**
169         * Marshalls the Object to a string using XML or JSON encoding
170         */
171        protected String marshall(Serializable object, String transformation)
172                        throws JMSException {
173                StringWriter buffer = new StringWriter();
174                HierarchicalStreamWriter out;
175                if (transformation.toLowerCase().endsWith("json")) {
176                        out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
177                } else {
178                        out = new PrettyPrintWriter(buffer);
179                }
180                getXStream().marshal(object, out);
181                return buffer.toString();
182        }
183
184        protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
185                ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
186                Object obj = getXStream().unmarshal(in);
187                objMsg.setObject((Serializable) obj);
188                return objMsg;
189        }
190
191        protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
192                ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
193                Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
194                for (String key : map.keySet()) {
195                        mapMsg.setObject(key, map.get(key));
196                }
197                return mapMsg;
198        }
199
200    protected String marshallAdvisory(final DataStructure ds, String transformation) {
201
202                StringWriter buffer = new StringWriter();
203                HierarchicalStreamWriter out;
204                if (transformation.toLowerCase().endsWith("json")) {
205                        out = new JettisonMappedXmlDriver().createWriter(buffer);
206                } else {
207                        out = new PrettyPrintWriter(buffer);
208                }
209
210                XStream xstream = getXStream();
211        xstream.setMode(XStream.NO_REFERENCES);
212        xstream.aliasPackage("", "org.apache.activemq.command");
213                xstream.marshal(ds, out);
214                return buffer.toString();
215    }
216
217        // Properties
218        // -------------------------------------------------------------------------
219        public XStream getXStream() {
220                if (xStream == null) {
221                        xStream = createXStream();
222                }
223                return xStream;
224        }
225
226        public void setXStream(XStream xStream) {
227                this.xStream = xStream;
228        }
229
230        // Implementation methods
231        // -------------------------------------------------------------------------
232        protected XStream createXStream() {
233                XStream xstream = null;
234                if (brokerContext != null) {
235                        Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
236                        for (XStream bean : beans.values()) {
237                            if (bean != null) {
238                                xstream = bean;
239                                break;
240                            }
241                        }
242                }
243
244                if (xstream == null) {
245                        xstream = new XStream();
246                }
247                return xstream;
248
249        }
250
251        public void setBrokerContext(BrokerContext brokerContext) {
252                this.brokerContext = brokerContext;
253        }
254
255}