in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java [491:709]
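// Opens a PGReplicationStream starting at startLsn and wraps it in Debezium's
// ReplicationStream abstraction. The first attempt requests the decoder's full
// option set; on failure the slot is re-initialised (if temporary) and the start
// is retried, falling back to metadata-less options for old wal2json plug-ins.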
private ReplicationStream createReplicationStream(
final Lsn startLsn, WalPositionLocator walPosition)
throws SQLException, InterruptedException {
PGReplicationStream s;
try {
try {
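// First attempt: request the plugin's options including type metadata, unless the
// RDS-compatible mode is forced, in which case metadata is not available.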
s =
startPgReplicationStream(
startLsn,
plugin.forceRds()
? messageDecoder::optionsWithoutMetadata
: messageDecoder::optionsWithMetadata);
messageDecoder.setContainsMetadata(!plugin.forceRds());
} catch (PSQLException e) {
LOGGER.debug(
"Could not register for streaming, retrying without optional options", e);
// re-init the slot after a failed start of slot, as this
// may have closed the slot
if (useTemporarySlot()) {
initReplicationSlot();
}
s =
startPgReplicationStream(
startLsn,
plugin.forceRds()
? messageDecoder::optionsWithoutMetadata
: messageDecoder::optionsWithMetadata);
messageDecoder.setContainsMetadata(!plugin.forceRds());
}
} catch (PSQLException e) {
if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
// It is possible we are connecting to an old wal2json plug-in
LOGGER.warn(
"Could not register for streaming with metadata in messages, falling back to messages without metadata");
// re-init the slot after a failed start of slot, as this
// may have closed the slot
if (useTemporarySlot()) {
initReplicationSlot();
}
s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
messageDecoder.setContainsMetadata(false);
} else if (e.getMessage()
.matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
LOGGER.error("Cannot rewind to last processed WAL position", e);
throw new ConnectException(
"The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
} else {
throw e;
}
}
final PGReplicationStream stream = s;
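// Adapt the raw PGReplicationStream to Debezium's ReplicationStream interface,
// adding warning draining, keep-alive handling and the bounded-read (endingPos) check.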
return new ReplicationStream() {
private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
private ExecutorService keepAliveExecutor = null;
private AtomicBoolean keepAliveRunning;
private final Metronome metronome =
Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
// make sure this is volatile since multiple threads may be interested in this value
private volatile Lsn lastReceivedLsn;
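// Blocking read: waits for the next replication message. Once the received LSN
// passes the configured ending position, a no-op message is emitted so the caller
// can finish a bounded read; messages before the restart position are skipped.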
@Override
public void read(ReplicationMessageProcessor processor)
throws SQLException, InterruptedException {
processWarnings(false);
ByteBuffer read = stream.read();
final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace(
"Streaming requested from LSN {}, received LSN {}",
startLsn,
lastReceiveLsn);
if (reachEnd(lastReceiveLsn)) {
lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
processor.process(new ReplicationMessage.NoopMessage(null, null));
return;
}
if (messageDecoder.shouldMessageBeSkipped(
read, lastReceiveLsn, startLsn, walPosition)) {
return;
}
deserializeMessages(read, processor);
}
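// Non-blocking variant of read(): returns false when no message is currently
// available, true when a message was consumed, skipped, or the ending position was reached.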
@Override
public boolean readPending(ReplicationMessageProcessor processor)
throws SQLException, InterruptedException {
processWarnings(false);
ByteBuffer read = stream.readPending();
final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace(
"Streaming requested from LSN {}, received LSN {}",
startLsn,
lastReceiveLsn);
if (reachEnd(lastReceiveLsn)) {
lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
processor.process(new ReplicationMessage.NoopMessage(null, null));
return true;
}
if (read == null) {
return false;
}
if (messageDecoder.shouldMessageBeSkipped(
read, lastReceiveLsn, startLsn, walPosition)) {
return true;
}
deserializeMessages(read, processor);
return true;
}
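// Records the LSN of the message just read and hands the raw buffer to the
// plugin-specific decoder for conversion into ReplicationMessage instances.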
private void deserializeMessages(
ByteBuffer buffer, ReplicationMessageProcessor processor)
throws SQLException, InterruptedException {
lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
messageDecoder.processMessage(buffer, processor, typeRegistry);
}
@Override
public void close() throws SQLException {
processWarnings(true);
stream.close();
}
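// Reports the given LSN back to the server as both flushed and applied and forces
// an immediate status update so PostgreSQL may recycle WAL up to that position.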
@Override
public void flushLsn(Lsn lsn) throws SQLException {
doFlushLsn(lsn);
}
private void doFlushLsn(Lsn lsn) throws SQLException {
stream.setFlushedLSN(lsn.asLogSequenceNumber());
stream.setAppliedLSN(lsn.asLogSequenceNumber());
stream.forceUpdateStatus();
}
@Override
public Lsn lastReceivedLsn() {
return lastReceivedLsn;
}
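// Runs a background task that periodically forces a status update so the
// connection stays alive while no changes are being received. The executor is
// supplied by the caller and stopped via stopKeepAlive().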
@Override
public void startKeepAlive(ExecutorService service) {
if (keepAliveExecutor == null) {
keepAliveExecutor = service;
keepAliveRunning = new AtomicBoolean(true);
keepAliveExecutor.submit(
() -> {
while (keepAliveRunning.get()) {
try {
LOGGER.trace(
"Forcing status update with replication stream");
stream.forceUpdateStatus();
metronome.pause();
} catch (Exception exp) {
throw new RuntimeException(
"received unexpected exception will perform keep alive",
exp);
}
}
});
}
}
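// Signals the keep-alive loop to stop and shuts down its executor.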
@Override
public void stopKeepAlive() {
if (keepAliveExecutor != null) {
keepAliveRunning.set(false);
keepAliveExecutor.shutdownNow();
keepAliveExecutor = null;
}
}
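// Drains server-side warnings into the log. To keep the read path cheap this only
// runs every CHECK_WARNINGS_AFTER_COUNT calls unless explicitly forced (e.g. on close).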
private void processWarnings(final boolean forced) throws SQLException {
if (--warningCheckCounter == 0 || forced) {
warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
for (SQLWarning w = connection().getWarnings();
w != null;
w = w.getNextWarning()) {
LOGGER.debug(
"Server-side message: '{}', state = {}, code = {}",
w.getMessage(),
w.getSQLState(),
w.getErrorCode());
}
connection().clearWarnings();
}
}
@Override
public Lsn startLsn() {
return startLsn;
}
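// A bounded read is finished once a received LSN lies beyond the configured ending
// position; a null LSN, a missing ending position, or a non-stopping ending position
// never ends the stream.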
private boolean reachEnd(Lsn receivedLsn) {
if (receivedLsn == null) {
return false;
}
return endingPos != null
&& (!endingPos.isNonStopping())
&& endingPos.compareTo(receivedLsn) < 0;
}
};
}