public void run()

in src/main/java/com/aliyun/dts/subscribe/clients/recordgenerator/UserRecordGenerator.java [76:120]


    /**
     * Main loop of the generator. Until {@code consumerContext} reports exit, it
     * repeatedly takes the head of {@code toProcessRecord}, deserializes its value
     * with {@code fastDeserializer}, wraps the result in a {@code DefaultUserRecord}
     * and hands it over through {@code offerRecord}.
     *
     * <p>The head record is only peeked while it is being processed; it is removed
     * from {@code toProcessRecord} once {@code offerRecord} has accepted it. If exit
     * is requested before the record is accepted, the record is left in the queue
     * and the method returns.
     *
     * <p>Any exception raised while processing a record is logged together with the
     * raw and parsed record, and {@code consumerContext.exit()} is called.
     */
    public void run() {
        while (!consumerContext.isExited()) {
            ConsumerRecord<byte[], byte[]> toProcess = null;
            Record record = null;
            int fetchFailedCount = 0;
            try {
                // Wait for a pending record in 5 ms steps; peek (not poll) so the record
                // stays queued until it has actually been handed over below.
                while (null == (toProcess = toProcessRecord.peek()) && !consumerContext.isExited()) {
                    sleepMS(5);
                    fetchFailedCount++;
                    // 1000 waits of 5 ms each: report roughly once per 5 s of idleness.
                    if (fetchFailedCount % 1000 == 0 && consumerContext.hasValidTopicPartitions()) {
                        log.info("UserRecordGenerator: haven't receive records from generator for  5s");
                    }
                }
                if (consumerContext.isExited()) {
                    return;
                }

                // The commit callback below needs an effectively final reference.
                final ConsumerRecord<byte[], byte[]> consumerRecord = toProcess;
                record = fastDeserializer.deserialize(consumerRecord.value());
                log.debug("UserRecordGenerator: meet [{}] record type", record.getOperation());

                DefaultUserRecord defaultUserRecord = new DefaultUserRecord(
                        new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                        consumerRecord.offset(),
                        record,
                        (tp, commitRecord, offset, metadata) -> {
                            // Invoked when the record is committed: update the outbound
                            // sensors, remember the checkpoint and trigger commit().
                            recordStoreOutCountSensor.record(1);
                            recordStoreOutByteSensor.record(consumerRecord.value().length);
                            commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
                            commit();
                        });

                // Retry the hand-over with a 1 s timeout per attempt until it succeeds or exit is requested.
                int offerTryCount = 0;
                boolean offered = offerRecord(1000, TimeUnit.MILLISECONDS, defaultUserRecord);
                while (!offered && !consumerContext.isExited()) {
                    // Every 10th failed 1 s attempt: report roughly once per 10 s.
                    if (++offerTryCount % 10 == 0) {
                        log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ {}]", record);
                    }
                    offered = offerRecord(1000, TimeUnit.MILLISECONDS, defaultUserRecord);
                }

                if (!offered) {
                    // Exit was requested before the record was accepted; do not remove a
                    // record from the pending queue that was never delivered.
                    return;
                }

                // Delivered: now it is safe to drop the head of the pending queue.
                toProcessRecord.poll();
            } catch (Exception e) {
                // Trailing throwable argument is logged with its stack trace.
                log.error("UserRecordGenerator: process record failed, raw consumer record [{}], parsed record [{}], cause {}",
                        toProcess, record, e.getMessage(), e);
                consumerContext.exit();
            }
        }
    }