001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.reliable;
018
019import java.io.IOException;
020import java.util.SortedSet;
021import java.util.TreeSet;
022
023import org.apache.activemq.command.Command;
024import org.apache.activemq.command.ReplayCommand;
025import org.apache.activemq.command.Response;
026import org.apache.activemq.openwire.CommandIdComparator;
027import org.apache.activemq.transport.FutureResponse;
028import org.apache.activemq.transport.ResponseCorrelator;
029import org.apache.activemq.transport.Transport;
030import org.apache.activemq.transport.udp.UdpTransport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * This interceptor deals with out of order commands together with being able to
036 * handle dropped commands and the re-requesting dropped commands.
037 * 
038 * 
039 */
040public class ReliableTransport extends ResponseCorrelator {
041    private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class);
042
043    private ReplayStrategy replayStrategy;
044    private SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator());
045    private int expectedCounter = 1;
046    private int replayBufferCommandCount = 50;
047    private int requestTimeout = 2000;
048    private ReplayBuffer replayBuffer;
049    private Replayer replayer;
050    private UdpTransport udpTransport;
051
052    public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
053        super(next);
054        this.replayStrategy = replayStrategy;
055    }
056
057    public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException {
058        super(next, udpTransport.getSequenceGenerator());
059        this.udpTransport = udpTransport;
060        this.replayer = udpTransport.createReplayer();
061    }
062
063    /**
064     * Requests that a range of commands be replayed
065     */
066    public void requestReplay(int fromCommandId, int toCommandId) {
067        ReplayCommand replay = new ReplayCommand();
068        replay.setFirstNakNumber(fromCommandId);
069        replay.setLastNakNumber(toCommandId);
070        try {
071            oneway(replay);
072        } catch (IOException e) {
073            getTransportListener().onException(e);
074        }
075    }
076
077    public Object request(Object o) throws IOException {
078        final Command command = (Command)o;
079        FutureResponse response = asyncRequest(command, null);
080        while (true) {
081            Response result = response.getResult(requestTimeout);
082            if (result != null) {
083                return result;
084            }
085            onMissingResponse(command, response);
086        }
087    }
088
089    public Object request(Object o, int timeout) throws IOException {
090        final Command command = (Command)o;
091        FutureResponse response = asyncRequest(command, null);
092        while (timeout > 0) {
093            int time = timeout;
094            if (timeout > requestTimeout) {
095                time = requestTimeout;
096            }
097            Response result = response.getResult(time);
098            if (result != null) {
099                return result;
100            }
101            onMissingResponse(command, response);
102            timeout -= time;
103        }
104        return response.getResult(0);
105    }
106
107    public void onCommand(Object o) {
108        Command command = (Command)o;
109        // lets pass wireformat through
110        if (command.isWireFormatInfo()) {
111            super.onCommand(command);
112            return;
113        } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
114            replayCommands((ReplayCommand)command);
115            return;
116        }
117
118        int actualCounter = command.getCommandId();
119        boolean valid = expectedCounter == actualCounter;
120
121        if (!valid) {
122            synchronized (commands) {
123                int nextCounter = actualCounter;
124                boolean empty = commands.isEmpty();
125                if (!empty) {
126                    Command nextAvailable = commands.first();
127                    nextCounter = nextAvailable.getCommandId();
128                }
129
130                try {
131                    boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
132
133                    if (keep) {
134                        // lets add it to the list for later on
135                        if (LOG.isDebugEnabled()) {
136                            LOG.debug("Received out of order command which is being buffered for later: " + command);
137                        }
138                        commands.add(command);
139                    }
140                } catch (IOException e) {
141                    onException(e);
142                }
143
144                if (!empty) {
145                    // lets see if the first item in the set is the next
146                    // expected
147                    command = commands.first();
148                    valid = expectedCounter == command.getCommandId();
149                    if (valid) {
150                        commands.remove(command);
151                    }
152                }
153            }
154        }
155
156        while (valid) {
157            // we've got a valid header so increment counter
158            replayStrategy.onReceivedPacket(this, expectedCounter);
159            expectedCounter++;
160            super.onCommand(command);
161
162            synchronized (commands) {
163                // we could have more commands left
164                valid = !commands.isEmpty();
165                if (valid) {
166                    // lets see if the first item in the set is the next
167                    // expected
168                    command = commands.first();
169                    valid = expectedCounter == command.getCommandId();
170                    if (valid) {
171                        commands.remove(command);
172                    }
173                }
174            }
175        }
176    }
177
178    public int getBufferedCommandCount() {
179        synchronized (commands) {
180            return commands.size();
181        }
182    }
183
184    public int getExpectedCounter() {
185        return expectedCounter;
186    }
187
188    /**
189     * This property should never really be set - but is mutable primarily for
190     * test cases
191     */
192    public void setExpectedCounter(int expectedCounter) {
193        this.expectedCounter = expectedCounter;
194    }
195
196    public int getRequestTimeout() {
197        return requestTimeout;
198    }
199
200    /**
201     * Sets the default timeout of requests before starting to request commands
202     * are replayed
203     */
204    public void setRequestTimeout(int requestTimeout) {
205        this.requestTimeout = requestTimeout;
206    }
207
208    public ReplayStrategy getReplayStrategy() {
209        return replayStrategy;
210    }
211
212    public ReplayBuffer getReplayBuffer() {
213        if (replayBuffer == null) {
214            replayBuffer = createReplayBuffer();
215        }
216        return replayBuffer;
217    }
218
219    public void setReplayBuffer(ReplayBuffer replayBuffer) {
220        this.replayBuffer = replayBuffer;
221    }
222
223    public int getReplayBufferCommandCount() {
224        return replayBufferCommandCount;
225    }
226
227    /**
228     * Sets the default number of commands which are buffered
229     */
230    public void setReplayBufferCommandCount(int replayBufferSize) {
231        this.replayBufferCommandCount = replayBufferSize;
232    }
233
234    public void setReplayStrategy(ReplayStrategy replayStrategy) {
235        this.replayStrategy = replayStrategy;
236    }
237
238    public Replayer getReplayer() {
239        return replayer;
240    }
241
242    public void setReplayer(Replayer replayer) {
243        this.replayer = replayer;
244    }
245
246    public String toString() {
247        return next.toString();
248    }
249
250    public void start() throws Exception {
251        if (udpTransport != null) {
252            udpTransport.setReplayBuffer(getReplayBuffer());
253        }
254        if (replayStrategy == null) {
255            throw new IllegalArgumentException("Property replayStrategy not specified");
256        }
257        super.start();
258    }
259
260    /**
261     * Lets attempt to replay the request as a command may have disappeared
262     */
263    protected void onMissingResponse(Command command, FutureResponse response) {
264        LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
265
266        int commandId = command.getCommandId();
267        requestReplay(commandId, commandId);
268    }
269
270    protected ReplayBuffer createReplayBuffer() {
271        return new DefaultReplayBuffer(getReplayBufferCommandCount());
272    }
273
274    protected void replayCommands(ReplayCommand command) {
275        try {
276            if (replayer == null) {
277                onException(new IOException("Cannot replay commands. No replayer property configured"));
278            }
279            if (LOG.isDebugEnabled()) {
280                LOG.debug("Processing replay command: " + command);
281            }
282            getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
283
284            // TODO we could proactively remove ack'd stuff from the replay
285            // buffer
286            // if we only have a single client talking to us
287        } catch (IOException e) {
288            onException(e);
289        }
290    }
291
292}