public MySqlStreamingChangeEventSource()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [191:336]


    /**
     * Creates the streaming change event source and configures the binlog client obtained from
     * the given task context.
     *
     * <p>Besides storing its collaborators, the constructor:
     *
     * <ul>
     *   <li>reads the event-processing and inconsistent-schema failure handling modes from the
     *       connector configuration;
     *   <li>sets the thread factory, server id, SSL mode (and SSL socket factory when one is
     *       available), keep-alive and heartbeat settings on the binlog client;
     *   <li>installs an {@link EventDeserializer} that remembers the latest {@code TABLE_MAP}
     *       event per table id and converts non-I/O deserialization failures into {@code
     *       INCIDENT} pseudo-events.
     * </ul>
     *
     * @param connectorConfig connector configuration all settings are read from
     * @param connection connection stored by this source and passed to the SSL socket factory
     *     lookup
     * @param dispatcher event dispatcher stored by this source
     * @param errorHandler error handler stored by this source
     * @param clock clock stored by this source
     * @param taskContext task context that supplies the binlog client
     * @param metrics streaming metrics stored by this source
     */
    public MySqlStreamingChangeEventSource(
            MySqlConnectorConfig connectorConfig,
            MySqlConnection connection,
            EventDispatcher<MySqlPartition, TableId> dispatcher,
            ErrorHandler errorHandler,
            Clock clock,
            MySqlTaskContext taskContext,
            MySqlStreamingChangeEventSourceMetrics metrics) {

        // Collaborators handed in by the caller.
        this.connectorConfig = connectorConfig;
        this.connection = connection;
        this.taskContext = taskContext;
        this.eventDispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.metrics = metrics;

        // Failure handling policies come straight from the connector configuration.
        this.eventDeserializationFailureHandlingMode =
                connectorConfig.getEventProcessingFailureHandlingMode();
        this.inconsistentSchemaHandlingMode =
                connectorConfig.inconsistentSchemaFailureHandlingMode();

        final Configuration settings = connectorConfig.getConfig();

        // ---- Binlog client: identity, threads and transport ----
        this.client = taskContext.getBinaryLogClient();
        // The thread names given here are overwritten by BinaryLogClient later on; every created
        // thread is recorded in binaryLogClientThreads under the name it had at creation time.
        client.setThreadFactory(
                Threads.threadFactory(
                        MySqlConnector.class,
                        connectorConfig.getLogicalName(),
                        "binlog-client",
                        false,
                        false,
                        thread -> binaryLogClientThreads.put(thread.getName(), thread)));
        client.setServerId(connectorConfig.serverId());
        client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
        if (connectorConfig.sslModeEnabled()) {
            final SSLSocketFactory binlogSocketFactory =
                    getBinlogSslSocketFactory(connectorConfig, connection);
            // Only override the client's socket factory when the lookup produced one.
            if (binlogSocketFactory != null) {
                client.setSslSocketFactory(binlogSocketFactory);
            }
        }

        // ---- Binlog client: keep-alive and heartbeat ----
        client.setKeepAlive(settings.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        final long keepAliveMs = settings.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        client.setKeepAliveInterval(keepAliveMs);
        // The heartbeat interval should be shorter than the keep-alive interval, so it is derived
        // as keepAliveInterval * heartbeatIntervalFactor. The original note cites a default
        // factor of 0.8 and considers the remaining 0.2 * keepAliveInterval sufficient to process
        // the packet received from the MySQL server.
        client.setHeartbeatInterval((long) (keepAliveMs * heartbeatIntervalFactor));

        // ---- Optional GTID-source based filtering of DML events ----
        final boolean dmlFilteredByGtidSource =
                settings.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
        if (dmlFilteredByGtidSource) {
            this.gtidDmlSourceFilter = connectorConfig.gtidSourceFilter();
        } else {
            this.gtidDmlSourceFilter = null;
        }

        // ---- Event deserializer with additional type(s) ----
        // Latest TABLE_MAP event per table number; shared with the custom row deserializers.
        final Map<Long, TableMapEventData> latestTableMaps = new HashMap<>();
        final EventDeserializer deserializer =
                new EventDeserializer() {
                    @Override
                    public Event nextEvent(ByteArrayInputStream in) throws IOException {
                        try {
                            // The actual decoding is left to the superclass.
                            final Event decoded = super.nextEvent(in);
                            final EventType decodedType = decoded.getHeader().getEventType();

                            if (decodedType == EventType.TABLE_MAP) {
                                // Remember the most recent table map for each table number so
                                // that the custom deserializers can look it up.
                                final TableMapEventData tableMap = decoded.getData();
                                latestTableMaps.put(tableMap.getTableId(), tableMap);
                            } else if (decodedType == EventType.ROTATE
                                    && decoded.getHeader().getTimestamp() != 0) {
                                // DBZ-5126 Drop the cache on a rotate event so that it cannot
                                // grow indefinitely.
                                latestTableMaps.clear();
                            }
                            return decoded;
                        } catch (EventDataDeserializationException failure) {
                            // DBZ-3095 As of Java 15, once EOF is reached in the binlog stream
                            // the polling loop in BinaryLogClient#listenForEventPackets() keeps
                            // getting values != -1 from peek() and therefore never finishes.
                            // Propagating the exception (either EOF or socket closed) makes the
                            // loop abort in that case.
                            final boolean causedByIo = failure.getCause() instanceof IOException;
                            if (causedByIo) {
                                throw failure;
                            }

                            // DBZ-217 The event could not be read: build a pseudo-event for the
                            // sake of logging.
                            final EventHeaderV4 incidentHeader = new EventHeaderV4();
                            incidentHeader.setEventType(EventType.INCIDENT);
                            incidentHeader.setTimestamp(failure.getEventHeader().getTimestamp());
                            incidentHeader.setServerId(failure.getEventHeader().getServerId());

                            if (failure.getEventHeader() instanceof EventHeaderV4) {
                                // Length, next position and flags only exist on V4 headers.
                                final EventHeaderV4 failedHeader =
                                        (EventHeaderV4) failure.getEventHeader();
                                incidentHeader.setEventLength(failedHeader.getEventLength());
                                incidentHeader.setNextPosition(failedHeader.getNextPosition());
                                incidentHeader.setFlags(failedHeader.getFlags());
                            }

                            return new Event(
                                    incidentHeader,
                                    new EventDataDeserializationExceptionData(failure));
                        }
                    }
                };

        // ---- Custom deserializers ----
        deserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        deserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());

        // Row events: each operation is registered for its plain and EXT_ variant; the EXT_
        // variant is flagged as possibly containing extra information.
        deserializer.setEventDataDeserializer(
                EventType.WRITE_ROWS, new RowDeserializers.WriteRowsDeserializer(latestTableMaps));
        deserializer.setEventDataDeserializer(
                EventType.EXT_WRITE_ROWS,
                new RowDeserializers.WriteRowsDeserializer(latestTableMaps)
                        .setMayContainExtraInformation(true));

        deserializer.setEventDataDeserializer(
                EventType.UPDATE_ROWS,
                new RowDeserializers.UpdateRowsDeserializer(latestTableMaps));
        deserializer.setEventDataDeserializer(
                EventType.EXT_UPDATE_ROWS,
                new RowDeserializers.UpdateRowsDeserializer(latestTableMaps)
                        .setMayContainExtraInformation(true));

        deserializer.setEventDataDeserializer(
                EventType.DELETE_ROWS,
                new RowDeserializers.DeleteRowsDeserializer(latestTableMaps));
        deserializer.setEventDataDeserializer(
                EventType.EXT_DELETE_ROWS,
                new RowDeserializers.DeleteRowsDeserializer(latestTableMaps)
                        .setMayContainExtraInformation(true));

        client.setEventDeserializer(deserializer);
    }