in pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java [104:138]
protected void process() {
while (running) {
try {
MDC.put(DESTINATION, canalSourceConfig.getDestination());
connector.connect();
log.info("start canal process");
connector.subscribe();
while (running) {
Message message = connector.getWithoutAck(canalSourceConfig.getBatchSize());
message.setRaw(false);
List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
long batchId = getMessageId(message);
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
if (flatMessages != null) {
CanalRecord<V> canalRecord = new CanalRecord<>(connector);
canalRecord.setId(batchId);
canalRecord.setRecord(extractValue(flatMessages));
consume(canalRecord);
}
}
}
} catch (Exception e) {
log.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove(DESTINATION);
}
}
}