001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import org.apache.activemq.broker.BrokerPluginSupport; 020import org.apache.activemq.broker.ProducerBrokerExchange; 021import org.apache.activemq.command.Message; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025/** 026 * A Broker interceptor which updates a JMS Client's timestamp on the message 027 * with a broker timestamp. Useful when the clocks on client machines are known 028 * to not be correct and you can only trust the time set on the broker machines. 029 * 030 * Enabling this plugin will break JMS compliance since the timestamp that the 031 * producer sees on the messages after as send() will be different from the 032 * timestamp the consumer will observe when he receives the message. This plugin 033 * is not enabled in the default ActiveMQ configuration. 034 * 035 * 2 new attributes have been added which will allow the administrator some override control 036 * over the expiration time for incoming messages: 037 * 038 * Attribute 'zeroExpirationOverride' can be used to apply an expiration 039 * time to incoming messages with no expiration defined (messages that would never expire) 040 * 041 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time 042 * 043 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin" 044 * 045 * 046 */ 047public class TimeStampingBrokerPlugin extends BrokerPluginSupport { 048 private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class); 049 /** 050 * variable which (when non-zero) is used to override 051 * the expiration date for messages that arrive with 052 * no expiration date set (in Milliseconds). 053 */ 054 long zeroExpirationOverride = 0; 055 056 /** 057 * variable which (when non-zero) is used to limit 058 * the expiration date (in Milliseconds). 059 */ 060 long ttlCeiling = 0; 061 062 /** 063 * If true, the plugin will not update timestamp to past values 064 * False by default 065 */ 066 boolean futureOnly = false; 067 068 069 /** 070 * if true, update timestamp even if message has passed through a network 071 * default false 072 */ 073 boolean processNetworkMessages = false; 074 075 /** 076 * setter method for zeroExpirationOverride 077 */ 078 public void setZeroExpirationOverride(long ttl) 079 { 080 this.zeroExpirationOverride = ttl; 081 } 082 083 /** 084 * setter method for ttlCeiling 085 */ 086 public void setTtlCeiling(long ttlCeiling) 087 { 088 this.ttlCeiling = ttlCeiling; 089 } 090 091 public void setFutureOnly(boolean futureOnly) { 092 this.futureOnly = futureOnly; 093 } 094 095 public void setProcessNetworkMessages(Boolean processNetworkMessages) { 096 this.processNetworkMessages = processNetworkMessages; 097 } 098 099 @Override 100 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 101 if (message.getTimestamp() > 0 102 && (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) { 103 // timestamp not been disabled and has not passed through a network or processNetworkMessages=true 104 long oldExpiration = message.getExpiration(); 105 long newTimeStamp = System.currentTimeMillis(); 106 long timeToLive = zeroExpirationOverride; 107 long oldTimestamp = message.getTimestamp(); 108 if (oldExpiration > 0) { 109 timeToLive = oldExpiration - oldTimestamp; 110 } 111 if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) { 112 timeToLive = ttlCeiling; 113 } 114 long expiration = timeToLive + newTimeStamp; 115 //In the scenario that the Broker is behind the clients we never want to set the Timestamp and Expiration in the past 116 if(!futureOnly || (expiration > oldExpiration)) { 117 if (timeToLive > 0 && expiration > 0) { 118 message.setExpiration(expiration); 119 } 120 message.setTimestamp(newTimeStamp); 121 if (LOG.isDebugEnabled()) { 122 LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp); 123 } 124 } 125 } 126 super.send(producerExchange, message); 127 } 128}