public void run()

in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.java [98:136]


    /**
     * Fetch loop: polls records through a consumer wrapper and hands each one to
     * {@code offerRecord} until {@code consumerContext} reports that it has exited.
     *
     * <p>Each pass of the outer loop obtains a consumer via {@code getConsumerWrap} and then
     * polls repeatedly. Any {@link Throwable} ends the current pass: when
     * {@code isErrorRecoverable} accepts it and fewer than {@code tryTime} recoveries have been
     * used, the loop sleeps for {@code tryBackTimeMS} and reconnects; otherwise the error is
     * logged and {@code consumerContext.exit()} is called. The consumer obtained for a pass is
     * always handed to {@code swallowErrorClose} when that pass ends.
     *
     * <p>Note: the recovery counter is never reset, so {@code tryTime} bounds the total number
     * of recoveries over the lifetime of this call, not the number of consecutive ones.
     */
    public void run() {

        int haveTryTime = 0;
        String message = "first start";
        while (!consumerContext.isExited()) {
            // Scoped to a single connection attempt: if getConsumerWrap throws on a reconnect,
            // the finally block must see null rather than the consumer that the previous pass
            // already closed.
            ConsumerWrap kafkaConsumerWrap = null;
            try {
                kafkaConsumerWrap = getConsumerWrap(message);
                while (!consumerContext.isExited()) {
                    // kafka consumer is not threadsafe, so if you want commit checkpoint to kafka, commit it in same thread
                    mayCommitCheckpoint();
                    ConsumerRecords<byte[], byte[]> records = kafkaConsumerWrap.poll();
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        if (consumerContext.isExited()) {
                            // Exit requested: stop handing the rest of the batch downstream.
                            // Otherwise every remaining record would still get one offer
                            // attempt that can block for the full 1000 ms timeout.
                            break;
                        }
                        byte[] value = record.value();
                        if (value == null || value.length <= 2) {
                            // dStore may generate special mock record to push up consumer offset for next fetchRequest if all data is filtered
                            continue;
                        }
                        int offerTryCount = 0;
                        // Retry the offer in 1s slices so an exit request is noticed promptly.
                        while (!offerRecord(1000, TimeUnit.MILLISECONDS, record) && !consumerContext.isExited()) {
                            // Every 10th failed 1s attempt, i.e. roughly every 10 seconds.
                            if (++offerTryCount % 10 == 0) {
                                log.info("KafkaRecordFetcher: offer kafka record has failed for a period (10s) [ " + record + "]");
                            }
                        }
                    }
                }
            } catch (Throwable e) {
                // The counter is only incremented for errors isErrorRecoverable accepts (short-circuit &&).
                if (isErrorRecoverable(e) && haveTryTime++ < tryTime) {
                    log.warn("KafkaRecordFetcher: error meet cause " + e.getMessage() + ", recover time [" + haveTryTime + "]", e);
                    sleepMS(tryBackTimeMS);
                    message = "reconnect";
                } else {
                    log.error("KafkaRecordFetcher: unrecoverable error  " + e.getMessage() + ", have try time [" + haveTryTime + "]", e);
                    consumerContext.exit();
                }
            } finally {
                // Runs on both normal exit and error; receives null when getConsumerWrap itself failed.
                swallowErrorClose(kafkaConsumerWrap);
            }
        }
    }