in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [823:985]
public void execute(ChangeEventSourceContext context) throws InterruptedException {
if (!connectorConfig.getSnapshotMode().shouldStream()) {
LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
return;
}
taskContext.getSchema().assureNonEmptySchema();
final Set<Operation> skippedOperations = connectorConfig.getSkippedOps();
// Register our event handlers ...
eventHandlers.put(EventType.STOP, this::handleServerStop);
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
if (!skippedOperations.contains(Operation.CREATE)) {
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
}
if (!skippedOperations.contains(Operation.UPDATE)) {
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
}
if (!skippedOperations.contains(Operation.DELETE)) {
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
}
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
eventHandlers.put(EventType.XID, this::handleTransactionCompletion);
// Conditionally register ROWS_QUERY handler to parse SQL statements.
if (connectorConfig.includeSqlQuery()) {
eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
}
client.registerEventListener(connectorConfig.bufferSizeForStreamingChangeEventSource() == 0
? this::handleEvent
: (new EventBuffer(connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context))::add);
client.registerLifecycleListener(new ReaderThreadLifecycleListener());
client.registerEventListener(this::onEvent);
if (LOGGER.isDebugEnabled()) {
client.registerEventListener(this::logEvent);
}
final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
metrics.setIsGtidModeEnabled(isGtidModeEnabled);
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium
// checkpoint.
String availableServerGtidStr = connection.knownGtidSet();
if (isGtidModeEnabled) {
// The server is using GTIDs, so enable the handler ...
eventHandlers.put(EventType.GTID, this::handleGtidEvent);
// Now look at the GTID set from the server and what we've previously seen ...
GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
// also take into account purged GTID logs
GtidSet purgedServerGtidSet = connection.purgedGtidSet();
LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);
GtidSet filteredGtidSet = filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
if (filteredGtidSet != null) {
// We've seen at least some GTIDs, so start reading from the filtered GTID set ...
LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
String filteredGtidSetStr = filteredGtidSet.toString();
client.setGtidSet(filteredGtidSetStr);
offsetContext.setCompletedGtidSet(filteredGtidSetStr);
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
} else {
// We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning
// ...
client.setBinlogFilename(offsetContext.getSource().binlogFilename());
client.setBinlogPosition(offsetContext.getSource().binlogPosition());
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
}
} else {
// The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
client.setBinlogFilename(offsetContext.getSource().binlogFilename());
client.setBinlogPosition(offsetContext.getSource().binlogPosition());
}
// We may be restarting in the middle of a transaction, so see how far into the transaction we have already
// processed...
initialEventsToSkip = offsetContext.eventsToSkipUponRestart();
LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);
// Set the starting row number, which is the next row number to be read ...
startingRowNumber = offsetContext.rowsToSkipUponRestart();
LOGGER.info("Skip {} rows on streaming start", startingRowNumber);
// Only when we reach the first BEGIN event will we start to skip events ...
skipEvent = false;
try {
// Start the log reader, which starts background threads ...
if (context.isRunning()) {
long timeout = connectorConfig.getConnectionTimeout().toMillis();
long started = clock.currentTimeInMillis();
try {
LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
client.connect(timeout);
// Need to wait for keepalive thread to be running, otherwise it can be left orphaned
// The problem is with timing. When the close is called too early after connect then
// the keepalive thread is not terminated
if (client.isKeepAlive()) {
LOGGER.info("Waiting for keepalive thread to start");
final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
int waitAttempts = 50;
boolean keepAliveThreadRunning = false;
while (!keepAliveThreadRunning && waitAttempts-- > 0) {
for (Thread t : binaryLogClientThreads.values()) {
if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
LOGGER.info("Keepalive thread is running");
keepAliveThreadRunning = true;
}
}
metronome.pause();
}
}
} catch (TimeoutException e) {
// If the client thread is interrupted *before* the client could connect, the client throws a
// timeout exception
// The only way we can distinguish this is if we get the timeout exception before the specified
// timeout has
// elapsed, so we simply check this (within 10%) ...
long duration = clock.currentTimeInMillis() - started;
if (duration > (0.9 * timeout)) {
double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
throw new DebeziumException(
"Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+ connectorConfig.username() + "'",
e);
}
// Otherwise, we were told to shutdown, so we don't care about the timeout exception
} catch (AuthenticationException e) {
throw new DebeziumException("Failed to authenticate to the MySQL database at " +
connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+ connectorConfig.username() + "'", e);
} catch (Throwable e) {
throw new DebeziumException("Unable to connect to the MySQL database at " +
connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+ connectorConfig.username() + "': " + e.getMessage(), e);
}
}
while (context.isRunning()) {
Thread.sleep(100);
}
} finally {
try {
client.disconnect();
} catch (Exception e) {
LOGGER.info("Exception while stopping binary log client", e);
}
}
}