in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.java [98:136]
public void run() {
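// haveTryTime counts recovery attempts across reconnects; message records why the consumer is being (re)created and is passed to getConsumerWrap below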
int haveTryTime = 0;
String message = "first start";
ConsumerWrap kafkaConsumerWrap = null;
while (!consumerContext.isExited()) {
try {
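// create (or re-create) the Kafka consumer; message distinguishes the first start from a reconnect after a recoverable error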
kafkaConsumerWrap = getConsumerWrap(message);
while (!consumerContext.isExited()) {
// The Kafka consumer is not thread-safe, so if checkpoints are committed back to Kafka, they must be committed from this same polling thread.
mayCommitCheckpoint();
ConsumerRecords<byte[], byte[]> records = kafkaConsumerWrap.poll();
for (ConsumerRecord<byte[], byte[]> record : records) {
// The dStore may generate special mock records (empty payloads) to push the consumer offset forward for the next fetch request when all real data has been filtered out; skip them here.
if (record.value() == null || record.value().length <= 2) {
continue;
}
int offerTryCount = 0;
while (!offerRecord(1000, TimeUnit.MILLISECONDS, record) && !consumerContext.isExited()) {
if (++offerTryCount % 10 == 0) {
log.info("KafkaRecordFetcher: offer kafka record has failed for a period (10s) [ " + record + "]");
}
}
}
}
} catch (Throwable e) {
if (isErrorRecoverable(e) && haveTryTime++ < tryTime) {
log.warn("KafkaRecordFetcher: error meet cause " + e.getMessage() + ", recover time [" + haveTryTime + "]", e);
sleepMS(tryBackTimeMS);
message = "reconnect";
} else {
log.error("KafkaRecordFetcher: unrecoverable error " + e.getMessage() + ", have try time [" + haveTryTime + "]", e);
consumerContext.exit();
}
} finally {
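// always close the current consumer before retrying or exiting; close failures are swallowed so they cannot mask the original error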
swallowErrorClose(kafkaConsumerWrap);
}
}
}
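The loop above depends on two helpers defined elsewhere in the class: offerRecord, which hands each record downstream with a timeout so the thread can keep re-checking consumerContext.isExited(), and isErrorRecoverable, which decides whether a reconnect is worth attempting. A minimal sketch of what they might look like, assuming a bounded BlockingQueue as the hand-off buffer and Kafka's RetriableException as the recoverability test (the class name FetcherHelpersSketch, the queue capacity, and the recoverability rule are illustrative assumptions, not taken from the original file):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.RetriableException;

class FetcherHelpersSketch {
    // Bounded buffer between the fetcher thread and downstream processing;
    // the capacity of 512 is an assumed tuning knob, not taken from the original file.
    private final BlockingQueue<ConsumerRecord<byte[], byte[]>> toProcessQueue =
            new LinkedBlockingQueue<>(512);

    // Offer with a timeout so the caller can periodically re-check its exit
    // flag instead of blocking forever on a full queue.
    boolean offerRecord(long timeout, TimeUnit unit, ConsumerRecord<byte[], byte[]> record) {
        try {
            return toProcessQueue.offer(record, timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Treat Kafka's explicitly retriable exceptions as recoverable; the real
    // implementation may widen or narrow this set.
    boolean isErrorRecoverable(Throwable e) {
        return e instanceof RetriableException;
    }
}

Using offer with a timeout rather than put means a full downstream queue surfaces as a failed offer that the fetch loop logs every ten attempts, instead of blocking the polling thread indefinitely.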