private ConsumerWrap getConsumerWrap()

in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.java [157:203]


    private ConsumerWrap getConsumerWrap(String message) {
        ConsumerWrap kafkaConsumerWrap = getConsumerWrap();
        Checkpoint checkpoint = null;
        metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap.getRawConsumer()));

        if (useCheckpointConfig.compareAndSet(true, false)) {
            log.info("RecordGenerator: force use initial checkpoint [{}] to start", checkpoint);
            checkpoint = initialCheckpoint;
        } else {
            checkpoint = getCheckpoint();
            if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
                checkpoint = initialCheckpoint;
                log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
            } else {
                log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint);
            }
        }

        switch (subscribeMode) {
            case SUBSCRIBE: {
                kafkaConsumerWrap.subscribeTopic(topicPartition, () -> {
                    Checkpoint ret = getSubscribeCheckpoint();

                    if (null == ret || Checkpoint.INVALID_STREAM_CHECKPOINT == ret) {
                        log.info("Subscribe checkpoint is null, use initialCheckpoint: " + initialCheckpoint);

                        ret = initialCheckpoint;
                    }
                    return ret;
                }, isCheckpointNotExistThrowException);
                break;
            }
            case ASSIGN:{
                kafkaConsumerWrap.assignTopic(topicPartition, checkpoint, isCheckpointNotExistThrowException);
                break;
            }
            default: {
                throw new RuntimeException("RecordGenerator: unknown mode not support");
            }
        }

        //seek checkpoint firstly
        //kafkaConsumerWrap.setFetchOffsetByTimestamp(topicPartition, checkpoint);

        log.info("RecordGenerator:" + message + ", checkpoint " + checkpoint);
        return kafkaConsumerWrap;
    }