in flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java [774:1053]
public PersistableEvent fetchAndDeleteEvent(String channel,
Connection connection) {
PersistableEvent.Builder peBuilder = null;
PreparedStatement baseEventFetchStmt = null;
PreparedStatement spillEventFetchStmt = null;
InputStream payloadInputStream = null;
PreparedStatement baseHeaderFetchStmt = null;
PreparedStatement nameSpillHeaderStmt = null;
PreparedStatement valueSpillHeaderStmt = null;
PreparedStatement deleteSpillEventStmt = null;
PreparedStatement deleteNameSpillHeaderStmt = null;
PreparedStatement deleteValueSpillHeaderStmt = null;
PreparedStatement deleteBaseHeaderStmt = null;
PreparedStatement deleteBaseEventStmt = null;
try {
baseEventFetchStmt = connection.prepareStatement(STMT_FETCH_PAYLOAD_BASE);
baseEventFetchStmt.setString(1, channel);
ResultSet rsetBaseEvent = baseEventFetchStmt.executeQuery();
if (!rsetBaseEvent.next()) {
// Empty result set
LOGGER.debug("No events found for channel: " + channel);
return null;
}
// Populate event id, payload
long eventId = rsetBaseEvent.getLong(1);
peBuilder = new PersistableEvent.Builder(channel, eventId);
peBuilder.setBasePayload(rsetBaseEvent.getBytes(2));
boolean hasSpill = rsetBaseEvent.getBoolean(3);
if (hasSpill) {
spillEventFetchStmt =
connection.prepareStatement(STMT_FETCH_PAYLOAD_SPILL);
spillEventFetchStmt.setLong(1, eventId);
ResultSet rsetSpillEvent = spillEventFetchStmt.executeQuery();
if (!rsetSpillEvent.next()) {
throw new JdbcChannelException("Payload spill expected but not "
+ "found for event: " + eventId);
}
Blob payloadSpillBlob = rsetSpillEvent.getBlob(1);
payloadInputStream = payloadSpillBlob.getBinaryStream();
ByteArrayOutputStream spillStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length = 0;
while ((length = payloadInputStream.read(buffer)) != -1) {
spillStream.write(buffer, 0, length);
}
peBuilder.setSpillPayload(spillStream.toByteArray());
// Delete this spill
deleteSpillEventStmt =
connection.prepareStatement(STMT_DELETE_EVENT_SPILL);
deleteSpillEventStmt.setLong(1, eventId);
int updateCount = deleteSpillEventStmt.executeUpdate();
if (updateCount != 1) {
throw new JdbcChannelException("Unexpected row count for spill "
+ "delete: " + updateCount);
}
}
if (rsetBaseEvent.next()) {
throw new JdbcChannelException("More than expected events retrieved");
}
// Populate headers
List<Long> nameSpillHeaders = null;
List<Long> valueSpillHeaders = null;
baseHeaderFetchStmt = connection.prepareStatement(STMT_FETCH_HEADER_BASE);
baseHeaderFetchStmt.setLong(1, eventId);
int headerCount = 0; // for later delete validation
ResultSet rsetBaseHeader = baseHeaderFetchStmt.executeQuery();
while (rsetBaseHeader.next()) {
headerCount++;
long headerId = rsetBaseHeader.getLong(1);
String baseName = rsetBaseHeader.getString(2);
String baseValue = rsetBaseHeader.getString(3);
boolean hasNameSpill = rsetBaseHeader.getBoolean(4);
boolean hasValueSpill = rsetBaseHeader.getBoolean(5);
peBuilder.setHeader(headerId, baseName, baseValue);
if (hasNameSpill) {
if (nameSpillHeaders == null) {
nameSpillHeaders = new ArrayList<Long>();
}
nameSpillHeaders.add(headerId);
}
if (hasValueSpill) {
if (valueSpillHeaders == null) {
valueSpillHeaders = new ArrayList<Long>();
}
valueSpillHeaders.add(headerId);
}
}
if (nameSpillHeaders != null) {
nameSpillHeaderStmt =
connection.prepareStatement(STMT_FETCH_HEADER_NAME_SPILL);
deleteNameSpillHeaderStmt =
connection.prepareStatement(STMT_DELETE_HEADER_NAME_SPILL);
for (long headerId : nameSpillHeaders) {
nameSpillHeaderStmt.setLong(1, headerId);
ResultSet rsetHeaderNameSpill = nameSpillHeaderStmt.executeQuery();
if (!rsetHeaderNameSpill.next()) {
throw new JdbcChannelException("Name spill was set for header "
+ headerId + " but was not found");
}
String nameSpill = rsetHeaderNameSpill.getString(1);
peBuilder.setHeaderNameSpill(headerId, nameSpill);
deleteNameSpillHeaderStmt.setLong(1, headerId);
deleteNameSpillHeaderStmt.addBatch();
}
// Delete header name spills
int[] headerNameSpillDelete = deleteNameSpillHeaderStmt.executeBatch();
if (headerNameSpillDelete.length != nameSpillHeaders.size()) {
throw new JdbcChannelException("Unexpected number of header name "
+ "spill deletes: expected " + nameSpillHeaders.size()
+ ", found: " + headerNameSpillDelete.length);
}
for (int numRowsAffected : headerNameSpillDelete) {
if (numRowsAffected != 1) {
throw new JdbcChannelException("Unexpected number of deleted rows "
+ "for header name spill deletes: " + numRowsAffected);
}
}
}
if (valueSpillHeaders != null) {
valueSpillHeaderStmt =
connection.prepareStatement(STMT_FETCH_HEADER_VALUE_SPILL);
deleteValueSpillHeaderStmt =
connection.prepareStatement(STMT_DELETE_HEADER_VALUE_SPILL);
for (long headerId: valueSpillHeaders) {
valueSpillHeaderStmt.setLong(1, headerId);
ResultSet rsetHeaderValueSpill = valueSpillHeaderStmt.executeQuery();
if (!rsetHeaderValueSpill.next()) {
throw new JdbcChannelException("Value spill was set for header "
+ headerId + " but was not found");
}
String valueSpill = rsetHeaderValueSpill.getString(1);
peBuilder.setHeaderValueSpill(headerId, valueSpill);
deleteValueSpillHeaderStmt.setLong(1, headerId);
deleteValueSpillHeaderStmt.addBatch();
}
// Delete header value spills
int[] headerValueSpillDelete = deleteValueSpillHeaderStmt.executeBatch();
if (headerValueSpillDelete.length != valueSpillHeaders.size()) {
throw new JdbcChannelException("Unexpected number of header value "
+ "spill deletes: expected " + valueSpillHeaders.size()
+ ", found: " + headerValueSpillDelete.length);
}
for (int numRowsAffected : headerValueSpillDelete) {
if (numRowsAffected != 1) {
throw new JdbcChannelException("Unexpected number of deleted rows "
+ "for header value spill deletes: " + numRowsAffected);
}
}
}
// Now delete Headers
if (headerCount > 0) {
deleteBaseHeaderStmt =
connection.prepareStatement(STMT_DELETE_HEADER_BASE);
deleteBaseHeaderStmt.setLong(1, eventId);
int rowCount = deleteBaseHeaderStmt.executeUpdate();
if (rowCount != headerCount) {
throw new JdbcChannelException("Unexpected base header delete count: "
+ "expected: " + headerCount + ", found: " + rowCount);
}
}
// Now delete the Event
deleteBaseEventStmt = connection.prepareStatement(STMT_DELETE_EVENT_BASE);
deleteBaseEventStmt.setLong(1, eventId);
int rowCount = deleteBaseEventStmt.executeUpdate();
if (rowCount != 1) {
throw new JdbcChannelException("Unexpected row count for delete of "
+ "event-id: " + eventId + ", count: " + rowCount);
}
} catch (SQLException ex) {
throw new JdbcChannelException("Unable to retrieve event", ex);
} catch (IOException ex) {
throw new JdbcChannelException("Unable to read data", ex);
} finally {
if (payloadInputStream != null) {
try {
payloadInputStream.close();
} catch (IOException ex) {
LOGGER.error("Unable to close payload spill stream", ex);
}
}
if (baseEventFetchStmt != null) {
try {
baseEventFetchStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close base event fetch statement", ex);
}
}
if (spillEventFetchStmt != null) {
try {
spillEventFetchStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close spill event fetch statment", ex);
}
}
if (deleteSpillEventStmt != null) {
try {
deleteSpillEventStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close event spill delete statement", ex);
}
}
if (baseHeaderFetchStmt != null) {
try {
baseHeaderFetchStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close base header fetch statement", ex);
}
}
if (nameSpillHeaderStmt != null) {
try {
nameSpillHeaderStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close name spill fetch statement", ex);
}
}
if (valueSpillHeaderStmt != null) {
try {
valueSpillHeaderStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close value spill fetch statement", ex);
}
}
if (deleteNameSpillHeaderStmt != null) {
try {
deleteNameSpillHeaderStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close value spill delete statement", ex);
}
}
if (deleteValueSpillHeaderStmt != null) {
try {
deleteValueSpillHeaderStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close value spill delete statement", ex);
}
}
if (deleteBaseHeaderStmt != null) {
try {
deleteBaseHeaderStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close base header delete statement", ex);
}
}
if (deleteBaseEventStmt != null) {
try {
deleteBaseEventStmt.close();
} catch (SQLException ex) {
LOGGER.error("Unable to close base event delete statement", ex);
}
}
}
return peBuilder.build();
}