public void setFetchOffsetByTimestamp()

in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/ConsumerWrap.java [65:82]


        /**
         * Positions the consumer on {@code topicPartition} using the timestamp stored in {@code checkpoint}.
         *
         * <p>The timestamp is resolved to an offset through {@code consumer.offsetsForTimes}. When an offset
         * is returned for the partition, the consumer seeks to it. When none is returned, the behavior depends
         * on {@code isCheckpointNotExistThrowException}: either a {@link RuntimeException} is thrown, or the
         * consumer is rewound to the beginning of the partition.
         *
         * @param topicPartition the partition whose fetch position is being set
         * @param checkpoint the checkpoint supplying the timestamp to look up
         * @param isCheckpointNotExistThrowException if {@code true}, fail with an exception when no offset is
         *        found for the timestamp; if {@code false}, fall back to the beginning of the partition
         * @throws RuntimeException if no offset is found for the timestamp and
         *         {@code isCheckpointNotExistThrowException} is {@code true}
         */
        public void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint, boolean isCheckpointNotExistThrowException) {
            long timeStamp = checkpoint.getTimeStamp();
            Map<TopicPartition, OffsetAndTimestamp> remoteOffset = consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timeStamp));
            OffsetAndTimestamp toSet = remoteOffset.get(topicPartition);
            if (null == toSet) {
                // Build the message once so the log line and the exception carry the same, fully
                // interpolated text (partition and timestamp values, not the literal expression).
                String failureMessage = "Failed seek timestamp for topic [" + topicPartition + "] with timestamp [" + timeStamp + "] failed";
                log.warn(failureMessage);
                if (isCheckpointNotExistThrowException) {
                    throw new RuntimeException(failureMessage);
                } else {
                    // Caller opted into best-effort behavior: start from the beginning of the partition.
                    log.warn("Set to beginning");
                    consumer.seekToBeginning(Collections.singleton(topicPartition));
                }
            } else {
                log.info("RecordFetcher: seek for {} with checkpoint {}", topicPartition, checkpoint);

                consumer.seek(topicPartition, toSet.offset());
            }
        }