in pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java [193:237]
protected void recoverRecords(Consumer<HistoryRecord> records) {
setupClientIfNeeded();
try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create()
) {
log.info("Scanning the database history topic '{}'", topicName);
// Read all messages in the topic ...
MessageId lastProcessedMessageId = null;
// read the topic until the end
while (historyReader.hasMessageAvailable()) {
Message<String> msg = historyReader.readNext();
try {
if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) {
if (!isBlank(msg.getValue())) {
HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue()));
if (log.isTraceEnabled()) {
log.trace("Recovering database history: {}", recordObj);
}
if (recordObj == null || !recordObj.isValid()) {
log.warn("Skipping invalid database history record '{}'. " +
"This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
recordObj, topicName);
} else {
records.accept(recordObj);
log.trace("Recovered database history: {}", recordObj);
}
}
lastProcessedMessageId = msg.getMessageId();
}
} catch (IOException ioe) {
log.error("Error while deserializing history record '{}'", msg.getValue(), ioe);
} catch (final Exception e) {
throw e;
}
}
log.info("Successfully completed scanning the database history topic '{}'", topicName);
} catch (IOException ioe) {
log.error("Encountered issues on recovering history records", ioe);
throw new RuntimeException("Encountered issues on recovering history records", ioe);
}
}