in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java [109:357]
/**
 * Streams change events from the DB2 change tables for as long as {@code context} reports
 * that it is running.
 *
 * <p>Each pass of the polling loop reads the current maximum LSN, queries every tracked change
 * table for the range {@code fromLsn..currentMaxLsn}, and merges the per-table result sets by
 * repeatedly taking the pointer that orders first according to
 * {@code ChangeTablePointer.compareTo}. Changes located before the position recorded in the
 * offsets at startup (or, at exactly that position, with a serial number not greater than the
 * recorded one) are skipped. Every remaining change updates {@code offsetContext} and is
 * handed to {@code dispatcher}.
 *
 * <p>The method returns immediately when the configured snapshot mode does not request
 * streaming. Any exception raised inside the main {@code try} block is passed to
 * {@code errorHandler} instead of being propagated to the caller.
 *
 * @param context consulted through {@code isRunning()} to decide whether to keep polling
 * @param partition forwarded to table lookup, schema migration, dispatching and
 *     {@code afterHandleLsn}
 * @param offsetContext supplies the starting position and is updated for every dispatched
 *     change
 * @throws InterruptedException declared by the signature. NOTE(review): the catch-all at the
 *     end of the body also covers exceptions raised inside the loop, so an interruption there
 *     appears to end up in {@code errorHandler} - confirm that this is intended.
 */
public void execute(
ChangeEventSourceContext context,
Db2Partition partition,
Db2OffsetContext offsetContext)
throws InterruptedException {
// Nothing to do when the snapshot mode does not include a streaming phase.
if (!connectorConfig.getSnapshotMode().shouldStream()) {
LOGGER.info("Streaming is not enabled in current configuration");
return;
}
// Used to pause between polls (pollInterval) whenever there is nothing to read.
final Metronome metronome = Metronome.sleeper(pollInterval, clock);
// Change tables awaiting a schema migration; the head is the one with the smallest stop LSN.
final Queue<Db2ChangeTable> schemaChangeCheckpoints =
new PriorityQueue<>((x, y) -> x.getStopLsn().compareTo(y.getStopLsn()));
try {
// Holder for the current set of change tables so that the lambda below can read the
// latest array after it has been replaced.
final AtomicReference<Db2ChangeTable[]> tablesSlot =
new AtomicReference<>(getCdcTablesToQuery(partition, offsetContext));
// Snapshot of the offsets as they were when streaming started; used below to skip
// changes that lie at or before this point.
final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition();
final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo();
LOGGER.info(
"Last position recorded in offsets is {}[{}]",
lastProcessedPositionOnStart,
lastProcessedEventSerialNoOnStart);
TxLogPosition lastProcessedPosition = lastProcessedPositionOnStart;
// LSN should be increased for the first run only immediately after snapshot completion
// otherwise we might skip an incomplete transaction after restart
boolean shouldIncreaseFromLsn = offsetContext.isSnapshotCompleted();
while (context.isRunning()) {
final Lsn currentMaxLsn = dataConnection.getMaxLsn();
// Shouldn't happen if the agent is running, but it is better to guard against such
// situation
if (!currentMaxLsn.isAvailable()) {
LOGGER.warn(
"No maximum LSN recorded in the database; please ensure that the DB2 Agent is running");
metronome.pause();
continue;
}
// There is no change in the database
if (currentMaxLsn.equals(lastProcessedPosition.getCommitLsn())
&& shouldIncreaseFromLsn) {
LOGGER.debug("No change in the database");
metronome.pause();
continue;
}
// Reading interval is inclusive so we need to move LSN forward but not for first
// run as TX might not be streamed completely
final Lsn fromLsn =
lastProcessedPosition.getCommitLsn().isAvailable() && shouldIncreaseFromLsn
? dataConnection.incrementLsn(lastProcessedPosition.getCommitLsn())
: lastProcessedPosition.getCommitLsn();
// Every later pass that reaches this point starts after the last processed commit LSN.
shouldIncreaseFromLsn = true;
// Apply all pending schema checkpoints before issuing the next query.
// NOTE(review): this loop only terminates if migrateTable removes entries from the
// queue; migrateTable is defined outside this view - confirm.
while (!schemaChangeCheckpoints.isEmpty()) {
migrateTable(partition, offsetContext, schemaChangeCheckpoints);
}
// When new change tables are reported for this range, reload the table list and queue
// every table whose start LSN lies between fromLsn and currentMaxLsn for migration.
if (!dataConnection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
final Db2ChangeTable[] tables = getCdcTablesToQuery(partition, offsetContext);
tablesSlot.set(tables);
for (Db2ChangeTable table : tables) {
if (table.getStartLsn().isBetween(fromLsn, currentMaxLsn)) {
LOGGER.info("Schema will be changed for {}", table);
schemaChangeCheckpoints.add(table);
}
}
}
try {
dataConnection.getChangesForTables(
tablesSlot.get(),
fromLsn,
currentMaxLsn,
resultSets -> {
// Counts the events seen at the startup position; it restarts at 1 on every
// invocation of this callback.
long eventSerialNoInInitialTx = 1;
final int tableCount = resultSets.length;
final ChangeTablePointer[] changeTables =
new ChangeTablePointer[tableCount];
// Result sets are paired with change tables by array index.
final Db2ChangeTable[] tables = tablesSlot.get();
for (int i = 0; i < tableCount; i++) {
changeTables[i] =
new ChangeTablePointer(tables[i], resultSets[i]);
// Advance each pointer once before the merge loop starts.
changeTables[i].next();
}
// Merge loop: on every pass handle the pointer that orders first among those
// that are not yet completed.
for (; ; ) {
ChangeTablePointer tableWithSmallestLsn = null;
for (ChangeTablePointer changeTable : changeTables) {
if (changeTable.isCompleted()) {
continue;
}
if (tableWithSmallestLsn == null
|| changeTable.compareTo(tableWithSmallestLsn)
< 0) {
tableWithSmallestLsn = changeTable;
}
}
if (tableWithSmallestLsn == null) {
// No more LSNs available
break;
}
// Skip rows whose change position or in-transaction LSN is not available.
if (!(tableWithSmallestLsn.getChangePosition().isAvailable()
&& tableWithSmallestLsn
.getChangePosition()
.getInTxLsn()
.isAvailable())) {
LOGGER.error(
"Skipping change {} as its LSN is NULL which is not expected",
tableWithSmallestLsn);
tableWithSmallestLsn.next();
continue;
}
// After restart for changes that were executed before the last
// committed offset
if (tableWithSmallestLsn
.getChangePosition()
.compareTo(lastProcessedPositionOnStart)
< 0) {
LOGGER.info(
"Skipping change {} as its position is smaller than the last recorded position {}",
tableWithSmallestLsn,
lastProcessedPositionOnStart);
tableWithSmallestLsn.next();
continue;
}
// After restart for change that was the last committed and
// operations in it before the last committed offset
if (tableWithSmallestLsn
.getChangePosition()
.compareTo(lastProcessedPositionOnStart)
== 0
&& eventSerialNoInInitialTx
<= lastProcessedEventSerialNoOnStart) {
LOGGER.info(
"Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]",
tableWithSmallestLsn,
eventSerialNoInInitialTx,
lastProcessedPositionOnStart,
lastProcessedEventSerialNoOnStart);
eventSerialNoInInitialTx++;
tableWithSmallestLsn.next();
continue;
}
// Skip rows of a change table whose stop LSN is available and not greater
// than the commit LSN of the row.
if (tableWithSmallestLsn
.getChangeTable()
.getStopLsn()
.isAvailable()
&& tableWithSmallestLsn
.getChangeTable()
.getStopLsn()
.compareTo(
tableWithSmallestLsn
.getChangePosition()
.getCommitLsn())
<= 0) {
LOGGER.debug(
"Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}",
tableWithSmallestLsn,
tableWithSmallestLsn.getChangePosition());
tableWithSmallestLsn.next();
continue;
}
LOGGER.trace("Processing change {}", tableWithSmallestLsn);
// Run the next pending migration once the commit LSN of this change has
// reached the stop LSN at the head of the checkpoint queue.
if (!schemaChangeCheckpoints.isEmpty()) {
if (tableWithSmallestLsn
.getChangePosition()
.getCommitLsn()
.compareTo(
schemaChangeCheckpoints
.peek()
.getStopLsn())
>= 0) {
migrateTable(
partition,
offsetContext,
schemaChangeCheckpoints);
}
}
final TableId tableId =
tableWithSmallestLsn
.getChangeTable()
.getSourceTableId();
// Operation and row data are captured before the pointer may be advanced to
// the second half of an update below.
final int operation = tableWithSmallestLsn.getOperation();
final Object[] data = tableWithSmallestLsn.getData();
// UPDATE consists of two consecutive events, first event
// contains
// the row before it was updated and the second the row after
// it was updated
int eventCount = 1;
if (operation == Db2ChangeRecordEmitter.OP_UPDATE_BEFORE) {
if (!tableWithSmallestLsn.next()
|| tableWithSmallestLsn.getOperation()
!= Db2ChangeRecordEmitter.OP_UPDATE_AFTER) {
throw new IllegalStateException(
"The update before event at "
+ tableWithSmallestLsn
.getChangePosition()
+ " for table "
+ tableId
+ " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
}
eventCount = 2;
}
// For an update the pointer now sits on the "after" row; otherwise there is no
// second row.
final Object[] dataNext =
(operation == Db2ChangeRecordEmitter.OP_UPDATE_BEFORE)
? tableWithSmallestLsn.getData()
: null;
// Record the position (with the number of rows consumed for this change) and the
// source table plus commit timestamp before dispatching.
offsetContext.setChangePosition(
tableWithSmallestLsn.getChangePosition(), eventCount);
offsetContext.event(
tableWithSmallestLsn
.getChangeTable()
.getSourceTableId(),
metadataConnection.timestampOfLsn(
tableWithSmallestLsn
.getChangePosition()
.getCommitLsn()));
dispatcher.dispatchDataChangeEvent(
partition,
tableId,
new Db2ChangeRecordEmitter(
partition,
offsetContext,
operation,
data,
dataNext,
clock));
// Move past the row that has just been dispatched.
tableWithSmallestLsn.next();
}
});
// The whole range up to currentMaxLsn has been handled.
lastProcessedPosition = TxLogPosition.valueOf(currentMaxLsn);
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
// Determine whether to continue streaming in db2 cdc snapshot phase
afterHandleLsn(partition, currentMaxLsn);
} catch (SQLException e) {
// The tracked tables are replaced by whatever processErrorFromChangeTableQuery returns
// and polling continues; lastProcessedPosition is left unchanged on this path.
tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get()));
}
}
} catch (Exception e) {
// Every other failure ends the loop and is reported through the error handler rather
// than rethrown.
errorHandler.setProducerThrowable(e);
}
}