in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [155:180]
/**
 * Resumes consumption of {@code partition} from the checkpoint stored in the sink table.
 *
 * <p>Runs {@code SELECT MAX(offset)} against {@code table}, filtered by the partition's topic
 * name and partition number, and waits for the SQL instance to succeed. If a non-null maximum
 * is found, the task context is asked to reset the partition's offset to that value plus one.
 * If the query yields no row or a null maximum, the offset is left untouched.
 *
 * @param partition the topic partition whose offset should be restored
 * @throws RuntimeException wrapping the {@code OdpsException} or {@code IOException} raised
 *     while running the query or reading its result
 */
private void resumeCheckPoint(TopicPartition partition) {
  // NOTE(review): the topic name is interpolated straight into the SQL text; presumably safe
  // because topic names come from the Connect framework -- confirm.
  String query = "SELECT MAX(offset) as offset "
      + "FROM " + table + " "
      + "WHERE topic=\"" + partition.topic() + "\" "
      + "AND partition=" + partition.partition() + ";";
  try {
    Instance findLastCommittedOffset = SQLTask.run(odps, query);
    findLastCommittedOffset.waitForSuccess();
    ResultSet res = SQLTask.getResultSet(findLastCommittedOffset);
    // Guard against an empty result set so that "nothing committed yet" is handled the same
    // way as a null MAX(offset) instead of failing inside next().
    Long lastCommittedOffset = res.hasNext() ? res.next().getBigint("offset") : null;
    LOGGER.info("Thread(" + Thread.currentThread().getId() +
        ") Last committed offset for (topic: " + partition.topic() + ", partition: "
        + partition.partition() + "): " + lastCommittedOffset);
    if (lastCommittedOffset != null) {
      // Offset should be reset to the last committed plus one, otherwise the last committed
      // record will be duplicated
      context.offset(partition, lastCommittedOffset + 1);
    }
  } catch (OdpsException | IOException e) {
    LOGGER
        .error("Thread(" + Thread.currentThread().getId() + ") Resume from checkpoint failed", e);
    // Keep the cause and say which partition could not be resumed.
    throw new RuntimeException(
        "Resume from checkpoint failed for (topic: " + partition.topic() + ", partition: "
            + partition.partition() + ")", e);
  }
}