001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.io.IOException; 020import java.util.Arrays; 021 022import javax.jms.JMSException; 023 024import org.apache.activemq.filter.BooleanExpression; 025import org.apache.activemq.filter.MessageEvaluationContext; 026import org.apache.activemq.util.JMSExceptionSupport; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * @openwire:marshaller code="91" 032 * 033 */ 034public class NetworkBridgeFilter implements DataStructure, BooleanExpression { 035 036 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER; 037 static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class); 038 039 private BrokerId networkBrokerId; 040 private int networkTTL; 041 042 public NetworkBridgeFilter() { 043 } 044 045 public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) { 046 this.networkBrokerId = remoteBrokerPath; 047 this.networkTTL = networkTTL; 048 } 049 050 public byte getDataStructureType() { 051 return DATA_STRUCTURE_TYPE; 052 } 053 054 public boolean isMarshallAware() { 055 return false; 056 } 057 058 public boolean matches(MessageEvaluationContext mec) throws JMSException { 059 try { 060 // for Queues - the message can be acknowledged and dropped whilst 061 // still 062 // in the dispatch loop 063 // so need to get the reference to it 064 Message message = mec.getMessage(); 065 return message != null && matchesForwardingFilter(message); 066 } catch (IOException e) { 067 throw JMSExceptionSupport.create(e); 068 } 069 } 070 071 public Object evaluate(MessageEvaluationContext message) throws JMSException { 072 return matches(message) ? Boolean.TRUE : Boolean.FALSE; 073 } 074 075 protected boolean matchesForwardingFilter(Message message) { 076 077 if (contains(message.getBrokerPath(), networkBrokerId)) { 078 if (LOG.isTraceEnabled()) { 079 LOG.trace("Message all ready routed once through this broker (" 080 + networkBrokerId + "), path: " 081 + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message); 082 } 083 return false; 084 } 085 086 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length; 087 088 if (hops >= networkTTL) { 089 if (LOG.isTraceEnabled()) { 090 LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message); 091 } 092 return false; 093 } 094 095 // Don't propagate advisory messages about network subscriptions 096 if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) { 097 ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); 098 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length; 099 if (hops >= networkTTL) { 100 if (LOG.isTraceEnabled()) { 101 LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message); 102 } 103 return false; 104 } 105 } 106 return true; 107 } 108 109 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 110 if (brokerPath != null && brokerId != null) { 111 for (int i = 0; i < brokerPath.length; i++) { 112 if (brokerId.equals(brokerPath[i])) { 113 return true; 114 } 115 } 116 } 117 return false; 118 } 119 120 /** 121 * @openwire:property version=1 122 */ 123 public int getNetworkTTL() { 124 return networkTTL; 125 } 126 127 public void setNetworkTTL(int networkTTL) { 128 this.networkTTL = networkTTL; 129 } 130 131 /** 132 * @openwire:property version=1 cache=true 133 */ 134 public BrokerId getNetworkBrokerId() { 135 return networkBrokerId; 136 } 137 138 public void setNetworkBrokerId(BrokerId remoteBrokerPath) { 139 this.networkBrokerId = remoteBrokerPath; 140 } 141 142}