private FileCheckpoint createOrUpdateCheckpoint()

in src/com/amazon/kinesis/streaming/agent/tailing/checkpoints/SQLiteFileCheckpointStore.java [148:199]


    private FileCheckpoint createOrUpdateCheckpoint(FileCheckpoint cp) {
        if (!ensureConnected())
            return null;
        try {
            @Cleanup PreparedStatement update = connection.prepareStatement(
                    "update FILE_CHECKPOINTS " +
                    "set fileId=?, offset=?, lastModifiedTime=?, size=?, " +
                    "lastUpdated=strftime('%Y-%m-%d %H:%M:%f', 'now') " +
                    "where flow=? and path=?");
            update.setString(1, cp.getFile().getId().toString());
            update.setLong(2, cp.getOffset());
            update.setLong(3, cp.getFile().getLastModifiedTime());
            update.setLong(4, cp.getFile().getSize());
            update.setString(5, cp.getFile().getFlow().getId());
            update.setString(6, cp.getFile().getPath().toAbsolutePath().toString());
            int affected = update.executeUpdate();
            if (affected == 0) {
                @Cleanup PreparedStatement insert = connection.prepareStatement(
                        "insert or ignore into FILE_CHECKPOINTS " +
                        "values(?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%f', 'now'))");
                insert.setString(1, cp.getFile().getFlow().getId());
                insert.setString(2, cp.getFile().getPath().toAbsolutePath().toString());
                insert.setString(3, cp.getFile().getId().toString());
                insert.setLong(4, cp.getFile().getLastModifiedTime());
                insert.setLong(5, cp.getFile().getSize());
                insert.setLong(6, cp.getOffset());
                affected = insert.executeUpdate();
                if (affected == 1) {
                    LOGGER.trace("Created new database checkpoint: {}@{}", cp.getFile(), cp.getOffset());
                } else {
                    // SANITYCHECK: This should never happen since method is synchronized.
                    // TODO: Remove when done debugging.
                    LOGGER.error("Did not update or create checkpoint because of race condition: {}@{}", cp.getFile(), cp.getOffset());
                    throw new RuntimeException("Race condition detected when setting checkpoint for file: " + cp.getFile().getPath());
                }
            } else {
                LOGGER.trace("Updated database checkpoint: {}@{}", cp.getFile(), cp.getOffset());
            }
            connection.commit();
            return cp;
        } catch (SQLException e) {
            LOGGER.error("Failed to create the checkpoint {}@{} in database {}", cp.getFile(), cp.getOffset(), dbFile);
            try {
                connection.rollback();
            } catch (SQLException e2) {
                LOGGER.error("Failed to rollback checkpointing transaction: {}@{}", cp.getFile(), cp.getOffset());
                LOGGER.info("Reinitializing connection to database {}", dbFile);
                close();
            }
            return null;
        }
    }