in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.java [61:95]
public KafkaRecordFetcher(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> toProcessRecord) {
this.consumerContext = consumerContext;
this.toProcessRecord = toProcessRecord;
this.useCheckpointConfig = new AtomicBoolean(consumerContext.isForceUseCheckpoint());
this.initialCheckpoint = consumerContext.getInitialCheckpoint();
this.subscribeMode = consumerContext.getSubscribeMode();
this.topicPartition = new TopicPartition(consumerContext.getTopic(), 0);
this.groupID = consumerContext.getGroupID();
this.tryTime = 150;
this.tryBackTimeMS = 10000;
//existed = false;
if (consumerContext.isUseLocalCheckpointStore()) {
metaStoreCenter.registerStore(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, groupID), new LocalFileMetaStore(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, groupID)));
}
if (consumerContext.getUserRegisteredStore() != null) {
metaStoreCenter.registerStore(USER_STORE_NAME, consumerContext.getUserRegisteredStore());
}
isCheckpointNotExistThrowException = consumerContext.isCheckpointNotExistThrowException();
log.info("RecordGenerator: try time [" + tryTime + "], try backTimeMS [" + tryBackTimeMS + "], isCheckpointNotExistThrowException [" + isCheckpointNotExistThrowException + "]");
Metrics metrics = consumerContext.getDtsMetrics().getCoreMetrics();
this.recordStoreInCountSensor = metrics.sensor("record-store-in-row");
this.recordStoreInCountSensor.add(metrics.metricName("inCounts", "recordstore"), new Total());
this.recordStoreInCountSensor.add(metrics.metricName("inRps", "recordstore"), new SimpleRate());
this.recordStoreInByteSensor = metrics.sensor("record-store-in-byte");
this.recordStoreInByteSensor.add(metrics.metricName("inBytes", "recordstore"), new Total());
this.recordStoreInByteSensor.add(metrics.metricName("inBps", "recordstore"), new SimpleRate());
}