public DynamicTableSource createDynamicTableSource()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java [61:161]


    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validateExcept(
                DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);

        final ReadableConfig config = helper.getOptions();
        String hostname = config.get(MySqlSourceOptions.HOSTNAME);
        String username = config.get(MySqlSourceOptions.USERNAME);
        String password = config.get(MySqlSourceOptions.PASSWORD);
        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
        validateRegex(MySqlSourceOptions.DATABASE_NAME.key(), databaseName);
        String tableName = config.get(MySqlSourceOptions.TABLE_NAME);
        validateRegex(MySqlSourceOptions.TABLE_NAME.key(), tableName);
        int port = config.get(MySqlSourceOptions.PORT);
        int splitSize = config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        int splitMetaGroupSize = config.get(MySqlSourceOptions.CHUNK_META_GROUP_SIZE);
        int fetchSize = config.get(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);
        ZoneId serverTimeZone = getServerTimeZone(config);

        ResolvedSchema physicalSchema =
                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
        String serverId = validateAndGetServerId(config);
        StartupOptions startupOptions = getStartupOptions(config);
        Duration connectTimeout = config.get(MySqlSourceOptions.CONNECT_TIMEOUT);
        int connectMaxRetries = config.get(MySqlSourceOptions.CONNECT_MAX_RETRIES);
        int connectionPoolSize = config.get(MySqlSourceOptions.CONNECTION_POOL_SIZE);
        double distributionFactorUpper =
                config.get(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        double distributionFactorLower =
                config.get(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
        boolean scanNewlyAddedTableEnabled =
                config.get(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED);
        Duration heartbeatInterval = config.get(MySqlSourceOptions.HEARTBEAT_INTERVAL);
        String chunkKeyColumn =
                config.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN)
                        .orElse(null);

        boolean enableParallelRead =
                config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
        boolean closeIdleReaders =
                config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        boolean skipSnapshotBackFill =
                config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
        boolean parseOnLineSchemaChanges =
                config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
        boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
        boolean assignUnboundedChunkFirst =
                config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);

        boolean appendOnly =
                config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);

        if (enableParallelRead) {
            validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
            validateIntegerOption(
                    MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
            validateIntegerOption(MySqlSourceOptions.CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
            validateIntegerOption(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
            validateIntegerOption(MySqlSourceOptions.CONNECTION_POOL_SIZE, connectionPoolSize, 1);
            validateIntegerOption(MySqlSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
            validateDistributionFactorUpper(distributionFactorUpper);
            validateDistributionFactorLower(distributionFactorLower);
            validateDurationOption(
                    MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250));
        }

        OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());

        return new MySqlTableSource(
                physicalSchema,
                port,
                hostname,
                databaseName,
                tableName,
                username,
                password,
                serverTimeZone,
                getDebeziumProperties(context.getCatalogTable().getOptions()),
                serverId,
                enableParallelRead,
                splitSize,
                splitMetaGroupSize,
                fetchSize,
                connectTimeout,
                connectMaxRetries,
                connectionPoolSize,
                distributionFactorUpper,
                distributionFactorLower,
                startupOptions,
                scanNewlyAddedTableEnabled,
                closeIdleReaders,
                JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
                heartbeatInterval,
                chunkKeyColumn,
                skipSnapshotBackFill,
                parseOnLineSchemaChanges,
                useLegacyJsonFormat,
                assignUnboundedChunkFirst,
                appendOnly);
    }