001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.openwire;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.lang.reflect.Method;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.CommandTypes;
027import org.apache.activemq.command.DataStructure;
028import org.apache.activemq.command.WireFormatInfo;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.util.ByteSequenceData;
031import org.apache.activemq.util.DataByteArrayInputStream;
032import org.apache.activemq.util.DataByteArrayOutputStream;
033import org.apache.activemq.wireformat.WireFormat;
034
035/**
036 * 
037 * 
038 */
039public final class OpenWireFormat implements WireFormat {
040
041    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042
043    static final byte NULL_TYPE = CommandTypes.NULL;
044    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
045    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
046
047    private DataStreamMarshaller dataMarshallers[];
048    private int version;
049    private boolean stackTraceEnabled;
050    private boolean tcpNoDelayEnabled;
051    private boolean cacheEnabled;
052    private boolean tightEncodingEnabled;
053    private boolean sizePrefixDisabled;
054
055    // The following fields are used for value caching
056    private short nextMarshallCacheIndex;
057    private short nextMarshallCacheEvictionIndex;
058    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
059    private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
060    private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
061    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
062    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
063    private WireFormatInfo preferedWireFormatInfo;
064    
065    public OpenWireFormat() {
066        this(DEFAULT_VERSION);
067    }
068
069    public OpenWireFormat(int i) {
070        setVersion(i);
071    }
072
073    public int hashCode() {
074        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
075               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
076               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
077               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
078    }
079
080    public OpenWireFormat copy() {
081        OpenWireFormat answer = new OpenWireFormat();
082        answer.version = version;
083        answer.stackTraceEnabled = stackTraceEnabled;
084        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
085        answer.cacheEnabled = cacheEnabled;
086        answer.tightEncodingEnabled = tightEncodingEnabled;
087        answer.sizePrefixDisabled = sizePrefixDisabled;
088        answer.preferedWireFormatInfo = preferedWireFormatInfo;
089        return answer;
090    }
091
092    public boolean equals(Object object) {
093        if (object == null) {
094            return false;
095        }
096        OpenWireFormat o = (OpenWireFormat)object;
097        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
098               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
099               && o.sizePrefixDisabled == sizePrefixDisabled;
100    }
101
102
103    public String toString() {
104        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
105               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
106        // return "OpenWireFormat{id="+id+",
107        // tightEncodingEnabled="+tightEncodingEnabled+"}";
108    }
109
110    public int getVersion() {
111        return version;
112    }
113
114    public synchronized ByteSequence marshal(Object command) throws IOException {
115
116        if (cacheEnabled) {
117            runMarshallCacheEvictionSweep();
118        }
119
120//        MarshallAware ma = null;
121//        // If not using value caching, then the marshaled form is always the
122//        // same
123//        if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
124//            ma = (MarshallAware)command;
125//        }
126
127        ByteSequence sequence = null;
128        // if( ma!=null ) {
129        // sequence = ma.getCachedMarshalledForm(this);
130        // }
131
132        if (sequence == null) {
133
134            int size = 1;
135            if (command != null) {
136
137                DataStructure c = (DataStructure)command;
138                byte type = c.getDataStructureType();
139                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
140                if (dsm == null) {
141                    throw new IOException("Unknown data type: " + type);
142                }
143                if (tightEncodingEnabled) {
144
145                    BooleanStream bs = new BooleanStream();
146                    size += dsm.tightMarshal1(this, c, bs);
147                    size += bs.marshalledSize();
148
149                    bytesOut.restart(size);
150                    if (!sizePrefixDisabled) {
151                        bytesOut.writeInt(size);
152                    }
153                    bytesOut.writeByte(type);
154                    bs.marshal(bytesOut);
155                    dsm.tightMarshal2(this, c, bytesOut, bs);
156                    sequence = bytesOut.toByteSequence();
157
158                } else {
159                    bytesOut.restart();
160                    if (!sizePrefixDisabled) {
161                        bytesOut.writeInt(0); // we don't know the final size
162                                                // yet but write this here for
163                                                // now.
164                    }
165                    bytesOut.writeByte(type);
166                    dsm.looseMarshal(this, c, bytesOut);
167                    sequence = bytesOut.toByteSequence();
168
169                    if (!sizePrefixDisabled) {
170                        size = sequence.getLength() - 4;
171                        int pos = sequence.offset;
172                        ByteSequenceData.writeIntBig(sequence, size);
173                        sequence.offset = pos;
174                    }
175                }
176
177            } else {
178                bytesOut.restart(5);
179                bytesOut.writeInt(size);
180                bytesOut.writeByte(NULL_TYPE);
181                sequence = bytesOut.toByteSequence();
182            }
183
184            // if( ma!=null ) {
185            // ma.setCachedMarshalledForm(this, sequence);
186            // }
187        }
188        return sequence;
189    }
190
191    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
192        bytesIn.restart(sequence);
193        // DataInputStream dis = new DataInputStream(new
194        // ByteArrayInputStream(sequence));
195
196        if (!sizePrefixDisabled) {
197            int size = bytesIn.readInt();
198            if (sequence.getLength() - 4 != size) {
199                // throw new IOException("Packet size does not match marshaled
200                // size");
201            }
202        }
203
204        Object command = doUnmarshal(bytesIn);
205        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
206        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
207        // }
208        return command;
209    }
210
211    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
212
213        if (cacheEnabled) {
214            runMarshallCacheEvictionSweep();
215        }
216
217        int size = 1;
218        if (o != null) {
219
220            DataStructure c = (DataStructure)o;
221            byte type = c.getDataStructureType();
222            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
223            if (dsm == null) {
224                throw new IOException("Unknown data type: " + type);
225            }
226            if (tightEncodingEnabled) {
227                BooleanStream bs = new BooleanStream();
228                size += dsm.tightMarshal1(this, c, bs);
229                size += bs.marshalledSize();
230
231                if (!sizePrefixDisabled) {
232                    dataOut.writeInt(size);
233                }
234
235                dataOut.writeByte(type);
236                bs.marshal(dataOut);
237                dsm.tightMarshal2(this, c, dataOut, bs);
238
239            } else {
240                DataOutput looseOut = dataOut;
241
242                if (!sizePrefixDisabled) {
243                    bytesOut.restart();
244                    looseOut = bytesOut;
245                }
246
247                looseOut.writeByte(type);
248                dsm.looseMarshal(this, c, looseOut);
249
250                if (!sizePrefixDisabled) {
251                    ByteSequence sequence = bytesOut.toByteSequence();
252                    dataOut.writeInt(sequence.getLength());
253                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
254                }
255
256            }
257
258        } else {
259            if (!sizePrefixDisabled) {
260                dataOut.writeInt(size);
261            }
262            dataOut.writeByte(NULL_TYPE);
263        }
264    }
265
266    public Object unmarshal(DataInput dis) throws IOException {
267        DataInput dataIn = dis;
268        if (!sizePrefixDisabled) {
269            dis.readInt();
270            // int size = dis.readInt();
271            // byte[] data = new byte[size];
272            // dis.readFully(data);
273            // bytesIn.restart(data);
274            // dataIn = bytesIn;
275        }
276        return doUnmarshal(dataIn);
277    }
278
279    /**
280     * Used by NIO or AIO transports
281     */
282    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
283        int size = 1;
284        if (o != null) {
285            DataStructure c = (DataStructure)o;
286            byte type = c.getDataStructureType();
287            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
288            if (dsm == null) {
289                throw new IOException("Unknown data type: " + type);
290            }
291
292            size += dsm.tightMarshal1(this, c, bs);
293            size += bs.marshalledSize();
294        }
295        return size;
296    }
297
298    /**
299     * Used by NIO or AIO transports; note that the size is not written as part
300     * of this method.
301     */
302    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
303        if (cacheEnabled) {
304            runMarshallCacheEvictionSweep();
305        }
306
307        if (o != null) {
308            DataStructure c = (DataStructure)o;
309            byte type = c.getDataStructureType();
310            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
311            if (dsm == null) {
312                throw new IOException("Unknown data type: " + type);
313            }
314            ds.writeByte(type);
315            bs.marshal(ds);
316            dsm.tightMarshal2(this, c, ds, bs);
317        }
318    }
319
320    /**
321     * Allows you to dynamically switch the version of the openwire protocol
322     * being used.
323     * 
324     * @param version
325     */
326    public void setVersion(int version) {
327        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
328        Class mfClass;
329        try {
330            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
331        } catch (ClassNotFoundException e) {
332            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
333                                                                         + ", could not load " + mfName)
334                .initCause(e);
335        }
336        try {
337            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
338            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
339        } catch (Throwable e) {
340            throw (IllegalArgumentException)new IllegalArgumentException(
341                                                                         "Invalid version: "
342                                                                             + version
343                                                                             + ", "
344                                                                             + mfName
345                                                                             + " does not properly implement the createMarshallerMap method.")
346                .initCause(e);
347        }
348        this.version = version;
349    }
350
351    public Object doUnmarshal(DataInput dis) throws IOException {
352        byte dataType = dis.readByte();
353        if (dataType != NULL_TYPE) {
354            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
355            if (dsm == null) {
356                throw new IOException("Unknown data type: " + dataType);
357            }
358            Object data = dsm.createObject();
359            if (this.tightEncodingEnabled) {
360                BooleanStream bs = new BooleanStream();
361                bs.unmarshal(dis);
362                dsm.tightUnmarshal(this, data, dis, bs);
363            } else {
364                dsm.looseUnmarshal(this, data, dis);
365            }
366            return data;
367        } else {
368            return null;
369        }
370    }
371
372    // public void debug(String msg) {
373    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
374    // System.out.println(t+": "+msg);
375    // }
376    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
377        bs.writeBoolean(o != null);
378        if (o == null) {
379            return 0;
380        }
381
382        if (o.isMarshallAware()) {
383            // MarshallAware ma = (MarshallAware)o;
384            ByteSequence sequence = null;
385            // sequence=ma.getCachedMarshalledForm(this);
386            bs.writeBoolean(sequence != null);
387            if (sequence != null) {
388                return 1 + sequence.getLength();
389            }
390        }
391
392        byte type = o.getDataStructureType();
393        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
394        if (dsm == null) {
395            throw new IOException("Unknown data type: " + type);
396        }
397        return 1 + dsm.tightMarshal1(this, o, bs);
398    }
399
400    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
401        throws IOException {
402        if (!bs.readBoolean()) {
403            return;
404        }
405
406        byte type = o.getDataStructureType();
407        ds.writeByte(type);
408
409        if (o.isMarshallAware() && bs.readBoolean()) {
410
411            // We should not be doing any caching
412            throw new IOException("Corrupted stream");
413            // MarshallAware ma = (MarshallAware) o;
414            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
415            // ds.write(sequence.getData(), sequence.getOffset(),
416            // sequence.getLength());
417
418        } else {
419
420            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
421            if (dsm == null) {
422                throw new IOException("Unknown data type: " + type);
423            }
424            dsm.tightMarshal2(this, o, ds, bs);
425
426        }
427    }
428
429    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
430        if (bs.readBoolean()) {
431
432            byte dataType = dis.readByte();
433            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
434            if (dsm == null) {
435                throw new IOException("Unknown data type: " + dataType);
436            }
437            DataStructure data = dsm.createObject();
438
439            if (data.isMarshallAware() && bs.readBoolean()) {
440
441                dis.readInt();
442                dis.readByte();
443
444                BooleanStream bs2 = new BooleanStream();
445                bs2.unmarshal(dis);
446                dsm.tightUnmarshal(this, data, dis, bs2);
447
448                // TODO: extract the sequence from the dis and associate it.
449                // MarshallAware ma = (MarshallAware)data
450                // ma.setCachedMarshalledForm(this, sequence);
451
452            } else {
453                dsm.tightUnmarshal(this, data, dis, bs);
454            }
455
456            return data;
457        } else {
458            return null;
459        }
460    }
461
462    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
463        if (dis.readBoolean()) {
464
465            byte dataType = dis.readByte();
466            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
467            if (dsm == null) {
468                throw new IOException("Unknown data type: " + dataType);
469            }
470            DataStructure data = dsm.createObject();
471            dsm.looseUnmarshal(this, data, dis);
472            return data;
473
474        } else {
475            return null;
476        }
477    }
478
479    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
480        dataOut.writeBoolean(o != null);
481        if (o != null) {
482            byte type = o.getDataStructureType();
483            dataOut.writeByte(type);
484            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
485            if (dsm == null) {
486                throw new IOException("Unknown data type: " + type);
487            }
488            dsm.looseMarshal(this, o, dataOut);
489        }
490    }
491
492    public void runMarshallCacheEvictionSweep() {
493        // Do we need to start evicting??
494        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
495
496            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
497            marshallCache[nextMarshallCacheEvictionIndex] = null;
498
499            nextMarshallCacheEvictionIndex++;
500            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
501                nextMarshallCacheEvictionIndex = 0;
502            }
503
504        }
505    }
506
507    public Short getMarshallCacheIndex(DataStructure o) {
508        return marshallCacheMap.get(o);
509    }
510
511    public Short addToMarshallCache(DataStructure o) {
512        short i = nextMarshallCacheIndex++;
513        if (nextMarshallCacheIndex >= marshallCache.length) {
514            nextMarshallCacheIndex = 0;
515        }
516
517        // We can only cache that item if there is space left.
518        if (marshallCacheMap.size() < marshallCache.length) {
519            marshallCache[i] = o;
520            Short index = new Short(i);
521            marshallCacheMap.put(o, index);
522            return index;
523        } else {
524            // Use -1 to indicate that the value was not cached due to cache
525            // being full.
526            return new Short((short)-1);
527        }
528    }
529
530    public void setInUnmarshallCache(short index, DataStructure o) {
531
532        // There was no space left in the cache, so we can't
533        // put this in the cache.
534        if (index == -1) {
535            return;
536        }
537
538        unmarshallCache[index] = o;
539    }
540
541    public DataStructure getFromUnmarshallCache(short index) {
542        return unmarshallCache[index];
543    }
544
545    public void setStackTraceEnabled(boolean b) {
546        stackTraceEnabled = b;
547    }
548
549    public boolean isStackTraceEnabled() {
550        return stackTraceEnabled;
551    }
552
553    public boolean isTcpNoDelayEnabled() {
554        return tcpNoDelayEnabled;
555    }
556
557    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
558        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
559    }
560
561    public boolean isCacheEnabled() {
562        return cacheEnabled;
563    }
564
565    public void setCacheEnabled(boolean cacheEnabled) {
566        this.cacheEnabled = cacheEnabled;
567    }
568
569    public boolean isTightEncodingEnabled() {
570        return tightEncodingEnabled;
571    }
572
573    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
574        this.tightEncodingEnabled = tightEncodingEnabled;
575    }
576
577    public boolean isSizePrefixDisabled() {
578        return sizePrefixDisabled;
579    }
580
581    public void setSizePrefixDisabled(boolean prefixPacketSize) {
582        this.sizePrefixDisabled = prefixPacketSize;
583    }
584
585    public void setPreferedWireFormatInfo(WireFormatInfo info) {
586        this.preferedWireFormatInfo = info;
587    }
588
589    public WireFormatInfo getPreferedWireFormatInfo() {
590        return preferedWireFormatInfo;
591    }
592
593    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
594
595        if (preferedWireFormatInfo == null) {
596            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
597        }
598
599        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
600        info.setVersion(this.getVersion());
601
602        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
603        info.setStackTraceEnabled(this.stackTraceEnabled);
604
605        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
606        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
607
608        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
609        info.setCacheEnabled(this.cacheEnabled);
610
611        this.tightEncodingEnabled = info.isTightEncodingEnabled()
612                                    && preferedWireFormatInfo.isTightEncodingEnabled();
613        info.setTightEncodingEnabled(this.tightEncodingEnabled);
614
615        this.sizePrefixDisabled = info.isSizePrefixDisabled()
616                                  && preferedWireFormatInfo.isSizePrefixDisabled();
617        info.setSizePrefixDisabled(this.sizePrefixDisabled);
618
619        if (cacheEnabled) {
620
621            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
622            info.setCacheSize(size);
623
624            if (size == 0) {
625                size = MARSHAL_CACHE_SIZE;
626            }
627
628            marshallCache = new DataStructure[size];
629            unmarshallCache = new DataStructure[size];
630            nextMarshallCacheIndex = 0;
631            nextMarshallCacheEvictionIndex = 0;
632            marshallCacheMap = new HashMap<DataStructure, Short>();
633        } else {
634            marshallCache = null;
635            unmarshallCache = null;
636            nextMarshallCacheIndex = 0;
637            nextMarshallCacheEvictionIndex = 0;
638            marshallCacheMap = null;
639        }
640
641    }
642
643    protected int min(int version1, int version2) {
644        if (version1 < version2 && version1 > 0 || version2 <= 0) {
645            return version1;
646        }
647        return version2;
648    }
649}