in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java [244:337]
static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig, String tableList) {
validateMySqlConfig(mySqlConfig);
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
sourceBuilder
.hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
.port(mySqlConfig.get(MySqlSourceOptions.PORT))
.username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
.password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
.databaseList(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME))
.tableList(tableList);
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
mySqlConfig
.getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
.ifPresent(sourceBuilder::serverTimeZone);
// MySQL CDC using increment snapshot, splitSize is used instead of fetchSize (as in JDBC
// connector). splitSize is the number of records in each snapshot split.
mySqlConfig
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
.ifPresent(sourceBuilder::splitSize);
mySqlConfig
.getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
.ifPresent(sourceBuilder::connectTimeout);
mySqlConfig
.getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
.ifPresent(sourceBuilder::connectMaxRetries);
mySqlConfig
.getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
.ifPresent(sourceBuilder::connectionPoolSize);
mySqlConfig
.getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
.ifPresent(sourceBuilder::heartbeatInterval);
String startupMode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
// see
// https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
if ("initial".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.initial());
} else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.earliest());
} else if ("latest-offset".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.latest());
} else if ("specific-offset".equalsIgnoreCase(startupMode)) {
BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
String file = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Long pos = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
if (file != null && pos != null) {
offsetBuilder.setBinlogFilePosition(file, pos);
}
mySqlConfig
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
.ifPresent(offsetBuilder::setGtidSet);
mySqlConfig
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
.ifPresent(offsetBuilder::setSkipEvents);
mySqlConfig
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
.ifPresent(offsetBuilder::setSkipRows);
sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
} else if ("timestamp".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(
StartupOptions.timestamp(
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
}
Properties jdbcProperties = new Properties();
Properties debeziumProperties = new Properties();
for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
} else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
debeziumProperties.put(
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
}
}
sourceBuilder.jdbcProperties(jdbcProperties);
sourceBuilder.debeziumProperties(debeziumProperties);
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema =
new JsonDebeziumDeserializationSchema(true, customConverterConfigs);
boolean scanNewlyAddedTables = mySqlConfig.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
return sourceBuilder
.deserializer(schema)
.includeSchemaChanges(true)
.scanNewlyAddedTableEnabled(scanNewlyAddedTables)
.build();
}