private ReplicationStream createReplicationStream()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java [491:709]


    /**
     * Starts streaming from the replication slot at {@code startLsn} and wraps the resulting
     * {@code PGReplicationStream} in a {@link ReplicationStream}.
     *
     * <p>Start-up is attempted in up to three steps:
     *
     * <ol>
     *   <li>start with the decoder options selected by {@code plugin.forceRds()} (without
     *       metadata when it is {@code true}, with metadata otherwise);
     *   <li>on a {@code PSQLException}, re-initialise the slot if it is temporary and start again
     *       with the same options;
     *   <li>if that second attempt fails with an "option ... is unknown" error, re-initialise a
     *       temporary slot once more and start without metadata.
     * </ol>
     *
     * <p>The returned stream replaces any message received past {@code endingPos} (when one is
     * set and it is not non-stopping) with a {@code ReplicationMessage.NoopMessage}.
     *
     * @param startLsn the LSN passed to {@code startPgReplicationStream}; also reported by
     *     {@link ReplicationStream#startLsn()}
     * @param walPosition forwarded to {@code messageDecoder.shouldMessageBeSkipped} for every
     *     received message
     * @return a stream whose reads are decoded by {@code messageDecoder}
     * @throws SQLException if the stream cannot be started for a reason other than the two
     *     handled error messages
     * @throws ConnectException if the server reports that the requested WAL segment has already
     *     been removed
     * @throws InterruptedException declared for the start-up calls made here; where it is raised
     *     is not visible in this method
     */
    private ReplicationStream createReplicationStream(
            final Lsn startLsn, WalPositionLocator walPosition)
            throws SQLException, InterruptedException {
        // Metadata is requested from the decoder unless the plug-in forces RDS mode.
        final boolean withMetadata = !plugin.forceRds();
        PGReplicationStream s;

        try {
            try {
                s =
                        startPgReplicationStream(
                                startLsn,
                                withMetadata
                                        ? messageDecoder::optionsWithMetadata
                                        : messageDecoder::optionsWithoutMetadata);
                messageDecoder.setContainsMetadata(withMetadata);
            } catch (PSQLException e) {
                // NOTE(review): the log text says "without optional options", but the retry
                // below passes the same option set as the first attempt - confirm intent.
                LOGGER.debug(
                        "Could not register for streaming, retrying without optional options", e);

                // re-init the slot after a failed start of slot, as this
                // may have closed the slot
                if (useTemporarySlot()) {
                    initReplicationSlot();
                }

                s =
                        startPgReplicationStream(
                                startLsn,
                                withMetadata
                                        ? messageDecoder::optionsWithMetadata
                                        : messageDecoder::optionsWithoutMetadata);
                messageDecoder.setContainsMetadata(withMetadata);
            }
        } catch (PSQLException e) {
            // A null message matches neither pattern, so the original exception is rethrown
            // instead of being masked by a NullPointerException.
            final String message = e.getMessage() == null ? "" : e.getMessage();
            if (message.matches("(?s)ERROR: option .* is unknown.*")) {
                // It is possible we are connecting to an old wal2json plug-in
                LOGGER.warn(
                        "Could not register for streaming with metadata in messages, falling back to messages without metadata");

                // re-init the slot after a failed start of slot, as this
                // may have closed the slot
                if (useTemporarySlot()) {
                    initReplicationSlot();
                }

                s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
                messageDecoder.setContainsMetadata(false);
            } else if (message.matches(
                    "(?s)ERROR: requested WAL segment .* has already been removed.*")) {
                LOGGER.error("Cannot rewind to last processed WAL position", e);
                // Keep the server error as the cause so it is not lost to callers.
                throw new ConnectException(
                        "The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.",
                        e);
            } else {
                throw e;
            }
        }

        final PGReplicationStream stream = s;

        return new ReplicationStream() {

            // Number of read calls between two checks of the connection's SQL warnings.
            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
            private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
            private ExecutorService keepAliveExecutor = null;
            private AtomicBoolean keepAliveRunning;
            // Paces the keep-alive loop at statusUpdateInterval.
            private final Metronome metronome =
                    Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);

            // make sure this is volatile since multiple threads may be interested in this value
            private volatile Lsn lastReceivedLsn;

            /**
             * Reads the next message via {@code stream.read()} and hands it to {@code processor},
             * unless the decoder asks for it to be skipped. A message received past the ending
             * position is replaced by a no-op message.
             */
            @Override
            public void read(ReplicationMessageProcessor processor)
                    throws SQLException, InterruptedException {
                processWarnings(false);
                ByteBuffer read = stream.read();
                final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace(
                        "Streaming requested from LSN {}, received LSN {}",
                        startLsn,
                        lastReceiveLsn);
                // Test the LSN of the message just read (as readPending does), not the LSN
                // remembered from the previous call; otherwise the first message past the
                // ending position would still be decoded.
                if (reachEnd(lastReceiveLsn)) {
                    lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                    LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
                    processor.process(new ReplicationMessage.NoopMessage(null, null));
                    return;
                }
                if (messageDecoder.shouldMessageBeSkipped(
                        read, lastReceiveLsn, startLsn, walPosition)) {
                    return;
                }
                deserializeMessages(read, processor);
            }

            /**
             * Like {@link #read}, but uses {@code stream.readPending()}.
             *
             * @return {@code false} when no message was pending and the ending position has not
             *     been passed; {@code true} otherwise (message processed, skipped, or replaced
             *     by a no-op)
             */
            @Override
            public boolean readPending(ReplicationMessageProcessor processor)
                    throws SQLException, InterruptedException {
                processWarnings(false);
                ByteBuffer read = stream.readPending();
                final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace(
                        "Streaming requested from LSN {}, received LSN {}",
                        startLsn,
                        lastReceiveLsn);

                // Checked before the null test: the end can be reported even when nothing is
                // pending.
                if (reachEnd(lastReceiveLsn)) {
                    lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                    LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
                    processor.process(new ReplicationMessage.NoopMessage(null, null));
                    return true;
                }

                if (read == null) {
                    return false;
                }

                if (messageDecoder.shouldMessageBeSkipped(
                        read, lastReceiveLsn, startLsn, walPosition)) {
                    return true;
                }

                deserializeMessages(read, processor);

                return true;
            }

            /** Records the stream's last received LSN and lets the decoder process the buffer. */
            private void deserializeMessages(
                    ByteBuffer buffer, ReplicationMessageProcessor processor)
                    throws SQLException, InterruptedException {
                lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
                LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
                messageDecoder.processMessage(buffer, processor, typeRegistry);
            }

            /** Logs any outstanding connection warnings, then closes the underlying stream. */
            @Override
            public void close() throws SQLException {
                processWarnings(true);
                stream.close();
            }

            @Override
            public void flushLsn(Lsn lsn) throws SQLException {
                doFlushLsn(lsn);
            }

            /** Marks {@code lsn} as both flushed and applied and forces a status update. */
            private void doFlushLsn(Lsn lsn) throws SQLException {
                stream.setFlushedLSN(lsn.asLogSequenceNumber());
                stream.setAppliedLSN(lsn.asLogSequenceNumber());

                stream.forceUpdateStatus();
            }

            /** Returns the LSN recorded by the most recent read, or {@code null} if none yet. */
            @Override
            public Lsn lastReceivedLsn() {
                return lastReceivedLsn;
            }

            /**
             * Submits a task to {@code service} that repeatedly forces a status update and then
             * pauses on the metronome. Does nothing if a keep-alive executor is already set.
             */
            @Override
            public void startKeepAlive(ExecutorService service) {
                if (keepAliveExecutor == null) {
                    keepAliveExecutor = service;
                    keepAliveRunning = new AtomicBoolean(true);
                    keepAliveExecutor.submit(
                            () -> {
                                while (keepAliveRunning.get()) {
                                    try {
                                        LOGGER.trace(
                                                "Forcing status update with replication stream");
                                        stream.forceUpdateStatus();
                                        metronome.pause();
                                    } catch (InterruptedException exp) {
                                        // stopKeepAlive() calls shutdownNow(), which typically
                                        // interrupts the pause. Treat that as a stop request:
                                        // restore the flag and leave the loop.
                                        Thread.currentThread().interrupt();
                                        return;
                                    } catch (Exception exp) {
                                        throw new RuntimeException(
                                                "received unexpected exception will perform keep alive",
                                                exp);
                                    }
                                }
                            });
                }
            }

            /** Stops the keep-alive loop and shuts down its executor, if one was started. */
            @Override
            public void stopKeepAlive() {
                if (keepAliveExecutor != null) {
                    keepAliveRunning.set(false);
                    keepAliveExecutor.shutdownNow();
                    keepAliveExecutor = null;
                }
            }

            /**
             * Logs and clears the connection's SQL warnings once every
             * {@code CHECK_WARNINGS_AFTER_COUNT} calls, or immediately when {@code forced}.
             */
            private void processWarnings(final boolean forced) throws SQLException {
                if (--warningCheckCounter == 0 || forced) {
                    warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
                    for (SQLWarning w = connection().getWarnings();
                            w != null;
                            w = w.getNextWarning()) {
                        LOGGER.debug(
                                "Server-side message: '{}', state = {}, code = {}",
                                w.getMessage(),
                                w.getSQLState(),
                                w.getErrorCode());
                    }
                    connection().clearWarnings();
                }
            }

            @Override
            public Lsn startLsn() {
                return startLsn;
            }

            /**
             * Returns {@code true} only when an ending position is set, it is not non-stopping,
             * and {@code receivedLsn} is strictly greater than it. A {@code null} LSN never
             * reaches the end.
             */
            private boolean reachEnd(Lsn receivedLsn) {
                if (receivedLsn == null) {
                    return false;
                }
                return endingPos != null
                        && (!endingPos.isNonStopping())
                        && endingPos.compareTo(receivedLsn) < 0;
            }
        };
    }