in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java [125:240]
public void execute(
        ChangeEventSourceContext context,
        PostgresPartition partition,
        PostgresOffsetContext offsetContext)
        throws InterruptedException {
    if (!snapshotter.shouldStream()) {
        LOGGER.info("Streaming is not enabled in current configuration");
        return;
    }
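
    // Hold off flushing LSN positions to the replication slot until streaming has
    // fully started and it is safe to confirm positions.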
    lsnFlushingAllowed = false;

    // The replication slot may already exist when Debezium starts, in which case we
    // stream from the position stored in the slot instead of the last position in
    // the database.
    boolean hasStartLsnStoredInContext = offsetContext != null;
    if (!hasStartLsnStoredInContext) {
        offsetContext =
                PostgresOffsetContext.initialContext(connectorConfig, connection, clock);
    }
    try {
        final WalPositionLocator walPosition;
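
        // If a stopping LSN was recorded in the offset (pre-snapshot catch-up
        // streaming), bound the replication stream so it ends at that position.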
        ((PostgresReplicationConnection) replicationConnection)
                .setEndingPos(offsetContext.getStreamingStoppingLsn());
        if (hasStartLsnStoredInContext) {
            // Start streaming from the last recorded position in the offset.
            final Lsn lsn =
                    offsetContext.lastCompletelyProcessedLsn() != null
                            ? offsetContext.lastCompletelyProcessedLsn()
                            : offsetContext.lsn();
            LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
            walPosition = new WalPositionLocator(offsetContext.lastCommitLsn(), lsn);
            replicationStream.compareAndSet(
                    null, replicationConnection.startStreaming(lsn, walPosition));
        } else {
            LOGGER.info(
                    "No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
            walPosition = new WalPositionLocator();
            replicationStream.compareAndSet(
                    null, replicationConnection.startStreaming(walPosition));
        }
        // For large databases, refreshing the schema can take long enough that the
        // connection times out, so keep-alive must be enabled to prevent that.
        ReplicationStream stream = this.replicationStream.get();
        stream.startKeepAlive(
                Threads.newSingleThreadExecutor(
                        PostgresConnector.class,
                        connectorConfig.getLogicalName(),
                        KEEP_ALIVE_THREAD_NAME));
        init();

        // If a pre-snapshot streaming catch-up is needed, the snapshot transaction
        // must be kept open; otherwise streaming should start with no open
        // transactions.
        if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
            connection.commit();
        }
        this.lastCompletelyProcessedLsn = replicationStream.get().startLsn();
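
        // When resuming from a stored offset, first scan the WAL to locate the exact
        // resume point, then reconnect and restart streaming with duplicate filtering
        // enabled so events processed before that point are skipped.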
        if (walPosition.searchingEnabled()) {
            searchWalPosition(context, stream, walPosition);
            try {
                if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
                    connection.commit();
                }
            } catch (Exception e) {
                LOGGER.info("Commit failed while preparing for reconnect", e);
            }
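
            // Restart streaming from the last event the locator stored; the locator
            // now filters out events that were already delivered.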
            walPosition.enableFiltering();
            stream.stopKeepAlive();
            replicationConnection.reconnect();
            replicationStream.set(
                    replicationConnection.startStreaming(
                            walPosition.getLastEventStoredLsn(), walPosition));
            stream = this.replicationStream.get();
            stream.startKeepAlive(
                    Threads.newSingleThreadExecutor(
                            PostgresConnector.class,
                            connectorConfig.getLogicalName(),
                            KEEP_ALIVE_THREAD_NAME));
        }
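
        // Main event loop: consume and dispatch messages from the replication stream
        // while the change event source is running.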
        processMessages(context, partition, offsetContext, stream);
    } catch (Throwable e) {
        errorHandler.setProducerThrowable(e);
    } finally {
        if (replicationConnection != null) {
            LOGGER.debug("stopping streaming...");
            // Stop the keep-alive thread; this also shuts down its executor pool.
            ReplicationStream stream = replicationStream.get();
            if (stream != null) {
                stream.stopKeepAlive();
            }
            // TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the
            // stream, but it's not reliable atm (see javadoc)
            // replicationStream.close();

            // Closing the connection should also disconnect the current stream,
            // even if it is blocking.
            try {
                if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
                    connection.commit();
                }
                replicationConnection.close();
            } catch (Exception e) {
                LOGGER.debug("Exception while closing the connection", e);
            }
            replicationStream.set(null);
        }
    }
}