in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java [242:379]
/**
 * Runs the streaming loop: repeatedly hands a per-message callback to
 * {@code stream.readPending(...)}, then performs connection upkeep between reads.
 *
 * <p>The loop continues while {@code context.isRunning()} holds and either no streaming stopping
 * LSN is set on the offset context, or {@code lastCompletelyProcessedLsn} is still below it.
 *
 * <p>Inside the callback each message is handled as one of three cases: a transactional
 * (BEGIN/COMMIT) message, a logical decoding {@code MESSAGE}, or anything else, which is treated
 * as a DML event (with {@code NOOP} recorded in the offset but never dispatched).
 *
 * @param context source context consulted to decide whether streaming should go on
 * @param partition partition passed through to the dispatcher and to {@code commitMessage}
 * @param offsetContext offset context whose WAL position is updated for every handled message
 * @param stream replication stream the messages are read from
 * @throws SQLException declared by this method; raised by the database calls it makes
 * @throws InterruptedException declared by this method; raised by the blocking calls it makes
 */
private void processMessages(
        ChangeEventSourceContext context,
        PostgresPartition partition,
        PostgresOffsetContext offsetContext,
        final ReplicationStream stream)
        throws SQLException, InterruptedException {
    LOGGER.info("Processing messages");
    // Consecutive reads that produced no message; reset whenever a message arrives or a pause
    // has been taken.
    int idlePolls = 0;

    // Stop once the context is no longer running, or once a configured stopping LSN has been
    // reached by the last completely processed LSN.
    while (context.isRunning()
            && !(offsetContext.getStreamingStoppingLsn() != null
                    && lastCompletelyProcessedLsn.compareTo(
                                    offsetContext.getStreamingStoppingLsn())
                            >= 0)) {

        final boolean messageSeen =
                stream.readPending(
                        message -> {
                            final Lsn receivedLsn = stream.lastReceivedLsn();
                            if (message.isLastEventForLsn()) {
                                lastCompletelyProcessedLsn = receivedLsn;
                            }

                            // Case 1: transaction boundary (BEGIN / COMMIT).
                            if (message.isTransactionalMessage()) {
                                if (connectorConfig.shouldProvideTransactionMetadata()) {
                                    offsetContext.updateWalPosition(
                                            receivedLsn,
                                            lastCompletelyProcessedLsn,
                                            message.getCommitTime(),
                                            toLong(message.getTransactionId()),
                                            taskContext.getSlotXmin(connection),
                                            null);
                                    if (message.getOperation() == Operation.BEGIN) {
                                        dispatcher.dispatchTransactionStartedEvent(
                                                partition,
                                                toString(message.getTransactionId()),
                                                offsetContext);
                                    } else if (message.getOperation() == Operation.COMMIT) {
                                        // Commit first, then announce the end of the
                                        // transaction.
                                        commitMessage(partition, offsetContext, receivedLsn);
                                        dispatcher.dispatchTransactionCommittedEvent(
                                                partition, offsetContext);
                                    }
                                    maybeWarnAboutGrowingWalBacklog(true);
                                } else {
                                    LOGGER.trace("Received transactional message {}", message);
                                    // Only COMMIT is acted upon here: reacting to BEGIN would
                                    // flush the LSN for the whole transaction too early.
                                    if (message.getOperation() == Operation.COMMIT) {
                                        commitMessage(partition, offsetContext, receivedLsn);
                                    }
                                }
                                return;
                            }

                            // Case 2: logical decoding message.
                            if (message.getOperation() == Operation.MESSAGE) {
                                offsetContext.updateWalPosition(
                                        receivedLsn,
                                        lastCompletelyProcessedLsn,
                                        message.getCommitTime(),
                                        toLong(message.getTransactionId()),
                                        taskContext.getSlotXmin(connection));
                                // A non-transactional message will not be followed by a
                                // COMMIT message, so it is committed here.
                                if (message.isLastEventForLsn()) {
                                    commitMessage(partition, offsetContext, receivedLsn);
                                }
                                dispatcher.dispatchLogicalDecodingMessage(
                                        partition,
                                        offsetContext,
                                        clock.currentTimeAsInstant().toEpochMilli(),
                                        (LogicalDecodingMessage) message);
                                maybeWarnAboutGrowingWalBacklog(true);
                                return;
                            }

                            // Case 3: DML event. A NOOP carries no table, so the table id stays
                            // null; for every other operation it must resolve to a non-null id.
                            final TableId changedTable =
                                    message.getOperation() != Operation.NOOP
                                            ? Objects.requireNonNull(
                                                    PostgresSchema.parse(message.getTable()))
                                            : null;
                            offsetContext.updateWalPosition(
                                    receivedLsn,
                                    lastCompletelyProcessedLsn,
                                    message.getCommitTime(),
                                    toLong(message.getTransactionId()),
                                    taskContext.getSlotXmin(connection),
                                    changedTable);

                            // NOOP is never dispatched and therefore counts as "not dispatched".
                            boolean eventDispatched = false;
                            if (message.getOperation() != Operation.NOOP) {
                                eventDispatched =
                                        dispatcher.dispatchDataChangeEvent(
                                                partition,
                                                changedTable,
                                                new PostgresChangeRecordEmitter(
                                                        partition,
                                                        offsetContext,
                                                        clock,
                                                        connectorConfig,
                                                        schema,
                                                        connection,
                                                        changedTable,
                                                        message));
                            }
                            maybeWarnAboutGrowingWalBacklog(eventDispatched);
                        });

        probeConnectionIfNeeded();

        if (!messageSeen) {
            // Nothing was read: emit a heartbeat if a position has been completely processed,
            // and pause after enough consecutive empty reads.
            if (offsetContext.hasCompletelyProcessedPosition()) {
                dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
            }
            if (++idlePolls >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
                idlePolls = 0;
                pauseNoMessage.sleepWhen(true);
            }
        } else {
            idlePolls = 0;
            lsnFlushingAllowed = true;
        }

        // During catch-up streaming a transaction has to stay open so that streaming can run up
        // to a specific LSN and the snapshot taken afterwards does not lose its current view of
        // the data. Committing is therefore skipped while in that phase.
        if (isInPreSnapshotCatchUpStreaming(offsetContext)) {
            continue;
        }
        connection.commit();
    }
}