in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java [61:161]
public DynamicTableSource createDynamicTableSource(Context context) {
    // Builds a MySqlTableSource from the table options carried by the given context.
    //
    // Flow: (1) validate the declared options, skipping keys under the Debezium and JDBC
    // property prefixes; (2) read every connector option into a local; (3) when incremental
    // snapshot reading is enabled, run the additional key/range checks; (4) print the
    // effective options and hand everything to the MySqlTableSource constructor.
    //
    // NOTE: statements are kept in a fixed order on purpose, so that the first failing
    // validation (and therefore the reported error) stays deterministic.
    final FactoryUtil.TableFactoryHelper factoryHelper =
            FactoryUtil.createTableFactoryHelper(this, context);
    factoryHelper.validateExcept(
            DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
    final ReadableConfig options = factoryHelper.getOptions();

    // --- Connection and table identification ---
    final String host = options.get(MySqlSourceOptions.HOSTNAME);
    final String user = options.get(MySqlSourceOptions.USERNAME);
    final String userPassword = options.get(MySqlSourceOptions.PASSWORD);
    final String database = options.get(MySqlSourceOptions.DATABASE_NAME);
    validateRegex(MySqlSourceOptions.DATABASE_NAME.key(), database);
    final String table = options.get(MySqlSourceOptions.TABLE_NAME);
    validateRegex(MySqlSourceOptions.TABLE_NAME.key(), table);
    final int serverPort = options.get(MySqlSourceOptions.PORT);

    // --- Snapshot chunking and fetching ---
    final int chunkSize = options.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
    final int chunkMetaGroupSize = options.get(MySqlSourceOptions.CHUNK_META_GROUP_SIZE);
    final int snapshotFetchSize = options.get(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);

    // --- Values derived through helper methods ---
    final ZoneId timeZone = getServerTimeZone(options);
    final ResolvedSchema schema =
            getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
    final String mysqlServerId = validateAndGetServerId(options);
    final StartupOptions startup = getStartupOptions(options);

    // --- Connection tuning ---
    final Duration connectionTimeout = options.get(MySqlSourceOptions.CONNECT_TIMEOUT);
    final int maxConnectRetries = options.get(MySqlSourceOptions.CONNECT_MAX_RETRIES);
    final int poolSize = options.get(MySqlSourceOptions.CONNECTION_POOL_SIZE);

    // --- Chunk key distribution ---
    final double evenDistributionUpperBound =
            options.get(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
    final double evenDistributionLowerBound =
            options.get(MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);

    // --- Remaining settings and feature switches ---
    final boolean scanNewTables = options.get(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED);
    final Duration heartbeat = options.get(MySqlSourceOptions.HEARTBEAT_INTERVAL);
    // The chunk key column is optional; null means the option was not set.
    final String chunkKey =
            options.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN)
                    .orElse(null);
    final boolean incrementalSnapshot =
            options.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
    final boolean closeIdle =
            options.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
    final boolean skipBackfill =
            options.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
    final boolean parseOnlineDdl = options.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
    final boolean legacyJson = options.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
    final boolean unboundedChunkFirst =
            options.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
    final boolean readAsAppendOnly =
            options.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);

    // These checks are applied only when incremental snapshot reading is switched on.
    if (incrementalSnapshot) {
        validatePrimaryKeyIfEnableParallel(schema, chunkKey);
        validateIntegerOption(
                MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, chunkSize, 1);
        validateIntegerOption(MySqlSourceOptions.CHUNK_META_GROUP_SIZE, chunkMetaGroupSize, 1);
        validateIntegerOption(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, snapshotFetchSize, 1);
        validateIntegerOption(MySqlSourceOptions.CONNECTION_POOL_SIZE, poolSize, 1);
        validateIntegerOption(MySqlSourceOptions.CONNECT_MAX_RETRIES, maxConnectRetries, 0);
        validateDistributionFactorUpper(evenDistributionUpperBound);
        validateDistributionFactorLower(evenDistributionLowerBound);
        validateDurationOption(
                MySqlSourceOptions.CONNECT_TIMEOUT, connectionTimeout, Duration.ofMillis(250));
    }

    OptionUtils.printOptions(IDENTIFIER, ((Configuration) options).toMap());

    // The constructor is positional: the argument order below must not be changed.
    return new MySqlTableSource(
            schema,
            serverPort,
            host,
            database,
            table,
            user,
            userPassword,
            timeZone,
            getDebeziumProperties(context.getCatalogTable().getOptions()),
            mysqlServerId,
            incrementalSnapshot,
            chunkSize,
            chunkMetaGroupSize,
            snapshotFetchSize,
            connectionTimeout,
            maxConnectRetries,
            poolSize,
            evenDistributionUpperBound,
            evenDistributionLowerBound,
            startup,
            scanNewTables,
            closeIdle,
            JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
            heartbeat,
            chunkKey,
            skipBackfill,
            parseOnlineDdl,
            legacyJson,
            unboundedChunkFirst,
            readAsAppendOnly);
}