public UserRecordGenerator()

in src/main/java/com/aliyun/dts/subscribe/clients/recordgenerator/UserRecordGenerator.java [46:73]


    public UserRecordGenerator(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> toProcessRecord, LinkedBlockingQueue<DefaultUserRecord> processedRecord,
                               OffsetCommitCallBack offsetCommitCallBack) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = toProcessRecord;
        this.fastDeserializer = new AvroDeserializer();
        this.processedRecord = processedRecord;

        this.offsetCommitCallBack = offsetCommitCallBack;

        commitCheckpoint = new Checkpoint(null, -1, -1, "-1");

        metrics = consumerContext.getDtsMetrics().getCoreMetrics();

        metrics.addMetric(
                metrics.metricName("DStoreRecordQueue", "UserRecordGenerator"),
                (config, now) -> (toProcessRecord.size()));

        metrics.addMetric(
                metrics.metricName("DefaultUserRecordQueue", "UserRecordGenerator"),
                (config, now) -> (processedRecord.size()));

        this.recordStoreOutCountSensor = metrics.sensor("record-store-out-row");
        this.recordStoreOutCountSensor.add(metrics.metricName("outCounts", "recordstore"), new Total());
        this.recordStoreOutCountSensor.add(metrics.metricName("outRps", "recordstore"), new SimpleRate());
        this.recordStoreOutByteSensor = metrics.sensor("record-store-out-byte");
        this.recordStoreOutByteSensor.add(metrics.metricName("outBytes", "recordstore"), new Total());
        this.recordStoreOutByteSensor.add(metrics.metricName("outBps", "recordstore"), new SimpleRate());
    }