001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.openwire; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.lang.reflect.Method; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.command.CommandTypes; 027import org.apache.activemq.command.DataStructure; 028import org.apache.activemq.command.WireFormatInfo; 029import org.apache.activemq.util.ByteSequence; 030import org.apache.activemq.util.ByteSequenceData; 031import org.apache.activemq.util.DataByteArrayInputStream; 032import org.apache.activemq.util.DataByteArrayOutputStream; 033import org.apache.activemq.wireformat.WireFormat; 034 035/** 036 * 037 * 038 */ 039public final class OpenWireFormat implements WireFormat { 040 041 public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; 042 043 static final byte NULL_TYPE = CommandTypes.NULL; 044 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2; 045 private static final int MARSHAL_CACHE_FREE_SPACE = 100; 046 047 private DataStreamMarshaller dataMarshallers[]; 048 private int version; 049 private boolean stackTraceEnabled; 050 private boolean tcpNoDelayEnabled; 051 private boolean cacheEnabled; 052 private boolean tightEncodingEnabled; 053 private boolean sizePrefixDisabled; 054 055 // The following fields are used for value caching 056 private short nextMarshallCacheIndex; 057 private short nextMarshallCacheEvictionIndex; 058 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>(); 059 private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE]; 060 private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE]; 061 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); 062 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); 063 private WireFormatInfo preferedWireFormatInfo; 064 065 public OpenWireFormat() { 066 this(DEFAULT_VERSION); 067 } 068 069 public OpenWireFormat(int i) { 070 setVersion(i); 071 } 072 073 public int hashCode() { 074 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) 075 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) 076 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000) 077 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000); 078 } 079 080 public OpenWireFormat copy() { 081 OpenWireFormat answer = new OpenWireFormat(); 082 answer.version = version; 083 answer.stackTraceEnabled = stackTraceEnabled; 084 answer.tcpNoDelayEnabled = tcpNoDelayEnabled; 085 answer.cacheEnabled = cacheEnabled; 086 answer.tightEncodingEnabled = tightEncodingEnabled; 087 answer.sizePrefixDisabled = sizePrefixDisabled; 088 answer.preferedWireFormatInfo = preferedWireFormatInfo; 089 return answer; 090 } 091 092 public boolean equals(Object object) { 093 if (object == null) { 094 return false; 095 } 096 OpenWireFormat o = (OpenWireFormat)object; 097 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled 098 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled 099 && o.sizePrefixDisabled == sizePrefixDisabled; 100 } 101 102 103 public String toString() { 104 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" 105 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}"; 106 // return "OpenWireFormat{id="+id+", 107 // tightEncodingEnabled="+tightEncodingEnabled+"}"; 108 } 109 110 public int getVersion() { 111 return version; 112 } 113 114 public synchronized ByteSequence marshal(Object command) throws IOException { 115 116 if (cacheEnabled) { 117 runMarshallCacheEvictionSweep(); 118 } 119 120// MarshallAware ma = null; 121// // If not using value caching, then the marshaled form is always the 122// // same 123// if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) { 124// ma = (MarshallAware)command; 125// } 126 127 ByteSequence sequence = null; 128 // if( ma!=null ) { 129 // sequence = ma.getCachedMarshalledForm(this); 130 // } 131 132 if (sequence == null) { 133 134 int size = 1; 135 if (command != null) { 136 137 DataStructure c = (DataStructure)command; 138 byte type = c.getDataStructureType(); 139 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 140 if (dsm == null) { 141 throw new IOException("Unknown data type: " + type); 142 } 143 if (tightEncodingEnabled) { 144 145 BooleanStream bs = new BooleanStream(); 146 size += dsm.tightMarshal1(this, c, bs); 147 size += bs.marshalledSize(); 148 149 bytesOut.restart(size); 150 if (!sizePrefixDisabled) { 151 bytesOut.writeInt(size); 152 } 153 bytesOut.writeByte(type); 154 bs.marshal(bytesOut); 155 dsm.tightMarshal2(this, c, bytesOut, bs); 156 sequence = bytesOut.toByteSequence(); 157 158 } else { 159 bytesOut.restart(); 160 if (!sizePrefixDisabled) { 161 bytesOut.writeInt(0); // we don't know the final size 162 // yet but write this here for 163 // now. 164 } 165 bytesOut.writeByte(type); 166 dsm.looseMarshal(this, c, bytesOut); 167 sequence = bytesOut.toByteSequence(); 168 169 if (!sizePrefixDisabled) { 170 size = sequence.getLength() - 4; 171 int pos = sequence.offset; 172 ByteSequenceData.writeIntBig(sequence, size); 173 sequence.offset = pos; 174 } 175 } 176 177 } else { 178 bytesOut.restart(5); 179 bytesOut.writeInt(size); 180 bytesOut.writeByte(NULL_TYPE); 181 sequence = bytesOut.toByteSequence(); 182 } 183 184 // if( ma!=null ) { 185 // ma.setCachedMarshalledForm(this, sequence); 186 // } 187 } 188 return sequence; 189 } 190 191 public synchronized Object unmarshal(ByteSequence sequence) throws IOException { 192 bytesIn.restart(sequence); 193 // DataInputStream dis = new DataInputStream(new 194 // ByteArrayInputStream(sequence)); 195 196 if (!sizePrefixDisabled) { 197 int size = bytesIn.readInt(); 198 if (sequence.getLength() - 4 != size) { 199 // throw new IOException("Packet size does not match marshaled 200 // size"); 201 } 202 } 203 204 Object command = doUnmarshal(bytesIn); 205 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) { 206 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence); 207 // } 208 return command; 209 } 210 211 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { 212 213 if (cacheEnabled) { 214 runMarshallCacheEvictionSweep(); 215 } 216 217 int size = 1; 218 if (o != null) { 219 220 DataStructure c = (DataStructure)o; 221 byte type = c.getDataStructureType(); 222 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 223 if (dsm == null) { 224 throw new IOException("Unknown data type: " + type); 225 } 226 if (tightEncodingEnabled) { 227 BooleanStream bs = new BooleanStream(); 228 size += dsm.tightMarshal1(this, c, bs); 229 size += bs.marshalledSize(); 230 231 if (!sizePrefixDisabled) { 232 dataOut.writeInt(size); 233 } 234 235 dataOut.writeByte(type); 236 bs.marshal(dataOut); 237 dsm.tightMarshal2(this, c, dataOut, bs); 238 239 } else { 240 DataOutput looseOut = dataOut; 241 242 if (!sizePrefixDisabled) { 243 bytesOut.restart(); 244 looseOut = bytesOut; 245 } 246 247 looseOut.writeByte(type); 248 dsm.looseMarshal(this, c, looseOut); 249 250 if (!sizePrefixDisabled) { 251 ByteSequence sequence = bytesOut.toByteSequence(); 252 dataOut.writeInt(sequence.getLength()); 253 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); 254 } 255 256 } 257 258 } else { 259 if (!sizePrefixDisabled) { 260 dataOut.writeInt(size); 261 } 262 dataOut.writeByte(NULL_TYPE); 263 } 264 } 265 266 public Object unmarshal(DataInput dis) throws IOException { 267 DataInput dataIn = dis; 268 if (!sizePrefixDisabled) { 269 dis.readInt(); 270 // int size = dis.readInt(); 271 // byte[] data = new byte[size]; 272 // dis.readFully(data); 273 // bytesIn.restart(data); 274 // dataIn = bytesIn; 275 } 276 return doUnmarshal(dataIn); 277 } 278 279 /** 280 * Used by NIO or AIO transports 281 */ 282 public int tightMarshal1(Object o, BooleanStream bs) throws IOException { 283 int size = 1; 284 if (o != null) { 285 DataStructure c = (DataStructure)o; 286 byte type = c.getDataStructureType(); 287 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 288 if (dsm == null) { 289 throw new IOException("Unknown data type: " + type); 290 } 291 292 size += dsm.tightMarshal1(this, c, bs); 293 size += bs.marshalledSize(); 294 } 295 return size; 296 } 297 298 /** 299 * Used by NIO or AIO transports; note that the size is not written as part 300 * of this method. 301 */ 302 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException { 303 if (cacheEnabled) { 304 runMarshallCacheEvictionSweep(); 305 } 306 307 if (o != null) { 308 DataStructure c = (DataStructure)o; 309 byte type = c.getDataStructureType(); 310 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 311 if (dsm == null) { 312 throw new IOException("Unknown data type: " + type); 313 } 314 ds.writeByte(type); 315 bs.marshal(ds); 316 dsm.tightMarshal2(this, c, ds, bs); 317 } 318 } 319 320 /** 321 * Allows you to dynamically switch the version of the openwire protocol 322 * being used. 323 * 324 * @param version 325 */ 326 public void setVersion(int version) { 327 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; 328 Class mfClass; 329 try { 330 mfClass = Class.forName(mfName, false, getClass().getClassLoader()); 331 } catch (ClassNotFoundException e) { 332 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version 333 + ", could not load " + mfName) 334 .initCause(e); 335 } 336 try { 337 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class}); 338 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this}); 339 } catch (Throwable e) { 340 throw (IllegalArgumentException)new IllegalArgumentException( 341 "Invalid version: " 342 + version 343 + ", " 344 + mfName 345 + " does not properly implement the createMarshallerMap method.") 346 .initCause(e); 347 } 348 this.version = version; 349 } 350 351 public Object doUnmarshal(DataInput dis) throws IOException { 352 byte dataType = dis.readByte(); 353 if (dataType != NULL_TYPE) { 354 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 355 if (dsm == null) { 356 throw new IOException("Unknown data type: " + dataType); 357 } 358 Object data = dsm.createObject(); 359 if (this.tightEncodingEnabled) { 360 BooleanStream bs = new BooleanStream(); 361 bs.unmarshal(dis); 362 dsm.tightUnmarshal(this, data, dis, bs); 363 } else { 364 dsm.looseUnmarshal(this, data, dis); 365 } 366 return data; 367 } else { 368 return null; 369 } 370 } 371 372 // public void debug(String msg) { 373 // String t = (Thread.currentThread().getName()+" ").substring(0, 40); 374 // System.out.println(t+": "+msg); 375 // } 376 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException { 377 bs.writeBoolean(o != null); 378 if (o == null) { 379 return 0; 380 } 381 382 if (o.isMarshallAware()) { 383 // MarshallAware ma = (MarshallAware)o; 384 ByteSequence sequence = null; 385 // sequence=ma.getCachedMarshalledForm(this); 386 bs.writeBoolean(sequence != null); 387 if (sequence != null) { 388 return 1 + sequence.getLength(); 389 } 390 } 391 392 byte type = o.getDataStructureType(); 393 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 394 if (dsm == null) { 395 throw new IOException("Unknown data type: " + type); 396 } 397 return 1 + dsm.tightMarshal1(this, o, bs); 398 } 399 400 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) 401 throws IOException { 402 if (!bs.readBoolean()) { 403 return; 404 } 405 406 byte type = o.getDataStructureType(); 407 ds.writeByte(type); 408 409 if (o.isMarshallAware() && bs.readBoolean()) { 410 411 // We should not be doing any caching 412 throw new IOException("Corrupted stream"); 413 // MarshallAware ma = (MarshallAware) o; 414 // ByteSequence sequence=ma.getCachedMarshalledForm(this); 415 // ds.write(sequence.getData(), sequence.getOffset(), 416 // sequence.getLength()); 417 418 } else { 419 420 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 421 if (dsm == null) { 422 throw new IOException("Unknown data type: " + type); 423 } 424 dsm.tightMarshal2(this, o, ds, bs); 425 426 } 427 } 428 429 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException { 430 if (bs.readBoolean()) { 431 432 byte dataType = dis.readByte(); 433 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 434 if (dsm == null) { 435 throw new IOException("Unknown data type: " + dataType); 436 } 437 DataStructure data = dsm.createObject(); 438 439 if (data.isMarshallAware() && bs.readBoolean()) { 440 441 dis.readInt(); 442 dis.readByte(); 443 444 BooleanStream bs2 = new BooleanStream(); 445 bs2.unmarshal(dis); 446 dsm.tightUnmarshal(this, data, dis, bs2); 447 448 // TODO: extract the sequence from the dis and associate it. 449 // MarshallAware ma = (MarshallAware)data 450 // ma.setCachedMarshalledForm(this, sequence); 451 452 } else { 453 dsm.tightUnmarshal(this, data, dis, bs); 454 } 455 456 return data; 457 } else { 458 return null; 459 } 460 } 461 462 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException { 463 if (dis.readBoolean()) { 464 465 byte dataType = dis.readByte(); 466 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; 467 if (dsm == null) { 468 throw new IOException("Unknown data type: " + dataType); 469 } 470 DataStructure data = dsm.createObject(); 471 dsm.looseUnmarshal(this, data, dis); 472 return data; 473 474 } else { 475 return null; 476 } 477 } 478 479 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException { 480 dataOut.writeBoolean(o != null); 481 if (o != null) { 482 byte type = o.getDataStructureType(); 483 dataOut.writeByte(type); 484 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; 485 if (dsm == null) { 486 throw new IOException("Unknown data type: " + type); 487 } 488 dsm.looseMarshal(this, o, dataOut); 489 } 490 } 491 492 public void runMarshallCacheEvictionSweep() { 493 // Do we need to start evicting?? 494 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) { 495 496 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]); 497 marshallCache[nextMarshallCacheEvictionIndex] = null; 498 499 nextMarshallCacheEvictionIndex++; 500 if (nextMarshallCacheEvictionIndex >= marshallCache.length) { 501 nextMarshallCacheEvictionIndex = 0; 502 } 503 504 } 505 } 506 507 public Short getMarshallCacheIndex(DataStructure o) { 508 return marshallCacheMap.get(o); 509 } 510 511 public Short addToMarshallCache(DataStructure o) { 512 short i = nextMarshallCacheIndex++; 513 if (nextMarshallCacheIndex >= marshallCache.length) { 514 nextMarshallCacheIndex = 0; 515 } 516 517 // We can only cache that item if there is space left. 518 if (marshallCacheMap.size() < marshallCache.length) { 519 marshallCache[i] = o; 520 Short index = new Short(i); 521 marshallCacheMap.put(o, index); 522 return index; 523 } else { 524 // Use -1 to indicate that the value was not cached due to cache 525 // being full. 526 return new Short((short)-1); 527 } 528 } 529 530 public void setInUnmarshallCache(short index, DataStructure o) { 531 532 // There was no space left in the cache, so we can't 533 // put this in the cache. 534 if (index == -1) { 535 return; 536 } 537 538 unmarshallCache[index] = o; 539 } 540 541 public DataStructure getFromUnmarshallCache(short index) { 542 return unmarshallCache[index]; 543 } 544 545 public void setStackTraceEnabled(boolean b) { 546 stackTraceEnabled = b; 547 } 548 549 public boolean isStackTraceEnabled() { 550 return stackTraceEnabled; 551 } 552 553 public boolean isTcpNoDelayEnabled() { 554 return tcpNoDelayEnabled; 555 } 556 557 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) { 558 this.tcpNoDelayEnabled = tcpNoDelayEnabled; 559 } 560 561 public boolean isCacheEnabled() { 562 return cacheEnabled; 563 } 564 565 public void setCacheEnabled(boolean cacheEnabled) { 566 this.cacheEnabled = cacheEnabled; 567 } 568 569 public boolean isTightEncodingEnabled() { 570 return tightEncodingEnabled; 571 } 572 573 public void setTightEncodingEnabled(boolean tightEncodingEnabled) { 574 this.tightEncodingEnabled = tightEncodingEnabled; 575 } 576 577 public boolean isSizePrefixDisabled() { 578 return sizePrefixDisabled; 579 } 580 581 public void setSizePrefixDisabled(boolean prefixPacketSize) { 582 this.sizePrefixDisabled = prefixPacketSize; 583 } 584 585 public void setPreferedWireFormatInfo(WireFormatInfo info) { 586 this.preferedWireFormatInfo = info; 587 } 588 589 public WireFormatInfo getPreferedWireFormatInfo() { 590 return preferedWireFormatInfo; 591 } 592 593 public void renegotiateWireFormat(WireFormatInfo info) throws IOException { 594 595 if (preferedWireFormatInfo == null) { 596 throw new IllegalStateException("Wireformat cannot not be renegotiated."); 597 } 598 599 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion())); 600 info.setVersion(this.getVersion()); 601 602 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); 603 info.setStackTraceEnabled(this.stackTraceEnabled); 604 605 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); 606 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); 607 608 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); 609 info.setCacheEnabled(this.cacheEnabled); 610 611 this.tightEncodingEnabled = info.isTightEncodingEnabled() 612 && preferedWireFormatInfo.isTightEncodingEnabled(); 613 info.setTightEncodingEnabled(this.tightEncodingEnabled); 614 615 this.sizePrefixDisabled = info.isSizePrefixDisabled() 616 && preferedWireFormatInfo.isSizePrefixDisabled(); 617 info.setSizePrefixDisabled(this.sizePrefixDisabled); 618 619 if (cacheEnabled) { 620 621 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); 622 info.setCacheSize(size); 623 624 if (size == 0) { 625 size = MARSHAL_CACHE_SIZE; 626 } 627 628 marshallCache = new DataStructure[size]; 629 unmarshallCache = new DataStructure[size]; 630 nextMarshallCacheIndex = 0; 631 nextMarshallCacheEvictionIndex = 0; 632 marshallCacheMap = new HashMap<DataStructure, Short>(); 633 } else { 634 marshallCache = null; 635 unmarshallCache = null; 636 nextMarshallCacheIndex = 0; 637 nextMarshallCacheEvictionIndex = 0; 638 marshallCacheMap = null; 639 } 640 641 } 642 643 protected int min(int version1, int version2) { 644 if (version1 < version2 && version1 > 0 || version2 <= 0) { 645 return version1; 646 } 647 return version2; 648 } 649}