in src/main/java/com/aliyun/dts/subscribe/clients/metastore/KafkaMetaStore.java [32:57]
/**
 * Asynchronously commits the given checkpoint as the consumer offset of a topic partition.
 *
 * <p>The checkpoint's offset is committed as the partition offset and its timestamp is
 * stored (as a decimal string) in the commit metadata.
 *
 * <p>If no Kafka consumer has been set, nothing is committed: a warning is logged and the
 * returned future is completed immediately with {@code value}.
 *
 * @param topicPartition the partition whose offset is committed
 * @param group          the consumer group name; only used in log messages here
 * @param value          the checkpoint providing the offset and timestamp to commit
 * @return a future that completes with {@code value} once the commit callback reports
 *         success, or completes exceptionally with the exception reported by the callback
 */
public Future<Checkpoint> serializeTo(TopicPartition topicPartition, String group, Checkpoint value) {
    // Parameterized with Checkpoint so complete()/completeExceptionally() and the
    // conversion to Future<Checkpoint> are type-checked instead of unchecked raw calls.
    KafkaFutureImpl<Checkpoint> ret = new KafkaFutureImpl<>();

    if (null == kafkaConsumer) {
        log.warn("KafkaMetaStore: kafka consumer not set, ignore report");
        ret.complete(value);
        return ret;
    }

    OffsetAndMetadata offsetAndMetadata =
            new OffsetAndMetadata(value.getOffset(), String.valueOf(value.getTimeStamp()));

    // Note: commitAsync only enqueues the commit request; the callback (and therefore the
    // returned future) is driven by KafkaConsumer.poll(). If the caller never polls, the
    // future may never complete.
    kafkaConsumer.commitAsync(Collections.singletonMap(topicPartition, offsetAndMetadata), (offsets, exception) -> {
        if (null != exception) {
            // The trailing exception argument has no placeholder, so the logger treats it
            // as the throwable and prints its stack trace.
            log.warn("KafkaMetaStore: Commit offset for group[{}] topicPartition[{}] {} failed cause {}",
                    group, topicPartition, value, exception.getMessage(), exception);
            ret.completeExceptionally(exception);
        } else {
            log.debug("KafkaMetaStore:Commit offset success for group[{}] topicPartition [{}] {}", group, topicPartition, value);
            ret.complete(value);
        }
    });
    return ret;
}