001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.sql.Blob;
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027
028import javax.jms.JMSException;
029
030import org.apache.activemq.store.jdbc.TransactionContext;
031import org.apache.activemq.util.ByteArrayOutputStream;
032
033/**
034 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
035 * operations. This is a little more involved since to insert a blob you have
036 * to:
037 * 
038 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
039 * value.
040 * 
041 * The databases/JDBC drivers that use this adapter are:
042 * <ul>
043 * <li></li>
044 * </ul>
045 * 
046 * @org.apache.xbean.XBean element="blobJDBCAdapter"
047 * 
048 * 
049 */
050public class BlobJDBCAdapter extends DefaultJDBCAdapter {
051
052    public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
053        throws SQLException, JMSException {
054        PreparedStatement s = null;
055        ResultSet rs = null;
056        try {
057
058            // Add the Blob record.
059            s = c.prepareStatement(statements.getAddMessageStatement());
060            s.setLong(1, seq);
061            s.setString(2, destinationName);
062            s.setString(3, messageID);
063            s.setString(4, " ");
064
065            if (s.executeUpdate() != 1) {
066                throw new JMSException("Failed to broker message: " + messageID + " in container.");
067            }
068            s.close();
069
070            // Select the blob record so that we can update it.
071            s = c.prepareStatement(statements.getFindMessageStatement());
072            s.setLong(1, seq);
073            rs = s.executeQuery();
074            if (!rs.next()) {
075                throw new JMSException("Failed to broker message: " + messageID + " in container.");
076            }
077
078            // Update the blob
079            Blob blob = rs.getBlob(1);
080            OutputStream stream = blob.setBinaryStream(data.length);
081            stream.write(data);
082            stream.close();
083            s.close();
084
085            // Update the row with the updated blob
086            s = c.prepareStatement(statements.getUpdateMessageStatement());
087            s.setBlob(1, blob);
088            s.setLong(2, seq);
089
090        } catch (IOException e) {
091            throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
092        } finally {
093            try {
094                rs.close();
095            } catch (Throwable ignore) {
096            }
097            try {
098                s.close();
099            } catch (Throwable ignore) {
100            }
101        }
102    }
103
104    public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
105        PreparedStatement s = null;
106        ResultSet rs = null;
107        try {
108
109            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
110            s.setLong(1, seq);
111            rs = s.executeQuery();
112
113            if (!rs.next()) {
114                return null;
115            }
116            Blob blob = rs.getBlob(1);
117            InputStream is = blob.getBinaryStream();
118
119            ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
120            int ch;
121            while ((ch = is.read()) >= 0) {
122                os.write(ch);
123            }
124            is.close();
125            os.close();
126
127            return os.toByteArray();
128
129        } catch (IOException e) {
130            throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
131        } finally {
132            try {
133                rs.close();
134            } catch (Throwable ignore) {
135            }
136            try {
137                s.close();
138            } catch (Throwable ignore) {
139            }
140        }
141    }
142
143}