in src/main/java/com/aliyun/dts/subscribe/clients/recordgenerator/UserRecordGenerator.java [46:73]
/**
 * Builds a generator around the supplied queues and commit callback, and
 * registers this component's metrics on the core metrics registry obtained
 * from {@code consumerContext}.
 *
 * <p>Two metrics named {@code DStoreRecordQueue} and
 * {@code DefaultUserRecordQueue} (group {@code UserRecordGenerator}) report
 * the current {@code size()} of the respective queue each time they are
 * measured. Two sensors, {@code record-store-out-row} and
 * {@code record-store-out-byte}, each get a {@code Total} and a
 * {@code SimpleRate} stat in the {@code recordstore} group.
 *
 * @param consumerContext      context kept by this instance; also the source
 *                             of the metrics registry
 * @param toProcessRecord      queue stored as the input side of this generator
 *                             (presumably raw records awaiting deserialization
 *                             - confirm against the consuming loop)
 * @param processedRecord      queue stored as the output side of this generator
 * @param offsetCommitCallBack callback stored for later use by this instance
 */
public UserRecordGenerator(
        ConsumerContext consumerContext,
        LinkedBlockingQueue<ConsumerRecord> toProcessRecord,
        LinkedBlockingQueue<DefaultUserRecord> processedRecord,
        OffsetCommitCallBack offsetCommitCallBack) {
    // Collaborators handed in by the caller.
    this.consumerContext = consumerContext;
    this.toProcessRecord = toProcessRecord;
    this.processedRecord = processedRecord;
    this.offsetCommitCallBack = offsetCommitCallBack;

    // State owned by this instance.
    this.fastDeserializer = new AvroDeserializer();
    // NOTE(review): the null / -1 / "-1" arguments look like a
    // "nothing committed yet" sentinel - confirm against Checkpoint.
    this.commitCheckpoint = new Checkpoint(null, -1, -1, "-1");

    this.metrics = consumerContext.getDtsMetrics().getCoreMetrics();

    // Queue depth metrics: the value is read from the queue on every measurement.
    this.metrics.addMetric(
            this.metrics.metricName("DStoreRecordQueue", "UserRecordGenerator"),
            (config, now) -> toProcessRecord.size());
    this.metrics.addMetric(
            this.metrics.metricName("DefaultUserRecordQueue", "UserRecordGenerator"),
            (config, now) -> processedRecord.size());

    // Outgoing row sensor: total plus rate.
    this.recordStoreOutCountSensor = this.metrics.sensor("record-store-out-row");
    this.recordStoreOutCountSensor.add(
            this.metrics.metricName("outCounts", "recordstore"), new Total());
    this.recordStoreOutCountSensor.add(
            this.metrics.metricName("outRps", "recordstore"), new SimpleRate());

    // Outgoing byte sensor: total plus rate.
    this.recordStoreOutByteSensor = this.metrics.sensor("record-store-out-byte");
    this.recordStoreOutByteSensor.add(
            this.metrics.metricName("outBytes", "recordstore"), new Total());
    this.recordStoreOutByteSensor.add(
            this.metrics.metricName("outBps", "recordstore"), new SimpleRate());
}