public DataSource createDataSource()

in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java [117:288]


    public DataSource createDataSource(Context context) {
        FactoryHelper.createFactoryHelper(this, context)
                .validateExcept(PROPERTIES_PREFIX, DEBEZIUM_OPTIONS_PREFIX);

        final Configuration config = context.getFactoryConfiguration();
        String hostname = config.get(HOSTNAME);
        int port = config.get(PORT);

        String username = config.get(USERNAME);
        String password = config.get(PASSWORD);
        String tables = config.get(TABLES);
        String tablesExclude = config.get(TABLES_EXCLUDE);

        String serverId = validateAndGetServerId(config);
        ZoneId serverTimeZone = getServerTimeZone(config);
        StartupOptions startupOptions = getStartupOptions(config);
        // Batch mode only supports StartupMode.SNAPSHOT.
        Configuration pipelineConfiguration = context.getPipelineConfiguration();
        if (pipelineConfiguration != null
                && pipelineConfiguration.contains(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE)
                && RuntimeExecutionMode.BATCH.equals(
                        pipelineConfiguration.get(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE))
                && !StartupOptions.snapshot().equals(startupOptions)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Only \"snapshot\" of MySQLDataSource StartupOption is supported in BATCH pipeline, but actual MySQLDataSource StartupOption is {}.",
                            startupOptions.startupMode));
        }
        boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);

        int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
        int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);

        double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);

        boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
        boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);

        Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
        Duration connectTimeout = config.get(CONNECT_TIMEOUT);
        int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
        int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
        boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
        boolean scanBinlogNewlyAddedTableEnabled =
                config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
        boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
        boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
        boolean isAssignUnboundedChunkFirst =
                config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);

        validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
        validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
        validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
        validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
        validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
        validateDistributionFactorUpper(distributionFactorUpper);
        validateDistributionFactorLower(distributionFactorLower);

        Map<String, String> configMap = config.toMap();
        OptionUtils.printOptions(IDENTIFIER, config.toMap());
        if (includeComments) {
            // set debezium config 'include.schema.comments' to true
            configMap.put(
                    DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
                            + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
                    "true");
        }

        if (!treatTinyInt1AsBoolean) {
            // set jdbc config 'tinyInt1isBit' to false
            configMap.put(PROPERTIES_PREFIX + PropertyKey.tinyInt1isBit.getKeyName(), "false");
        }

        MySqlSourceConfigFactory configFactory =
                new MySqlSourceConfigFactory()
                        .hostname(hostname)
                        .port(port)
                        .username(username)
                        .password(password)
                        .databaseList(".*")
                        .tableList(".*")
                        .startupOptions(startupOptions)
                        .serverId(serverId)
                        .serverTimeZone(serverTimeZone.getId())
                        .fetchSize(fetchSize)
                        .splitSize(splitSize)
                        .splitMetaGroupSize(splitMetaGroupSize)
                        .distributionFactorLower(distributionFactorLower)
                        .distributionFactorUpper(distributionFactorUpper)
                        .heartbeatInterval(heartbeatInterval)
                        .connectTimeout(connectTimeout)
                        .connectMaxRetries(connectMaxRetries)
                        .connectionPoolSize(connectionPoolSize)
                        .closeIdleReaders(closeIdleReaders)
                        .includeSchemaChanges(includeSchemaChanges)
                        .debeziumProperties(getDebeziumProperties(configMap))
                        .jdbcProperties(getJdbcProperties(configMap))
                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                        .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
                        .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
                        .useLegacyJsonFormat(useLegacyJsonFormat)
                        .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst);

        List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

        if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
            throw new IllegalArgumentException(
                    "If both scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled are true, data maybe duplicate after restore");
        }

        if (scanBinlogNewlyAddedTableEnabled) {
            String newTables = validateTableAndReturnDebeziumStyle(tables);
            configFactory.tableList(newTables);
            configFactory.excludeTableList(tablesExclude);

        } else {
            Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
            List<String> capturedTables = getTableList(tableIds, selectors);
            if (capturedTables.isEmpty()) {
                throw new IllegalArgumentException(
                        "Cannot find any table by the option 'tables' = " + tables);
            }
            if (tablesExclude != null) {
                Selectors selectExclude =
                        new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
                List<String> excludeTables = getTableList(tableIds, selectExclude);
                if (!excludeTables.isEmpty()) {
                    capturedTables.removeAll(excludeTables);
                }
                if (capturedTables.isEmpty()) {
                    throw new IllegalArgumentException(
                            "Cannot find any table with by the option 'tables.exclude'  = "
                                    + tablesExclude);
                }
            }
            configFactory.tableList(capturedTables.toArray(new String[0]));
        }

        String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
        if (chunkKeyColumns != null) {
            Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();

            for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
                String[] splits = chunkKeyColumn.split(":");
                if (splits.length == 2) {
                    Selectors chunkKeySelector =
                            new Selectors.SelectorsBuilder().includeTables(splits[0]).build();
                    List<ObjectPath> tableList =
                            getChunkKeyColumnTableList(tableIds, chunkKeySelector);
                    for (ObjectPath table : tableList) {
                        chunkKeyColumnMap.put(table, splits[1]);
                    }
                } else {
                    throw new IllegalArgumentException(
                            SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()
                                    + " = "
                                    + chunkKeyColumns
                                    + " failed to be parsed in this part '"
                                    + chunkKeyColumn
                                    + "'.");
                }
            }
            LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
            configFactory.chunkKeyColumn(chunkKeyColumnMap);
        }
        String metadataList = config.get(METADATA_LIST);
        List<MySqlReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
        return new MySqlDataSource(configFactory, readableMetadataList);
    }