001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.sql.Blob; 023import java.sql.Connection; 024import java.sql.PreparedStatement; 025import java.sql.ResultSet; 026import java.sql.SQLException; 027 028import javax.jms.JMSException; 029 030import org.apache.activemq.store.jdbc.TransactionContext; 031import org.apache.activemq.util.ByteArrayOutputStream; 032 033/** 034 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob() 035 * operations. This is a little more involved since to insert a blob you have 036 * to: 037 * 038 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data 039 * value. 040 * 041 * The databases/JDBC drivers that use this adapter are: 042 * <ul> 043 * <li></li> 044 * </ul> 045 * 046 * @org.apache.xbean.XBean element="blobJDBCAdapter" 047 * 048 * 049 */ 050public class BlobJDBCAdapter extends DefaultJDBCAdapter { 051 052 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) 053 throws SQLException, JMSException { 054 PreparedStatement s = null; 055 ResultSet rs = null; 056 try { 057 058 // Add the Blob record. 059 s = c.prepareStatement(statements.getAddMessageStatement()); 060 s.setLong(1, seq); 061 s.setString(2, destinationName); 062 s.setString(3, messageID); 063 s.setString(4, " "); 064 065 if (s.executeUpdate() != 1) { 066 throw new JMSException("Failed to broker message: " + messageID + " in container."); 067 } 068 s.close(); 069 070 // Select the blob record so that we can update it. 071 s = c.prepareStatement(statements.getFindMessageStatement()); 072 s.setLong(1, seq); 073 rs = s.executeQuery(); 074 if (!rs.next()) { 075 throw new JMSException("Failed to broker message: " + messageID + " in container."); 076 } 077 078 // Update the blob 079 Blob blob = rs.getBlob(1); 080 OutputStream stream = blob.setBinaryStream(data.length); 081 stream.write(data); 082 stream.close(); 083 s.close(); 084 085 // Update the row with the updated blob 086 s = c.prepareStatement(statements.getUpdateMessageStatement()); 087 s.setBlob(1, blob); 088 s.setLong(2, seq); 089 090 } catch (IOException e) { 091 throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e); 092 } finally { 093 try { 094 rs.close(); 095 } catch (Throwable ignore) { 096 } 097 try { 098 s.close(); 099 } catch (Throwable ignore) { 100 } 101 } 102 } 103 104 public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException { 105 PreparedStatement s = null; 106 ResultSet rs = null; 107 try { 108 109 s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); 110 s.setLong(1, seq); 111 rs = s.executeQuery(); 112 113 if (!rs.next()) { 114 return null; 115 } 116 Blob blob = rs.getBlob(1); 117 InputStream is = blob.getBinaryStream(); 118 119 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); 120 int ch; 121 while ((ch = is.read()) >= 0) { 122 os.write(ch); 123 } 124 is.close(); 125 os.close(); 126 127 return os.toByteArray(); 128 129 } catch (IOException e) { 130 throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e); 131 } finally { 132 try { 133 rs.close(); 134 } catch (Throwable ignore) { 135 } 136 try { 137 s.close(); 138 } catch (Throwable ignore) { 139 } 140 } 141 } 142 143}