001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import javax.jms.InvalidDestinationException;
026import javax.jms.JMSException;
027
028import org.apache.activemq.command.ActiveMQBytesMessage;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQMessage;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.command.ProducerInfo;
034import org.apache.activemq.util.IOExceptionSupport;
035
036/**
037 * 
038 */
039public class ActiveMQOutputStream extends OutputStream implements Disposable {
040
041    protected int count;
042
043    final byte buffer[];
044
045    private final ActiveMQConnection connection;
046    private final Map<String, Object> properties;
047    private final ProducerInfo info;
048
049    private long messageSequence;
050    private boolean closed;
051    private final int deliveryMode;
052    private final int priority;
053    private final long timeToLive;
054
055    /**
056     * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
057     */
058    public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
059
060    public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
061                                long timeToLive) throws JMSException {
062        this.connection = connection;
063        this.deliveryMode = deliveryMode;
064        this.priority = priority;
065        this.timeToLive = timeToLive;
066        this.properties = properties == null ? null : new HashMap<String, Object>(properties);
067
068        Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
069        if (chunkSize == null) {
070            chunkSize = 64 * 1024;
071        } else {
072            if (chunkSize < 1) {
073                throw new IllegalArgumentException("Chunk size must be greater then 0");
074            } else {
075                chunkSize *= 1024;
076            }
077        }
078
079        buffer = new byte[chunkSize];
080
081        if (destination == null) {
082            throw new InvalidDestinationException("Don't understand null destinations");
083        }
084
085        this.info = new ProducerInfo(producerId);
086        this.info.setDestination(destination);
087
088        this.connection.addOutputStream(this);
089        this.connection.asyncSendPacket(info);
090    }
091
092    public void close() throws IOException {
093        if (!closed) {
094            flushBuffer();
095            try {
096                // Send an EOS style empty message to signal EOS.
097                send(new ActiveMQMessage(), true);
098                dispose();
099                this.connection.asyncSendPacket(info.createRemoveCommand());
100            } catch (JMSException e) {
101                IOExceptionSupport.create(e);
102            }
103        }
104    }
105
106    public void dispose() {
107        if (!closed) {
108            this.connection.removeOutputStream(this);
109            closed = true;
110        }
111    }
112
113    public synchronized void write(int b) throws IOException {
114        buffer[count++] = (byte) b;
115        if (count == buffer.length) {
116            flushBuffer();
117        }
118    }
119
120    public synchronized void write(byte b[], int off, int len) throws IOException {
121        while (len > 0) {
122            int max = Math.min(len, buffer.length - count);
123            System.arraycopy(b, off, buffer, count, max);
124
125            len -= max;
126            count += max;
127            off += max;
128
129            if (count == buffer.length) {
130                flushBuffer();
131            }
132        }
133    }
134
135    public synchronized void flush() throws IOException {
136        flushBuffer();
137    }
138
139    private void flushBuffer() throws IOException {
140        if (count != 0) {
141            try {
142                ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
143                msg.writeBytes(buffer, 0, count);
144                send(msg, false);
145            } catch (JMSException e) {
146                throw IOExceptionSupport.create(e);
147            }
148            count = 0;
149        }
150    }
151
152    /**
153     * @param msg
154     * @throws JMSException
155     */
156    private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
157        if (properties != null) {
158            for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
159                String key = (String) iter.next();
160                Object value = properties.get(key);
161                msg.setObjectProperty(key, value);
162            }
163        }
164        msg.setType("org.apache.activemq.Stream");
165        msg.setGroupID(info.getProducerId().toString());
166        if (eosMessage) {
167            msg.setGroupSequence(-1);
168        } else {
169            msg.setGroupSequence((int) messageSequence);
170        }
171        MessageId id = new MessageId(info.getProducerId(), messageSequence++);
172        connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
173    }
174
175    public String toString() {
176        return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
177    }
178
179}