in src/main/java/com/aliyun/dts/subscribe/clients/recordprocessor/EtlRecordProcessor.java [42:70]
/**
 * Processing loop: hands each pending record to every registered listener
 * until {@code consumerContext} reports that it has exited.
 *
 * <p>Each pass peeks at the head of {@code toProcessRecord}. While nothing is
 * available and the context has not exited, it calls {@code sleepMS(5)} and
 * retries; on every 1000th consecutive empty attempt an info line is logged,
 * provided {@code consumerContext.hasValidTopicPartitions()} is true.
 *
 * <p>Once a record is available it is passed to each listener in
 * {@code recordListeners.values()}, and only afterwards removed with
 * {@code poll()}. Any {@link Exception} raised inside the pass is logged together
 * with the record being handled, and {@code consumerContext.exit()} is called,
 * which ends the loop on the next condition check.
 */
public void run() {
    while (!consumerContext.isExited()) {
        // Declared outside the try so the catch block can report the record in flight.
        DefaultUserRecord pending = null;
        int emptyPolls = 0;
        try {
            // Wait until a record shows up or the context is shut down.
            for (;;) {
                pending = toProcessRecord.peek();
                if (pending != null || consumerContext.isExited()) {
                    break;
                }
                sleepMS(5);
                emptyPolls++;
                // 1000 attempts with sleepMS(5) in between -- presumably ~5 seconds,
                // matching the log text (assumes sleepMS takes milliseconds).
                boolean reportDue = emptyPolls % 1000 == 0;
                if (reportDue && consumerContext.hasValidTopicPartitions()) {
                    log.info("EtlRecordProcessor: haven't receive records from generator for 5s");
                }
            }

            if (consumerContext.isExited()) {
                return;
            }

            for (RecordListener listener : recordListeners.values()) {
                listener.consume(pending);
            }
            // Remove the record only after every listener returned without throwing.
            toProcessRecord.poll();
        } catch (Exception e) {
            String failure = "EtlRecordProcessor: process record failed, raw consumer record [" + pending + "], cause " + e.getMessage();
            log.error(failure, e);
            consumerContext.exit();
        }
    }
}