in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java [149:463]
/**
 * Runs a single streaming pass for one partition: determines the LSN window to read, applies
 * pending schema migrations, merges the change rows of all queried change tables in position
 * order and dispatches them as data change events.
 *
 * <p>Per-partition state (schema change checkpoints, the current set of change tables, the last
 * processed position and the "changes stopped being monotonic" flag) is kept in a
 * {@code SqlServerStreamingExecutionContext} that is created lazily on the first iteration for
 * the partition and reused afterwards.
 *
 * @param context source context; the pass is skipped entirely when it is no longer running
 * @param partition partition whose database is read
 * @param offsetContext offset that supplies the restart position and is advanced for every
 *     dispatched event
 * @return {@code false} when streaming is disabled by the snapshot mode, when no maximum LSN is
 *     available, or when no new change is detected; {@code true} in every other case, including
 *     when the context is not running or a failure was handed to the error handler
 * @throws InterruptedException declared by the signature; note that any exception raised inside
 *     the body's {@code try} block is caught and passed to {@code errorHandler} instead of being
 *     propagated
 */
public boolean executeIteration(
        ChangeEventSourceContext context,
        SqlServerPartition partition,
        SqlServerOffsetContext offsetContext)
        throws InterruptedException {
    if (connectorConfig.getSnapshotMode().equals(SnapshotMode.INITIAL_ONLY)) {
        LOGGER.info("Streaming is not enabled in current configuration");
        return false;
    }
    final String databaseName = partition.getDatabaseName();
    try {
        // Build the per-partition context only when it is missing. The mapping function runs at
        // most once per partition, so no throw-away context is allocated on later iterations.
        final SqlServerStreamingExecutionContext streamingExecutionContext =
                streamingExecutionContexts.computeIfAbsent(
                        partition,
                        key -> {
                            final SqlServerStreamingExecutionContext created =
                                    new SqlServerStreamingExecutionContext(
                                            new PriorityQueue<>(
                                                    (x, y) ->
                                                            x.getStopLsn()
                                                                    .compareTo(y.getStopLsn())),
                                            new AtomicReference<>(),
                                            offsetContext.getChangePosition(),
                                            new AtomicBoolean(false),
                                            // LSN should be increased for the first run only
                                            // immediately after snapshot completion otherwise
                                            // we might skip an incomplete transaction after
                                            // restart
                                            offsetContext.isSnapshotCompleted());
                            LOGGER.info(
                                    "Last position recorded in offsets is {}[{}]",
                                    offsetContext.getChangePosition(),
                                    offsetContext.getEventSerialNo());
                            return created;
                        });
        final Queue<SqlServerChangeTable> schemaChangeCheckpoints =
                streamingExecutionContext.getSchemaChangeCheckpoints();
        final AtomicReference<SqlServerChangeTable[]> tablesSlot =
                streamingExecutionContext.getTablesSlot();
        // Snapshot of the offset as it is at the start of this iteration; used below to skip
        // changes that were already emitted before a restart.
        final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition();
        final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo();
        final AtomicBoolean changesStoppedBeingMonotonic =
                streamingExecutionContext.getChangesStoppedBeingMonotonic();
        final int maxTransactionsPerIteration =
                connectorConfig.getMaxTransactionsPerIteration();
        TxLogPosition lastProcessedPosition =
                streamingExecutionContext.getLastProcessedPosition();
        if (context.isRunning()) {
            commitTransaction();
            final Lsn toLsn =
                    getToLsn(
                            dataConnection,
                            databaseName,
                            lastProcessedPosition,
                            maxTransactionsPerIteration);
            // Shouldn't happen if the agent is running, but it is better to guard against such
            // situation
            if (!toLsn.isAvailable()) {
                LOGGER.warn(
                        "No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running");
                return false;
            }
            // There is no change in the database
            if (toLsn.compareTo(lastProcessedPosition.getCommitLsn()) <= 0
                    && streamingExecutionContext.getShouldIncreaseFromLsn()) {
                LOGGER.debug("No change in the database");
                return false;
            }
            // Reading interval is inclusive so we need to move LSN forward but not for first
            // run as TX might not be streamed completely
            final Lsn fromLsn =
                    lastProcessedPosition.getCommitLsn().isAvailable()
                                    && streamingExecutionContext.getShouldIncreaseFromLsn()
                            ? dataConnection.incrementLsn(
                                    databaseName, lastProcessedPosition.getCommitLsn())
                            : lastProcessedPosition.getCommitLsn();
            streamingExecutionContext.setShouldIncreaseFromLsn(true);
            // Drain checkpoints left over from a previous iteration before reading new changes.
            while (!schemaChangeCheckpoints.isEmpty()) {
                migrateTable(partition, schemaChangeCheckpoints, offsetContext);
            }
            // New change tables appeared in the window: refresh the table list and queue a
            // checkpoint for every table whose start LSN lies inside the window.
            if (!dataConnection.getNewChangeTables(databaseName, fromLsn, toLsn).isEmpty()) {
                final SqlServerChangeTable[] tables =
                        getChangeTablesToQuery(partition, offsetContext, toLsn);
                tablesSlot.set(tables);
                for (SqlServerChangeTable table : tables) {
                    if (table.getStartLsn().isBetween(fromLsn, toLsn)) {
                        LOGGER.info("Schema will be changed for {}", table);
                        schemaChangeCheckpoints.add(table);
                    }
                }
            }
            if (tablesSlot.get() == null) {
                tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn));
            }
            try {
                dataConnection.getChangesForTables(
                        databaseName,
                        tablesSlot.get(),
                        fromLsn,
                        toLsn,
                        resultSets -> {
                            // Counts the rows seen at the restart position so that rows already
                            // emitted before the restart can be skipped.
                            long eventSerialNoInInitialTx = 1;
                            final int tableCount = resultSets.length;
                            final SqlServerChangeTablePointer[] changeTables =
                                    new SqlServerChangeTablePointer[tableCount];
                            final SqlServerChangeTable[] tables = tablesSlot.get();
                            // One pointer per result set, each advanced to its first row.
                            for (int i = 0; i < tableCount; i++) {
                                changeTables[i] =
                                        new SqlServerChangeTablePointer(
                                                tables[i],
                                                resultSets[i],
                                                connectorConfig.getSourceTimestampMode());
                                changeTables[i].next();
                            }
                            // Repeatedly pick the not-yet-completed pointer that compares
                            // smallest, until every pointer is exhausted.
                            for (; ; ) {
                                SqlServerChangeTablePointer tableWithSmallestLsn = null;
                                for (SqlServerChangeTablePointer changeTable : changeTables) {
                                    if (changeTable.isCompleted()) {
                                        continue;
                                    }
                                    if (tableWithSmallestLsn == null
                                            || changeTable.compareTo(tableWithSmallestLsn)
                                                    < 0) {
                                        tableWithSmallestLsn = changeTable;
                                    }
                                }
                                if (tableWithSmallestLsn == null) {
                                    // No more LSNs available
                                    break;
                                }
                                if (!(tableWithSmallestLsn.getChangePosition().isAvailable()
                                        && tableWithSmallestLsn
                                                .getChangePosition()
                                                .getInTxLsn()
                                                .isAvailable())) {
                                    LOGGER.error(
                                            "Skipping change {} as its LSN is NULL which is not expected",
                                            tableWithSmallestLsn);
                                    tableWithSmallestLsn.next();
                                    continue;
                                }
                                if (tableWithSmallestLsn.isNewTransaction()
                                        && changesStoppedBeingMonotonic.get()) {
                                    LOGGER.info(
                                            "Resetting changesStoppedBeingMonotonic as transaction changes");
                                    changesStoppedBeingMonotonic.set(false);
                                }
                                // After restart for changes that are not monotonic to avoid
                                // data loss
                                if (tableWithSmallestLsn
                                        .isCurrentPositionSmallerThanPreviousPosition()) {
                                    LOGGER.info(
                                            "Disabling skipping changes due to not monotonic order of changes");
                                    changesStoppedBeingMonotonic.set(true);
                                }
                                // After restart for changes that were executed before the last
                                // committed offset
                                if (!changesStoppedBeingMonotonic.get()
                                        && tableWithSmallestLsn
                                                        .getChangePosition()
                                                        .compareTo(lastProcessedPositionOnStart)
                                                < 0) {
                                    LOGGER.info(
                                            "Skipping change {} as its position is smaller than the last recorded position {}",
                                            tableWithSmallestLsn,
                                            lastProcessedPositionOnStart);
                                    tableWithSmallestLsn.next();
                                    continue;
                                }
                                // After restart for change that was the last committed and
                                // operations in it before the last committed offset
                                if (!changesStoppedBeingMonotonic.get()
                                        && tableWithSmallestLsn
                                                        .getChangePosition()
                                                        .compareTo(lastProcessedPositionOnStart)
                                                == 0
                                        && eventSerialNoInInitialTx
                                                <= lastProcessedEventSerialNoOnStart) {
                                    LOGGER.info(
                                            "Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]",
                                            tableWithSmallestLsn,
                                            eventSerialNoInInitialTx,
                                            lastProcessedPositionOnStart,
                                            lastProcessedEventSerialNoOnStart);
                                    eventSerialNoInInitialTx++;
                                    tableWithSmallestLsn.next();
                                    continue;
                                }
                                // Skip rows whose commit LSN is at or beyond the stop LSN of
                                // the change table they were read from.
                                if (tableWithSmallestLsn
                                                .getChangeTable()
                                                .getStopLsn()
                                                .isAvailable()
                                        && tableWithSmallestLsn
                                                        .getChangeTable()
                                                        .getStopLsn()
                                                        .compareTo(
                                                                tableWithSmallestLsn
                                                                        .getChangePosition()
                                                                        .getCommitLsn())
                                                <= 0) {
                                    LOGGER.debug(
                                            "Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}",
                                            tableWithSmallestLsn,
                                            tableWithSmallestLsn.getChangePosition());
                                    tableWithSmallestLsn.next();
                                    continue;
                                }
                                LOGGER.trace("Processing change {}", tableWithSmallestLsn);
                                LOGGER.trace(
                                        "Schema change checkpoints {}",
                                        schemaChangeCheckpoints);
                                // Migrate once the stream has reached the start LSN of the
                                // head checkpoint.
                                if (!schemaChangeCheckpoints.isEmpty()) {
                                    if (tableWithSmallestLsn
                                                    .getChangePosition()
                                                    .getCommitLsn()
                                                    .compareTo(
                                                            schemaChangeCheckpoints
                                                                    .peek()
                                                                    .getStartLsn())
                                            >= 0) {
                                        migrateTable(
                                                partition,
                                                schemaChangeCheckpoints,
                                                offsetContext);
                                    }
                                }
                                final TableId tableId =
                                        tableWithSmallestLsn
                                                .getChangeTable()
                                                .getSourceTableId();
                                final int operation = tableWithSmallestLsn.getOperation();
                                final Object[] data = tableWithSmallestLsn.getData();
                                // UPDATE consists of two consecutive events, first event
                                // contains
                                // the row before it was updated and the second the row after
                                // it was updated
                                int eventCount = 1;
                                if (operation
                                        == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) {
                                    if (!tableWithSmallestLsn.next()
                                            || tableWithSmallestLsn.getOperation()
                                                    != SqlServerChangeRecordEmitter
                                                            .OP_UPDATE_AFTER) {
                                        throw new IllegalStateException(
                                                "The update before event at "
                                                        + tableWithSmallestLsn
                                                                .getChangePosition()
                                                        + " for table "
                                                        + tableId
                                                        + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                                    }
                                    eventCount = 2;
                                }
                                // For an update the pointer now sits on the "after" row.
                                final Object[] dataNext =
                                        (operation
                                                        == SqlServerChangeRecordEmitter
                                                                .OP_UPDATE_BEFORE)
                                                ? tableWithSmallestLsn.getData()
                                                : null;
                                offsetContext.setChangePosition(
                                        tableWithSmallestLsn.getChangePosition(), eventCount);
                                offsetContext.event(
                                        tableWithSmallestLsn
                                                .getChangeTable()
                                                .getSourceTableId(),
                                        connectorConfig
                                                .getSourceTimestampMode()
                                                .getTimestamp(
                                                        clock,
                                                        tableWithSmallestLsn.getResultSet()));
                                dispatcher.dispatchDataChangeEvent(
                                        partition,
                                        tableId,
                                        new SqlServerChangeRecordEmitter(
                                                partition,
                                                offsetContext,
                                                operation,
                                                data,
                                                dataNext,
                                                clock));
                                tableWithSmallestLsn.next();
                            }
                        });
                streamingExecutionContext.setLastProcessedPosition(
                        TxLogPosition.valueOf(toLsn));
                // Terminate the transaction otherwise CDC could not be disabled for tables
                dataConnection.rollback();
                // Determine whether to continue streaming in sqlserver cdc snapshot phase
                afterHandleLsn(partition, toLsn);
            } catch (SQLException e) {
                // Let the error processor decide which tables remain queryable; its result
                // replaces the table list used by the next iteration.
                tablesSlot.set(
                        processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
            }
        }
    } catch (Exception e) {
        // Every failure raised above (including InterruptedException) is reported to the
        // error handler rather than rethrown; the method then still returns true.
        errorHandler.setProducerThrowable(e);
    }
    return true;
}