in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [1015:1253]
public void execute(
        ChangeEventSourceContext context,
        MySqlPartition partition,
        MySqlOffsetContext offsetContext)
        throws InterruptedException {
    // Runs the binlog streaming phase.
    //
    // Steps, in order:
    //   1. Return immediately when the configured snapshot mode does not stream.
    //   2. Register one handler per binlog event type (row handlers are skipped for
    //      operations listed in the connector's skipped operations).
    //   3. Register the client listeners: the dispatching listener (direct or buffered),
    //      the lifecycle listener, onEvent, and - only at debug level - logEvent.
    //   4. Position the binlog client: by filtered GTID set when the server has GTID mode
    //      enabled and a filtered set is available, otherwise by binlog file name/position.
    //   5. Connect (if the context is still running), wait for the keepalive thread, then
    //      block until the context stops running.
    //   6. Always disconnect the client on the way out.
    //
    // Parameters:
    //   context       - polled through isRunning() to decide whether to connect and when
    //                   to stop; also handed to the EventBuffer when buffering is enabled.
    //   partition     - passed through to the event handlers that need it.
    //   offsetContext - offsets to resume from; when null, an initial offset context is
    //                   created from the connector configuration.
    //
    // Throws:
    //   InterruptedException - if the thread is interrupted while waiting for the keepalive
    //                          thread or while idling in the run loop.
    //   DebeziumException    - on connect timeout, authentication failure, or any other
    //                          failure while connecting.
    if (!connectorConfig.getSnapshotMode().shouldStream()) {
        LOGGER.info(
                "Streaming is disabled for snapshot mode {}",
                connectorConfig.getSnapshotMode());
        return;
    }
    if (connectorConfig.getSnapshotMode() != MySqlConnectorConfig.SnapshotMode.NEVER) {
        taskContext.getSchema().assureNonEmptySchema();
    }
    final Set<Operation> skippedOperations = connectorConfig.getSkippedOperations();

    // Fall back to an initial offset context so every handler below can rely on a
    // non-null offset context.
    final MySqlOffsetContext effectiveOffsetContext =
            offsetContext != null ? offsetContext : MySqlOffsetContext.initial(connectorConfig);

    // Register our event handlers ...
    eventHandlers.put(
            EventType.STOP, (event) -> handleServerStop(effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.HEARTBEAT,
            (event) -> handleServerHeartbeat(partition, effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.INCIDENT,
            (event) -> handleServerIncident(partition, effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.ROTATE, (event) -> handleRotateLogsEvent(effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.TABLE_MAP,
            (event) -> handleUpdateTableMetadata(partition, effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.QUERY,
            (event) -> handleQueryEvent(partition, effectiveOffsetContext, event));

    // Row-level handlers are only registered for operations that are not skipped.
    if (!skippedOperations.contains(Operation.CREATE)) {
        eventHandlers.put(
                EventType.WRITE_ROWS,
                (event) -> handleInsert(partition, effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.EXT_WRITE_ROWS,
                (event) -> handleInsert(partition, effectiveOffsetContext, event));
    }
    if (!skippedOperations.contains(Operation.UPDATE)) {
        eventHandlers.put(
                EventType.UPDATE_ROWS,
                (event) -> handleUpdate(partition, effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.EXT_UPDATE_ROWS,
                (event) -> handleUpdate(partition, effectiveOffsetContext, event));
    }
    if (!skippedOperations.contains(Operation.DELETE)) {
        eventHandlers.put(
                EventType.DELETE_ROWS,
                (event) -> handleDelete(partition, effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.EXT_DELETE_ROWS,
                (event) -> handleDelete(partition, effectiveOffsetContext, event));
    }
    eventHandlers.put(
            EventType.VIEW_CHANGE, (event) -> viewChange(effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.XA_PREPARE, (event) -> prepareTransaction(effectiveOffsetContext, event));
    eventHandlers.put(
            EventType.XID,
            (event) -> handleTransactionCompletion(partition, effectiveOffsetContext, event));

    // Conditionally register ROWS_QUERY handler to parse SQL statements.
    if (connectorConfig.includeSqlQuery()) {
        eventHandlers.put(
                EventType.ROWS_QUERY,
                (event) -> handleRowsQuery(effectiveOffsetContext, event));
    }

    // A buffer size of 0 means events are handled directly; otherwise they go through
    // an EventBuffer of the configured size.
    BinaryLogClient.EventListener listener;
    if (connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) {
        listener = (event) -> handleEvent(partition, effectiveOffsetContext, event);
    } else {
        EventBuffer buffer =
                new EventBuffer(
                        connectorConfig.bufferSizeForStreamingChangeEventSource(),
                        this,
                        context);
        listener = (event) -> buffer.add(partition, effectiveOffsetContext, event);
    }
    client.registerEventListener(listener);
    client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext));
    client.registerEventListener((event) -> onEvent(effectiveOffsetContext, event));
    if (LOGGER.isDebugEnabled()) {
        client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event));
    }

    final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
    metrics.setIsGtidModeEnabled(isGtidModeEnabled);

    // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of
    // the last Debezium checkpoint.
    String availableServerGtidStr = connection.knownGtidSet();
    if (isGtidModeEnabled) {
        // The server is using GTIDs, so enable the handler ...
        eventHandlers.put(
                EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event));

        // Now look at the GTID set from the server and what we've previously seen ...
        GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

        // also take into account purged GTID logs
        GtidSet purgedServerGtidSet = connection.purgedGtidSet();
        LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);

        GtidSet filteredGtidSet =
                filterGtidSet(
                        effectiveOffsetContext, availableServerGtidSet, purgedServerGtidSet);
        if (filteredGtidSet != null) {
            // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
            LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
            String filteredGtidSetStr = filteredGtidSet.toString();
            client.setGtidSet(filteredGtidSetStr);
            effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr);
            gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
        } else {
            // We've not yet seen any GTIDs, so that means we have to start reading the binlog
            // from the beginning ...
            client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename());
            client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition());
            gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
        }
    } else {
        // The server is not using GTIDs, so start reading the binlog based upon where we last
        // left off ...
        client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename());
        client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition());
    }

    // We may be restarting in the middle of a transaction, so see how far into the transaction
    // we have already processed...
    initialEventsToSkip = effectiveOffsetContext.eventsToSkipUponRestart();
    LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);

    // Set the starting row number, which is the next row number to be read ...
    startingRowNumber = effectiveOffsetContext.rowsToSkipUponRestart();
    LOGGER.info("Skip {} rows on streaming start", startingRowNumber);

    // Only when we reach the first BEGIN event will we start to skip events ...
    skipEvent = false;

    try {
        // Start the log reader, which starts background threads ...
        if (context.isRunning()) {
            long timeout = connectorConfig.getConnectionTimeout().toMillis();
            long started = clock.currentTimeInMillis();
            try {
                LOGGER.debug(
                        "Attempting to establish binlog reader connection with timeout of {} ms",
                        timeout);
                client.connect(timeout);
                // Need to wait for keepalive thread to be running, otherwise it can be left
                // orphaned. The problem is with timing: when the close is called too early
                // after connect then the keepalive thread is not terminated.
                if (client.isKeepAlive()) {
                    LOGGER.info("Waiting for keepalive thread to start");
                    final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
                    int waitAttempts = 50;
                    boolean keepAliveThreadRunning = false;
                    while (!keepAliveThreadRunning && waitAttempts-- > 0) {
                        for (Thread t : binaryLogClientThreads.values()) {
                            if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
                                LOGGER.info("Keepalive thread is running");
                                keepAliveThreadRunning = true;
                                // One live keepalive thread is all we are waiting for.
                                break;
                            }
                        }
                        // Only sleep when another polling round is actually needed.
                        if (!keepAliveThreadRunning) {
                            metronome.pause();
                        }
                    }
                }
            } catch (TimeoutException e) {
                // If the client thread is interrupted *before* the client could connect, the
                // client throws a timeout exception.
                // The only way we can distinguish this is if we get the timeout exception
                // before the specified timeout has elapsed, so we simply check this
                // (within 10%) ...
                long duration = clock.currentTimeInMillis() - started;
                if (duration > (0.9 * timeout)) {
                    // Floating-point division keeps the fractional seconds in the message.
                    double actualSeconds = duration / (double) TimeUnit.SECONDS.toMillis(1);
                    throw new DebeziumException(
                            "Timed out after "
                                    + actualSeconds
                                    + " seconds while waiting to connect to MySQL at "
                                    + connectorConfig.hostname()
                                    + ":"
                                    + connectorConfig.port()
                                    + " with user '"
                                    + connectorConfig.username()
                                    + "'",
                            e);
                }
                // Otherwise, we were told to shutdown, so we don't care about the timeout
                // exception
            } catch (AuthenticationException e) {
                throw new DebeziumException(
                        "Failed to authenticate to the MySQL database at "
                                + connectorConfig.hostname()
                                + ":"
                                + connectorConfig.port()
                                + " with user '"
                                + connectorConfig.username()
                                + "'",
                        e);
            } catch (InterruptedException e) {
                // Interrupted while pausing for the keepalive thread. This is not a
                // connection failure, so propagate it unchanged instead of letting the
                // generic handler below relabel it as "Unable to connect".
                throw e;
            } catch (Throwable e) {
                throw new DebeziumException(
                        "Unable to connect to the MySQL database at "
                                + connectorConfig.hostname()
                                + ":"
                                + connectorConfig.port()
                                + " with user '"
                                + connectorConfig.username()
                                + "': "
                                + e.getMessage(),
                        e);
            }
        }
        // Events are delivered through the registered listeners; this thread just idles
        // until the context reports that it should stop.
        while (context.isRunning()) {
            Thread.sleep(100);
        }
    } finally {
        try {
            client.disconnect();
        } catch (Exception e) {
            // Best effort: a failure to disconnect must not mask the original outcome.
            LOGGER.info("Exception while stopping binary log client", e);
        }
    }
}