in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/ConsumerWrap.java [65:82]
/**
 * Positions the consumer on {@code topicPartition} according to the timestamp stored in
 * {@code checkpoint}.
 *
 * <p>The timestamp is resolved to an offset through {@code consumer.offsetsForTimes}. When an
 * offset is returned for the partition, the consumer seeks to it. When no offset is returned
 * (the lookup result for the partition is {@code null}), the behavior depends on
 * {@code isCheckpointNotExistThrowException}: either a {@link RuntimeException} is thrown, or
 * the consumer is moved to the beginning of the partition.
 *
 * <p>NOTE(review): the unit of {@code checkpoint.getTimeStamp()} is not visible here; it is
 * passed to {@code offsetsForTimes} unchanged, so it presumably has to match the unit that
 * call expects. Confirm against the caller.
 *
 * @param topicPartition the partition whose fetch position is being set
 * @param checkpoint the checkpoint supplying the timestamp to seek to
 * @param isCheckpointNotExistThrowException if {@code true}, fail when no offset can be found
 *     for the timestamp; if {@code false}, fall back to the beginning of the partition
 * @throws RuntimeException if no offset is found for the timestamp and
 *     {@code isCheckpointNotExistThrowException} is {@code true}
 */
public void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint, boolean isCheckpointNotExistThrowException) {
    long timeStamp = checkpoint.getTimeStamp();
    Map<TopicPartition, OffsetAndTimestamp> remoteOffset =
            consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timeStamp));
    OffsetAndTimestamp toSet = remoteOffset.get(topicPartition);

    if (null == toSet) {
        // Build the message once so the log line and the exception carry the same text.
        // Previously the exception used escaped quotes and therefore contained the literal
        // source fragment instead of the partition and timestamp values.
        String failureMessage = "Failed seek timestamp for topic [" + topicPartition
                + "] with timestamp [" + timeStamp + "] failed";
        log.warn(failureMessage);
        if (isCheckpointNotExistThrowException) {
            throw new RuntimeException(failureMessage);
        } else {
            // Caller opted into best-effort behavior: start from the beginning of the partition.
            log.warn("Set to beginning");
            consumer.seekToBeginning(Collections.singleton(topicPartition));
        }
    } else {
        log.info("RecordFetcher: seek for {} with checkpoint {}", topicPartition, checkpoint);
        consumer.seek(topicPartition, toSet.offset());
    }
}