in src/main/java/com/aliyun/dms/subscribe/clients/UserRecordGeneratorWithDBMapping.java [33:80]
public void run() {
while (!consumerContext.isExited()) {
ConsumerRecord<byte[], byte[]> toProcess = null;
Record record = null;
int fetchFailedCount = 0;
try {
while (null == (toProcess = toProcessRecord.peek()) && !consumerContext.isExited()) {
sleepMS(5);
fetchFailedCount++;
if (fetchFailedCount % 1000 == 0 && consumerContext.hasValidTopicPartitions()) {
log.info("UserRecordGenerator: haven't receive records from generator for 5s");
}
}
if (consumerContext.isExited()) {
return;
}
final ConsumerRecord<byte[], byte[]> consumerRecord = toProcess;
consumerRecord.timestamp();
record = fastDeserializer.deserialize(consumerRecord.value());
log.debug("UserRecordGenerator: meet [{}] record type", record.getOperation());
if (consumerContext.getDbMapper() != null && consumerContext.getDbMapper().isMapping()) {
record = consumerContext.getDbMapper().transform(record);
}
DefaultUserRecord defaultUserRecord = new DefaultUserRecord(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(),
record,
(tp, commitRecord, offset, metadata) -> {
recordStoreOutCountSensor.record(1);
recordStoreOutByteSensor.record(consumerRecord.value().length);
commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
commit();
});
int offerTryCount = 0;
while (!offerRecord(1000, TimeUnit.MILLISECONDS, defaultUserRecord) && !consumerContext.isExited()) {
if (++offerTryCount % 10 == 0) {
log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ " + record + "]");
}
}
toProcessRecord.poll();
} catch (Exception e) {
log.error("UserRecordGenerator: process record failed, raw consumer record [" + toProcess + "], parsed record [" + record + "], cause " + e.getMessage(), e);
consumerContext.exit();
}
}
}