private void processMessages()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java [242:379]


    private void processMessages(
            ChangeEventSourceContext context,
            PostgresPartition partition,
            PostgresOffsetContext offsetContext,
            final ReplicationStream stream)
            throws SQLException, InterruptedException {
        LOGGER.info("Processing messages");
        int noMessageIterations = 0;
        while (context.isRunning()
                && (offsetContext.getStreamingStoppingLsn() == null
                        || (lastCompletelyProcessedLsn.compareTo(
                                        offsetContext.getStreamingStoppingLsn())
                                < 0))) {

            boolean receivedMessage =
                    stream.readPending(
                            message -> {
                                final Lsn lsn = stream.lastReceivedLsn();

                                if (message.isLastEventForLsn()) {
                                    lastCompletelyProcessedLsn = lsn;
                                }

                                // Tx BEGIN/END event
                                if (message.isTransactionalMessage()) {
                                    if (!connectorConfig.shouldProvideTransactionMetadata()) {
                                        LOGGER.trace("Received transactional message {}", message);
                                        // Don't skip on BEGIN message as it would flush LSN for the
                                        // whole transaction
                                        // too early
                                        if (message.getOperation() == Operation.COMMIT) {
                                            commitMessage(partition, offsetContext, lsn);
                                        }
                                        return;
                                    }

                                    offsetContext.updateWalPosition(
                                            lsn,
                                            lastCompletelyProcessedLsn,
                                            message.getCommitTime(),
                                            toLong(message.getTransactionId()),
                                            taskContext.getSlotXmin(connection),
                                            null);
                                    if (message.getOperation() == Operation.BEGIN) {
                                        dispatcher.dispatchTransactionStartedEvent(
                                                partition,
                                                toString(message.getTransactionId()),
                                                offsetContext);
                                    } else if (message.getOperation() == Operation.COMMIT) {
                                        commitMessage(partition, offsetContext, lsn);
                                        dispatcher.dispatchTransactionCommittedEvent(
                                                partition, offsetContext);
                                    }
                                    maybeWarnAboutGrowingWalBacklog(true);
                                } else if (message.getOperation() == Operation.MESSAGE) {
                                    offsetContext.updateWalPosition(
                                            lsn,
                                            lastCompletelyProcessedLsn,
                                            message.getCommitTime(),
                                            toLong(message.getTransactionId()),
                                            taskContext.getSlotXmin(connection));

                                    // non-transactional message that will not be followed by a
                                    // COMMIT message
                                    if (message.isLastEventForLsn()) {
                                        commitMessage(partition, offsetContext, lsn);
                                    }

                                    dispatcher.dispatchLogicalDecodingMessage(
                                            partition,
                                            offsetContext,
                                            clock.currentTimeAsInstant().toEpochMilli(),
                                            (LogicalDecodingMessage) message);

                                    maybeWarnAboutGrowingWalBacklog(true);
                                }
                                // DML event
                                else {
                                    TableId tableId = null;
                                    if (message.getOperation() != Operation.NOOP) {
                                        tableId = PostgresSchema.parse(message.getTable());
                                        Objects.requireNonNull(tableId);
                                    }

                                    offsetContext.updateWalPosition(
                                            lsn,
                                            lastCompletelyProcessedLsn,
                                            message.getCommitTime(),
                                            toLong(message.getTransactionId()),
                                            taskContext.getSlotXmin(connection),
                                            tableId);

                                    boolean dispatched =
                                            message.getOperation() != Operation.NOOP
                                                    && dispatcher.dispatchDataChangeEvent(
                                                            partition,
                                                            tableId,
                                                            new PostgresChangeRecordEmitter(
                                                                    partition,
                                                                    offsetContext,
                                                                    clock,
                                                                    connectorConfig,
                                                                    schema,
                                                                    connection,
                                                                    tableId,
                                                                    message));

                                    maybeWarnAboutGrowingWalBacklog(dispatched);
                                }
                            });

            probeConnectionIfNeeded();

            if (receivedMessage) {
                noMessageIterations = 0;
                lsnFlushingAllowed = true;
            } else {
                if (offsetContext.hasCompletelyProcessedPosition()) {
                    dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
                }
                noMessageIterations++;
                if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
                    noMessageIterations = 0;
                    pauseNoMessage.sleepWhen(true);
                }
            }
            if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
                // During catch up streaming, the streaming phase needs to hold a transaction open
                // so that
                // the phase can stream event up to a specific lsn and the snapshot that occurs
                // after the catch up
                // streaming will not lose the current view of data. Since we need to hold the
                // transaction open
                // for the snapshot, this block must not commit during catch up streaming.
                connection.commit();
            }
        }
    }