in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [191:336]
/**
 * Builds the streaming change event source and prepares the binlog client taken from the task
 * context: thread factory, server id, SSL, keep-alive/heartbeat timing and a customised event
 * deserializer are all configured here.
 *
 * @param connectorConfig connector configuration; read for the failure handling modes, logical
 *     name, server id, SSL mode, keep-alive settings and the GTID source filter
 * @param connection MySQL connection; stored, and handed to {@code getBinlogSslSocketFactory}
 *     when SSL is enabled
 * @param dispatcher event dispatcher, stored for later use
 * @param errorHandler error handler, stored for later use
 * @param clock clock, stored for later use
 * @param taskContext task context that supplies the {@code BinaryLogClient}
 * @param metrics streaming metrics, stored for later use
 */
public MySqlStreamingChangeEventSource(
        MySqlConnectorConfig connectorConfig,
        MySqlConnection connection,
        EventDispatcher<MySqlPartition, TableId> dispatcher,
        ErrorHandler errorHandler,
        Clock clock,
        MySqlTaskContext taskContext,
        MySqlStreamingChangeEventSourceMetrics metrics) {
    // Keep hold of the collaborators handed in by the caller.
    this.taskContext = taskContext;
    this.connectorConfig = connectorConfig;
    this.connection = connection;
    this.clock = clock;
    this.eventDispatcher = dispatcher;
    this.errorHandler = errorHandler;
    this.metrics = metrics;
    this.eventDeserializationFailureHandlingMode =
            connectorConfig.getEventProcessingFailureHandlingMode();
    this.inconsistentSchemaHandlingMode =
            connectorConfig.inconsistentSchemaFailureHandlingMode();

    // Prepare the binlog reader ...
    this.client = taskContext.getBinaryLogClient();
    // The BinaryLogClient renames its threads later on, so every created thread is
    // registered in binaryLogClientThreads under the name it has at creation time.
    client.setThreadFactory(
            Threads.threadFactory(
                    MySqlConnector.class,
                    connectorConfig.getLogicalName(),
                    "binlog-client",
                    false,
                    false,
                    thread -> binaryLogClientThreads.put(thread.getName(), thread)));
    client.setServerId(connectorConfig.serverId());
    client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
    if (connectorConfig.sslModeEnabled()) {
        // Only install a socket factory when one could actually be built.
        final SSLSocketFactory binlogSocketFactory =
                getBinlogSslSocketFactory(connectorConfig, connection);
        if (binlogSocketFactory != null) {
            client.setSslSocketFactory(binlogSocketFactory);
        }
    }

    final Configuration rawConfig = connectorConfig.getConfig();
    client.setKeepAlive(rawConfig.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
    final long keepAliveMs = rawConfig.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
    client.setKeepAliveInterval(keepAliveMs);
    // The heartbeat interval should be shorter than the keep-alive interval, so it is derived
    // by scaling the keep-alive interval with heartbeatIntervalFactor. The default factor is
    // 0.8; the remaining 0.2 * keepAliveInterval is considered enough time to process the
    // packet received from the MySQL server.
    final long heartbeatMs = (long) (keepAliveMs * heartbeatIntervalFactor);
    client.setHeartbeatInterval(heartbeatMs);

    // The GTID source filter is applied to DML events only when explicitly enabled.
    this.gtidDmlSourceFilter =
            rawConfig.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS)
                    ? connectorConfig.gtidSourceFilter()
                    : null;

    // Event deserializer extended with additional type(s) ...
    final Map<Long, TableMapEventData> tableMapsById = new HashMap<>();
    final EventDeserializer deserializer =
            new EventDeserializer() {
                @Override
                public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
                    try {
                        // Let the superclass do the actual decoding ...
                        final Event event = super.nextEvent(inputStream);
                        final EventType type = event.getHeader().getEventType();
                        if (type == EventType.TABLE_MAP) {
                            // The custom row deserializers need the latest
                            // TableMapEventData for each table number.
                            final TableMapEventData tableMap = event.getData();
                            tableMapsById.put(tableMap.getTableId(), tableMap);
                        } else if (type == EventType.ROTATE
                                && event.getHeader().getTimestamp() != 0) {
                            // DBZ-5126 Drop the cache on a rotate event so that it cannot
                            // grow indefinitely.
                            tableMapsById.clear();
                        }
                        return event;
                    } catch (EventDataDeserializationException edde) {
                        // DBZ-3095 As of Java 15, when EOF is reached in the binlog stream,
                        // the polling loop in BinaryLogClient#listenForEventPackets() keeps
                        // getting values != -1 from peek() and therefore never finishes.
                        // Propagating the exception (EOF or socket closed) aborts that loop.
                        if (edde.getCause() instanceof IOException) {
                            throw edde;
                        }
                        // DBZ-217 The event could not be read, so a pseudo-event is built
                        // purely for the sake of logging.
                        final EventHeaderV4 incidentHeader = new EventHeaderV4();
                        incidentHeader.setEventType(EventType.INCIDENT);
                        incidentHeader.setTimestamp(edde.getEventHeader().getTimestamp());
                        incidentHeader.setServerId(edde.getEventHeader().getServerId());
                        if (edde.getEventHeader() instanceof EventHeaderV4) {
                            // Length, next position and flags are only available on V4.
                            final EventHeaderV4 failedHeader =
                                    (EventHeaderV4) edde.getEventHeader();
                            incidentHeader.setEventLength(failedHeader.getEventLength());
                            incidentHeader.setNextPosition(failedHeader.getNextPosition());
                            incidentHeader.setFlags(failedHeader.getFlags());
                        }
                        return new Event(
                                incidentHeader,
                                new EventDataDeserializationExceptionData(edde));
                    }
                }
            };

    // Register the custom deserializers ...
    deserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
    deserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
    // Row events: each operation is registered for its plain type and for its EXT_ type,
    // the latter with setMayContainExtraInformation(true).
    deserializer.setEventDataDeserializer(
            EventType.WRITE_ROWS, new RowDeserializers.WriteRowsDeserializer(tableMapsById));
    deserializer.setEventDataDeserializer(
            EventType.EXT_WRITE_ROWS,
            new RowDeserializers.WriteRowsDeserializer(tableMapsById)
                    .setMayContainExtraInformation(true));
    deserializer.setEventDataDeserializer(
            EventType.UPDATE_ROWS, new RowDeserializers.UpdateRowsDeserializer(tableMapsById));
    deserializer.setEventDataDeserializer(
            EventType.EXT_UPDATE_ROWS,
            new RowDeserializers.UpdateRowsDeserializer(tableMapsById)
                    .setMayContainExtraInformation(true));
    deserializer.setEventDataDeserializer(
            EventType.DELETE_ROWS, new RowDeserializers.DeleteRowsDeserializer(tableMapsById));
    deserializer.setEventDataDeserializer(
            EventType.EXT_DELETE_ROWS,
            new RowDeserializers.DeleteRowsDeserializer(tableMapsById)
                    .setMayContainExtraInformation(true));
    client.setEventDeserializer(deserializer);
}