in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java [117:288]
public DataSource createDataSource(Context context) {
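// Validate all factory options up front, except pass-through keys under the JDBC
// properties and Debezium option prefixes, which are forwarded verbatim further below.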
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PROPERTIES_PREFIX, DEBEZIUM_OPTIONS_PREFIX);
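// Read the connection coordinates, table filters, and startup behavior from the pipeline definition.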
final Configuration config = context.getFactoryConfiguration();
String hostname = config.get(HOSTNAME);
int port = config.get(PORT);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String tables = config.get(TABLES);
String tablesExclude = config.get(TABLES_EXCLUDE);
String serverId = validateAndGetServerId(config);
ZoneId serverTimeZone = getServerTimeZone(config);
StartupOptions startupOptions = getStartupOptions(config);
// Batch mode only supports StartupMode.SNAPSHOT.
Configuration pipelineConfiguration = context.getPipelineConfiguration();
if (pipelineConfiguration != null
&& pipelineConfiguration.contains(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE)
&& RuntimeExecutionMode.BATCH.equals(
pipelineConfiguration.get(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE))
&& !StartupOptions.snapshot().equals(startupOptions)) {
throw new IllegalArgumentException(
String.format(
"Only the \"snapshot\" startup mode of MySqlDataSource is supported in a BATCH pipeline, but the configured startup mode is \"%s\".",
startupOptions.startupMode));
}
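// Read incremental snapshot tuning knobs and feature flags.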
boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanBinlogNewlyAddedTableEnabled =
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
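// Reject out-of-range numeric options before building the source config.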
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateDistributionFactorUpper(distributionFactorUpper);
validateDistributionFactorLower(distributionFactorLower);
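// Work on a mutable copy of the options so derived Debezium/JDBC properties can be appended below.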
Map<String, String> configMap = config.toMap();
OptionUtils.printOptions(IDENTIFIER, configMap);
if (includeComments) {
// set debezium config 'include.schema.comments' to true
configMap.put(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
"true");
}
if (!treatTinyInt1AsBoolean) {
// set jdbc config 'tinyInt1isBit' to false
configMap.put(PROPERTIES_PREFIX + PropertyKey.tinyInt1isBit.getKeyName(), "false");
}
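// Assemble the low-level source config. The database/table lists start as wildcards; the
// concrete capture list is narrowed further down once the table selectors are resolved.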
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(hostname)
.port(port)
.username(username)
.password(password)
.databaseList(".*")
.tableList(".*")
.startupOptions(startupOptions)
.serverId(serverId)
.serverTimeZone(serverTimeZone.getId())
.fetchSize(fetchSize)
.splitSize(splitSize)
.splitMetaGroupSize(splitMetaGroupSize)
.distributionFactorLower(distributionFactorLower)
.distributionFactorUpper(distributionFactorUpper)
.heartbeatInterval(heartbeatInterval)
.connectTimeout(connectTimeout)
.connectMaxRetries(connectMaxRetries)
.connectionPoolSize(connectionPoolSize)
.closeIdleReaders(closeIdleReaders)
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst);
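// Enumerate all tables visible to this connection so the include/exclude selectors and
// chunk-key mappings below can be matched against concrete table identifiers.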
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
throw new IllegalArgumentException(
"If both scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled are set to true, data may be duplicated after restoring from a checkpoint.");
}
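// With scan.binlog.newly-added-table.enabled, pass the (validated) table patterns through in
// Debezium style so tables created later can still be picked up during the binlog phase.
// Otherwise, resolve the patterns to the tables that exist right now,
// e.g. 'tables' = "app_db.orders,app_db.shard_.*" (illustrative pattern).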
if (scanBinlogNewlyAddedTableEnabled) {
String newTables = validateTableAndReturnDebeziumStyle(tables);
configFactory.tableList(newTables);
configFactory.excludeTableList(tablesExclude);
} else {
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(tableIds, selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table matching the option 'tables' = " + tables);
}
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
List<String> excludeTables = getTableList(tableIds, selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table to capture after applying the option 'tables.exclude' = "
+ tablesExclude);
}
}
configFactory.tableList(capturedTables.toArray(new String[0]));
}
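// Optional per-table chunk key override. The expected shape is a ';'-separated list of
// '<table-selector>:<column>' entries, e.g. "app_db.orders:order_id;app_db.shard_.*:id"
// (illustrative values).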
String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
String[] splits = chunkKeyColumn.split(":");
if (splits.length == 2) {
Selectors chunkKeySelector =
new Selectors.SelectorsBuilder().includeTables(splits[0]).build();
List<ObjectPath> tableList =
getChunkKeyColumnTableList(tableIds, chunkKeySelector);
for (ObjectPath table : tableList) {
chunkKeyColumnMap.put(table, splits[1]);
}
} else {
throw new IllegalArgumentException(
"Failed to parse '"
+ chunkKeyColumn
+ "' in option "
+ SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()
+ " = "
+ chunkKeyColumns
+ ", expected format is '<table-selector>:<chunk-key-column>'.");
}
}
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}
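// Resolve the requested metadata columns and hand everything to the data source.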
String metadataList = config.get(METADATA_LIST);
List<MySqlReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
return new MySqlDataSource(configFactory, readableMetadataList);
}