in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.java [157:203]
private ConsumerWrap getConsumerWrap(String message) {
ConsumerWrap kafkaConsumerWrap = getConsumerWrap();
Checkpoint checkpoint = null;
metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap.getRawConsumer()));
if (useCheckpointConfig.compareAndSet(true, false)) {
log.info("RecordGenerator: force use initial checkpoint [{}] to start", checkpoint);
checkpoint = initialCheckpoint;
} else {
checkpoint = getCheckpoint();
if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
checkpoint = initialCheckpoint;
log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
} else {
log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint);
}
}
switch (subscribeMode) {
case SUBSCRIBE: {
kafkaConsumerWrap.subscribeTopic(topicPartition, () -> {
Checkpoint ret = getSubscribeCheckpoint();
if (null == ret || Checkpoint.INVALID_STREAM_CHECKPOINT == ret) {
log.info("Subscribe checkpoint is null, use initialCheckpoint: " + initialCheckpoint);
ret = initialCheckpoint;
}
return ret;
}, isCheckpointNotExistThrowException);
break;
}
case ASSIGN:{
kafkaConsumerWrap.assignTopic(topicPartition, checkpoint, isCheckpointNotExistThrowException);
break;
}
default: {
throw new RuntimeException("RecordGenerator: unknown mode not support");
}
}
//seek checkpoint firstly
//kafkaConsumerWrap.setFetchOffsetByTimestamp(topicPartition, checkpoint);
log.info("RecordGenerator:" + message + ", checkpoint " + checkpoint);
return kafkaConsumerWrap;
}