001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.IOException;
020import java.util.Arrays;
021
022import javax.jms.JMSException;
023
024import org.apache.activemq.filter.BooleanExpression;
025import org.apache.activemq.filter.MessageEvaluationContext;
026import org.apache.activemq.util.JMSExceptionSupport;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * @openwire:marshaller code="91"
032 * 
033 */
034public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
035
036    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
037    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
038
039    private BrokerId networkBrokerId;
040    private int networkTTL;
041
042    public NetworkBridgeFilter() {
043    }
044
045    public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
046        this.networkBrokerId = remoteBrokerPath;
047        this.networkTTL = networkTTL;
048    }
049
050    public byte getDataStructureType() {
051        return DATA_STRUCTURE_TYPE;
052    }
053
054    public boolean isMarshallAware() {
055        return false;
056    }
057
058    public boolean matches(MessageEvaluationContext mec) throws JMSException {
059        try {
060            // for Queues - the message can be acknowledged and dropped whilst
061            // still
062            // in the dispatch loop
063            // so need to get the reference to it
064            Message message = mec.getMessage();
065            return message != null && matchesForwardingFilter(message);
066        } catch (IOException e) {
067            throw JMSExceptionSupport.create(e);
068        }
069    }
070
071    public Object evaluate(MessageEvaluationContext message) throws JMSException {
072        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
073    }
074
075    protected boolean matchesForwardingFilter(Message message) {
076
077        if (contains(message.getBrokerPath(), networkBrokerId)) {
078            if (LOG.isTraceEnabled()) {
079                LOG.trace("Message all ready routed once through this broker ("
080                        + networkBrokerId + "), path: "
081                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
082            }
083            return false;
084        }
085
086        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
087
088        if (hops >= networkTTL) {
089            if (LOG.isTraceEnabled()) {
090                LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
091            }
092            return false;
093        }
094
095        // Don't propagate advisory messages about network subscriptions
096        if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
097            ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
098            hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
099            if (hops >= networkTTL) {
100                if (LOG.isTraceEnabled()) {
101                    LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
102                }
103                return false;
104            }
105        }
106        return true;
107    }
108
109    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
110        if (brokerPath != null && brokerId != null) {
111            for (int i = 0; i < brokerPath.length; i++) {
112                if (brokerId.equals(brokerPath[i])) {
113                    return true;
114                }
115            }
116        }
117        return false;
118    }
119
120    /**
121     * @openwire:property version=1
122     */
123    public int getNetworkTTL() {
124        return networkTTL;
125    }
126
127    public void setNetworkTTL(int networkTTL) {
128        this.networkTTL = networkTTL;
129    }
130
131    /**
132     * @openwire:property version=1 cache=true
133     */
134    public BrokerId getNetworkBrokerId() {
135        return networkBrokerId;
136    }
137
138    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
139        this.networkBrokerId = remoteBrokerPath;
140    }
141
142}