public void execute()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java [125:240]


    public void execute(
            ChangeEventSourceContext context,
            PostgresPartition partition,
            PostgresOffsetContext offsetContext)
            throws InterruptedException {
        if (!snapshotter.shouldStream()) {
            LOGGER.info("Streaming is not enabled in correct configuration");
            return;
        }

        lsnFlushingAllowed = false;

        // replication slot could exist at the time of starting Debezium so we will stream from the
        // position in the slot
        // instead of the last position in the database
        boolean hasStartLsnStoredInContext = offsetContext != null;

        if (!hasStartLsnStoredInContext) {
            offsetContext =
                    PostgresOffsetContext.initialContext(connectorConfig, connection, clock);
        }

        try {
            final WalPositionLocator walPosition;

            ((PostgresReplicationConnection) replicationConnection)
                    .setEndingPos(offsetContext.getStreamingStoppingLsn());
            if (hasStartLsnStoredInContext) {
                // start streaming from the last recorded position in the offset
                final Lsn lsn =
                        offsetContext.lastCompletelyProcessedLsn() != null
                                ? offsetContext.lastCompletelyProcessedLsn()
                                : offsetContext.lsn();
                LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
                walPosition = new WalPositionLocator(offsetContext.lastCommitLsn(), lsn);
                replicationStream.compareAndSet(
                        null, replicationConnection.startStreaming(lsn, walPosition));
            } else {
                LOGGER.info(
                        "No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
                walPosition = new WalPositionLocator();
                replicationStream.compareAndSet(
                        null, replicationConnection.startStreaming(walPosition));
            }
            // for large dbs, the refresh of schema can take too much time
            // such that the connection times out. We must enable keep
            // alive to ensure that it doesn't time out
            ReplicationStream stream = this.replicationStream.get();
            stream.startKeepAlive(
                    Threads.newSingleThreadExecutor(
                            PostgresConnector.class,
                            connectorConfig.getLogicalName(),
                            KEEP_ALIVE_THREAD_NAME));

            init();

            // If we need to do a pre-snapshot streaming catch up, we should allow the snapshot
            // transaction to persist
            // but normally we want to start streaming without any open transactions.
            if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
                connection.commit();
            }

            this.lastCompletelyProcessedLsn = replicationStream.get().startLsn();

            if (walPosition.searchingEnabled()) {
                searchWalPosition(context, stream, walPosition);
                try {
                    if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
                        connection.commit();
                    }
                } catch (Exception e) {
                    LOGGER.info("Commit failed while preparing for reconnect", e);
                }
                walPosition.enableFiltering();
                stream.stopKeepAlive();
                replicationConnection.reconnect();
                replicationStream.set(
                        replicationConnection.startStreaming(
                                walPosition.getLastEventStoredLsn(), walPosition));
                stream = this.replicationStream.get();
                stream.startKeepAlive(
                        Threads.newSingleThreadExecutor(
                                PostgresConnector.class,
                                connectorConfig.getLogicalName(),
                                KEEP_ALIVE_THREAD_NAME));
            }
            processMessages(context, partition, offsetContext, stream);
        } catch (Throwable e) {
            errorHandler.setProducerThrowable(e);
        } finally {
            if (replicationConnection != null) {
                LOGGER.debug("stopping streaming...");
                // stop the keep alive thread, this also shuts down the
                // executor pool
                ReplicationStream stream = replicationStream.get();
                if (stream != null) {
                    stream.stopKeepAlive();
                }
                // TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the
                // stream, but it's not reliable atm (see javadoc)
                // replicationStream.close();
                // close the connection - this should also disconnect the current stream even if
                // it's blocking
                try {
                    if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
                        connection.commit();
                    }
                    replicationConnection.close();
                } catch (Exception e) {
                    LOGGER.debug("Exception while closing the connection", e);
                }
                replicationStream.set(null);
            }
        }
    }