protected void recoverRecords()

in pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java [193:237]


    /**
     * Replays the database history topic from its earliest message and hands every
     * valid record to the supplied consumer.
     *
     * <p>Blank messages and records that fail {@code isValid()} are skipped. A record
     * that cannot be deserialized is logged and skipped so the rest of the topic can
     * still be recovered.
     *
     * @param records callback invoked once for each valid {@link HistoryRecord}
     * @throws RuntimeException wrapping the {@link IOException} if one escapes the
     *         reader setup or the scan loop
     */
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        setupClientIfNeeded();
        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
            .topic(topicName)
            .startMessageId(MessageId.earliest)
            .create()
        ) {
            log.info("Scanning the database history topic '{}'", topicName);

            // Id of the newest message handled so far; used to ignore re-delivered messages.
            MessageId lastProcessedMessageId = null;

            // Read the topic until the end.
            while (historyReader.hasMessageAvailable()) {
                Message<String> msg = historyReader.readNext();
                MessageId messageId = msg.getMessageId();

                // Skip any message whose id is not strictly greater than the last one processed.
                if (lastProcessedMessageId != null && lastProcessedMessageId.compareTo(messageId) >= 0) {
                    continue;
                }

                try {
                    String payload = msg.getValue();
                    if (!isBlank(payload)) {
                        HistoryRecord recordObj = new HistoryRecord(reader.read(payload));
                        log.trace("Recovering database history: {}", recordObj);
                        if (recordObj.isValid()) {
                            records.accept(recordObj);
                            log.trace("Recovered database history: {}", recordObj);
                        } else {
                            log.warn("Skipping invalid database history record '{}'. " +
                                    "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
                                recordObj, topicName);
                        }
                    }
                    // Only advanced when the message was handled without a deserialization error.
                    lastProcessedMessageId = messageId;
                } catch (IOException ioe) {
                    // Best effort: a single undecodable record must not abort the whole recovery.
                    log.error("Error while deserializing history record '{}'", msg.getValue(), ioe);
                }
            }
            log.info("Successfully completed scanning the database history topic '{}'", topicName);
        } catch (IOException ioe) {
            log.error("Encountered issues on recovering history records", ioe);
            throw new RuntimeException("Encountered issues on recovering history records", ioe);
        }
    }