in src/com/amazon/kinesis/streaming/agent/tailing/checkpoints/SQLiteFileCheckpointStore.java [148:199]
/**
 * Persists the given checkpoint in the FILE_CHECKPOINTS table, updating the
 * row keyed by (flow, path) if one exists and inserting a new row otherwise,
 * then commits the transaction.
 *
 * @param cp the checkpoint to store; its file's flow id and absolute path
 *           identify the row, and its file id, offset, last-modified time
 *           and size are the stored values.
 * @return {@code cp} if the checkpoint was written and committed;
 *         {@code null} if {@code ensureConnected()} returned {@code false}
 *         or an {@link SQLException} occurred while writing.
 * @throws RuntimeException if neither the update nor the insert affected a
 *         row (treated as an unexpected race condition).
 */
private FileCheckpoint createOrUpdateCheckpoint(FileCheckpoint cp) {
    if (!ensureConnected())
        return null;
    try {
        // Try to update an existing row first; fall back to insert when no row matched.
        @Cleanup PreparedStatement update = connection.prepareStatement(
                "update FILE_CHECKPOINTS " +
                "set fileId=?, offset=?, lastModifiedTime=?, size=?, " +
                "lastUpdated=strftime('%Y-%m-%d %H:%M:%f', 'now') " +
                "where flow=? and path=?");
        update.setString(1, cp.getFile().getId().toString());
        update.setLong(2, cp.getOffset());
        update.setLong(3, cp.getFile().getLastModifiedTime());
        update.setLong(4, cp.getFile().getSize());
        update.setString(5, cp.getFile().getFlow().getId());
        update.setString(6, cp.getFile().getPath().toAbsolutePath().toString());
        int affected = update.executeUpdate();
        if (affected == 0) {
            // No existing row for this (flow, path): create one.
            // The insert is positional, so the parameter order below must
            // match the table's column order (note it differs from the
            // order used by the update statement above).
            @Cleanup PreparedStatement insert = connection.prepareStatement(
                    "insert or ignore into FILE_CHECKPOINTS " +
                    "values(?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%f', 'now'))");
            insert.setString(1, cp.getFile().getFlow().getId());
            insert.setString(2, cp.getFile().getPath().toAbsolutePath().toString());
            insert.setString(3, cp.getFile().getId().toString());
            insert.setLong(4, cp.getFile().getLastModifiedTime());
            insert.setLong(5, cp.getFile().getSize());
            insert.setLong(6, cp.getOffset());
            affected = insert.executeUpdate();
            if (affected == 1) {
                LOGGER.trace("Created new database checkpoint: {}@{}", cp.getFile(), cp.getOffset());
            } else {
                // SANITYCHECK: This should never happen since method is synchronized.
                // TODO: Remove when done debugging.
                // NOTE(review): this path throws without commit or rollback, so the
                // transaction is left open on the connection -- confirm that is intended.
                LOGGER.error("Did not update or create checkpoint because of race condition: {}@{}", cp.getFile(), cp.getOffset());
                throw new RuntimeException("Race condition detected when setting checkpoint for file: " + cp.getFile().getPath());
            }
        } else {
            LOGGER.trace("Updated database checkpoint: {}@{}", cp.getFile(), cp.getOffset());
        }
        connection.commit();
        return cp;
    } catch (SQLException e) {
        // Pass the exception as the trailing argument so the cause and stack
        // trace are logged instead of being silently dropped.
        LOGGER.error("Failed to create the checkpoint {}@{} in database {}", cp.getFile(), cp.getOffset(), dbFile, e);
        try {
            connection.rollback();
        } catch (SQLException e2) {
            LOGGER.error("Failed to rollback checkpointing transaction: {}@{}", cp.getFile(), cp.getOffset(), e2);
            LOGGER.info("Reinitializing connection to database {}", dbFile);
            // Drop the connection after a failed rollback; presumably the next
            // ensureConnected() call re-opens it -- verify against that method.
            close();
        }
        return null;
    }
}