001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.stomp;
019
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.Socket;
026import java.net.UnknownHostException;
027import java.util.HashMap;
028
029public class StompConnection {
030
031    public static final long RECEIVE_TIMEOUT = 10000;
032
033    private Socket stompSocket;
034    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035
036    public void open(String host, int port) throws IOException, UnknownHostException {
037        open(new Socket(host, port));
038    }
039
040    public void open(Socket socket) {
041        stompSocket = socket;
042    }
043
044    public void close() throws IOException {
045        if (stompSocket != null) {
046            stompSocket.close();
047            stompSocket = null;
048        }
049    }
050
051    public void sendFrame(String data) throws Exception {
052        byte[] bytes = data.getBytes("UTF-8");
053        OutputStream outputStream = stompSocket.getOutputStream();
054        outputStream.write(bytes);
055        outputStream.write(0);
056        outputStream.flush();
057    }
058
059    public void sendFrame(String frame, byte[] data) throws Exception {
060        byte[] bytes = frame.getBytes("UTF-8");
061        OutputStream outputStream = stompSocket.getOutputStream();
062        outputStream.write(bytes);
063        outputStream.write(data);
064        outputStream.write(0);
065        outputStream.flush();
066    }
067
068    public StompFrame receive() throws Exception {
069        return receive(RECEIVE_TIMEOUT);
070    }
071
072    public StompFrame receive(long timeOut) throws Exception {
073        stompSocket.setSoTimeout((int)timeOut);
074        InputStream is = stompSocket.getInputStream();
075        StompWireFormat wf = new StompWireFormat();
076        DataInputStream dis = new DataInputStream(is);
077        return (StompFrame)wf.unmarshal(dis);
078    }
079
080    public String receiveFrame() throws Exception {
081        return receiveFrame(RECEIVE_TIMEOUT);
082    }
083
084    public String receiveFrame(long timeOut) throws Exception {
085        stompSocket.setSoTimeout((int)timeOut);
086        InputStream is = stompSocket.getInputStream();
087        int c = 0;
088        for (;;) {
089            c = is.read();
090            if (c < 0) {
091                throw new IOException("socket closed.");
092            } else if (c == 0) {
093                c = is.read();
094                if (c == '\n') {
095                    // end of frame
096                    return stringFromBuffer(inputBuffer);
097                } else {
098                    inputBuffer.write(0);
099                    inputBuffer.write(c);
100                }
101            } else {
102                inputBuffer.write(c);
103            }
104        }
105    }
106
107        private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108            byte[] ba = inputBuffer.toByteArray();
109        inputBuffer.reset();
110        return new String(ba, "UTF-8");
111    }
112
113    public Socket getStompSocket() {
114                return stompSocket;
115        }
116
117        public void setStompSocket(Socket stompSocket) {
118                this.stompSocket = stompSocket;
119        }
120
121    public void connect(String username, String password) throws Exception {
122        connect(username, password, null);
123    }
124
125    public void connect(String username, String password, String client) throws Exception {
126        HashMap<String, String> headers = new HashMap();
127        headers.put("login", username);
128        headers.put("passcode", password);
129        if (client != null) {
130                headers.put("client-id", client);
131        }
132        StompFrame frame = new StompFrame("CONNECT", headers);
133        sendFrame(frame.format());
134
135        StompFrame connect = receive();
136        if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
137                throw new Exception ("Not connected: " + connect.getBody());
138        }
139    }
140
141    public void disconnect() throws Exception {
142        StompFrame frame = new StompFrame("DISCONNECT");
143        sendFrame(frame.format());
144    }
145
146    public void send(String destination, String message) throws Exception {
147        send(destination, message, null, null);
148    }
149
150    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
151        if (headers == null) {
152                headers = new HashMap<String, String>();
153        }
154        headers.put("destination", destination);
155        if (transaction != null) {
156                headers.put("transaction", transaction);
157        }
158        StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
159        sendFrame(frame.format());
160    }
161
162    public void subscribe(String destination) throws Exception {
163        subscribe(destination, null, null);
164    }
165
166    public void subscribe(String destination, String ack) throws Exception {
167        subscribe(destination, ack, new HashMap<String, String>());
168    }
169
170    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
171                if (headers == null) {
172                        headers = new HashMap<String, String>();
173                }
174                headers.put("destination", destination);
175        if (ack != null) {
176                headers.put("ack", ack);
177        }
178        StompFrame frame = new StompFrame("SUBSCRIBE", headers);
179        sendFrame(frame.format());
180    }
181
182    public void unsubscribe(String destination) throws Exception {
183        unsubscribe(destination, null);
184    }
185
186    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
187                if (headers == null) {
188                        headers = new HashMap<String, String>();
189                }
190                headers.put("destination", destination);
191        StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
192        sendFrame(frame.format());
193    }    
194    
195    public void begin(String transaction) throws Exception {
196        HashMap<String, String> headers = new HashMap<String, String>();
197        headers.put("transaction", transaction);
198        StompFrame frame = new StompFrame("BEGIN", headers);
199        sendFrame(frame.format());
200    }
201
202    public void abort(String transaction) throws Exception {
203        HashMap<String, String> headers = new HashMap<String, String>();
204        headers.put("transaction", transaction);
205        StompFrame frame = new StompFrame("ABORT", headers);
206        sendFrame(frame.format());
207    }
208
209    public void commit(String transaction) throws Exception {
210        HashMap<String, String> headers = new HashMap<String, String>();
211        headers.put("transaction", transaction);
212        StompFrame frame = new StompFrame("COMMIT", headers);
213        sendFrame(frame.format());
214    }
215
216    public void ack(StompFrame frame) throws Exception {
217        ack(frame.getHeaders().get("message-id"), null);
218    }
219
220    public void ack(StompFrame frame, String transaction) throws Exception {
221        ack(frame.getHeaders().get("message-id"), transaction);
222    }
223
224    public void ack(String messageId) throws Exception {
225        ack(messageId, null);
226    }
227
228    public void ack(String messageId, String transaction) throws Exception {
229        HashMap<String, String> headers = new HashMap<String, String>();
230        headers.put("message-id", messageId);
231        if (transaction != null)
232                headers.put("transaction", transaction);
233        StompFrame frame = new StompFrame("ACK", headers);
234        sendFrame(frame.format());
235    }
236
237    protected String appendHeaders(HashMap<String, Object> headers) {
238        StringBuffer result = new StringBuffer();
239        for (String key : headers.keySet()) {
240                result.append(key + ":" + headers.get(key) + "\n");
241        }
242        result.append("\n");
243        return result.toString();
244    }
245
246}