public void run()

in src/main/java/com/aliyun/dts/subscribe/clients/recordprocessor/EtlRecordProcessor.java [42:70]


    /**
     * Main processing loop: repeatedly takes the next pending record and hands it
     * to every registered {@code RecordListener}.
     *
     * <p>Each pass waits until {@code toProcessRecord.peek()} yields a non-null
     * record, passes that record to each listener in {@code recordListeners}, and
     * only then removes it with {@code toProcessRecord.poll()}. If anything in the
     * pass throws an {@link Exception}, the failure is logged together with the
     * record being handled, {@code consumerContext.exit()} is called, and the
     * record is not polled.
     *
     * <p>The loop ends as soon as {@code consumerContext.isExited()} reports true.
     */
    public void run() {
        while (!consumerContext.isExited()) {
            // Declared outside the try block so the catch clause can report it.
            DefaultUserRecord pending = null;
            try {
                int emptyPeeks = 0;
                // Keep peeking until a record shows up or the context is exited.
                for (pending = toProcessRecord.peek();
                     pending == null && !consumerContext.isExited();
                     pending = toProcessRecord.peek()) {
                    sleepMS(5);
                    emptyPeeks++;
                    // Every 1000th empty peek (5s per the log text, presumably 1000 x 5 ms
                    // sleeps) emit a notice, but only while valid topic partitions exist.
                    if (emptyPeeks % 1000 == 0) {
                        if (consumerContext.hasValidTopicPartitions()) {
                            log.info("EtlRecordProcessor: haven't receive records from generator for  5s");
                        }
                    }
                }

                // The wait may have ended because of shutdown rather than a new record.
                if (consumerContext.isExited()) {
                    return;
                }

                for (RecordListener listener : recordListeners.values()) {
                    listener.consume(pending);
                }

                // Remove the record only after every listener returned normally.
                toProcessRecord.poll();
            } catch (Exception e) {
                log.error("EtlRecordProcessor: process record failed, raw consumer record [" + pending + "],  cause " + e.getMessage(), e);
                consumerContext.exit();
            }
        }
    }