src/main/java/com/aliyun/dms/subscribe/clients/UserRecordGeneratorWithDBMapping.java [32:52]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    @Override
    public void run() {
        while (!consumerContext.isExited()) {
            ConsumerRecord<byte[], byte[]> toProcess = null;
            Record record = null;
            int fetchFailedCount = 0;
            try {
                while (null == (toProcess = toProcessRecord.peek()) && !consumerContext.isExited()) {
                    sleepMS(5);
                    fetchFailedCount++;
                    if (fetchFailedCount % 1000 == 0 && consumerContext.hasValidTopicPartitions()) {
                        log.info("UserRecordGenerator: haven't receive records from generator for  5s");
                    }
                }
                if (consumerContext.isExited()) {
                    return;
                }
                final ConsumerRecord<byte[], byte[]> consumerRecord = toProcess;
                consumerRecord.timestamp();
                record = fastDeserializer.deserialize(consumerRecord.value());
                log.debug("UserRecordGenerator: meet [{}] record type", record.getOperation());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/com/aliyun/dts/subscribe/clients/recordgenerator/UserRecordGenerator.java [75:95]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    @Override
    public void run() {
        while (!consumerContext.isExited()) {
            ConsumerRecord<byte[], byte[]> toProcess = null;
            Record record = null;
            int fetchFailedCount = 0;
            try {
                while (null == (toProcess = toProcessRecord.peek()) && !consumerContext.isExited()) {
                    sleepMS(5);
                    fetchFailedCount++;
                    if (fetchFailedCount % 1000 == 0 && consumerContext.hasValidTopicPartitions()) {
                        log.info("UserRecordGenerator: haven't receive records from generator for  5s");
                    }
                }
                if (consumerContext.isExited()) {
                    return;
                }
                final ConsumerRecord<byte[], byte[]> consumerRecord = toProcess;
                consumerRecord.timestamp();
                record = fastDeserializer.deserialize(consumerRecord.value());
                log.debug("UserRecordGenerator: meet [{}] record type", record.getOperation());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



