public KafkaRecordFetcher()

in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.java [61:95]


    public KafkaRecordFetcher(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> toProcessRecord) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = toProcessRecord;

        this.useCheckpointConfig = new AtomicBoolean(consumerContext.isForceUseCheckpoint());
        this.initialCheckpoint = consumerContext.getInitialCheckpoint();
        this.subscribeMode = consumerContext.getSubscribeMode();

        this.topicPartition = new TopicPartition(consumerContext.getTopic(), 0);
        this.groupID = consumerContext.getGroupID();

        this.tryTime = 150;
        this.tryBackTimeMS = 10000;

        //existed = false;
        if (consumerContext.isUseLocalCheckpointStore()) {
            metaStoreCenter.registerStore(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, groupID), new LocalFileMetaStore(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, groupID)));
        }

        if (consumerContext.getUserRegisteredStore() != null) {
            metaStoreCenter.registerStore(USER_STORE_NAME, consumerContext.getUserRegisteredStore());
        }

        isCheckpointNotExistThrowException = consumerContext.isCheckpointNotExistThrowException();

        log.info("RecordGenerator: try time [" + tryTime + "], try backTimeMS [" + tryBackTimeMS + "], isCheckpointNotExistThrowException [" + isCheckpointNotExistThrowException +  "]");

        Metrics metrics = consumerContext.getDtsMetrics().getCoreMetrics();
        this.recordStoreInCountSensor = metrics.sensor("record-store-in-row");
        this.recordStoreInCountSensor.add(metrics.metricName("inCounts", "recordstore"), new Total());
        this.recordStoreInCountSensor.add(metrics.metricName("inRps", "recordstore"), new SimpleRate());
        this.recordStoreInByteSensor = metrics.sensor("record-store-in-byte");
        this.recordStoreInByteSensor.add(metrics.metricName("inBytes", "recordstore"), new Total());
        this.recordStoreInByteSensor.add(metrics.metricName("inBps", "recordstore"), new SimpleRate());
    }