in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java [138:233]
/**
 * Builds a MySQL CDC source from the options held in {@code config} and attaches it to the
 * given execution environment.
 *
 * <p>Connection settings, the database list and the table list are always applied. Tuning
 * options (server id, time zone, fetch size, timeouts, pool size, heartbeat, chunk size, ...)
 * are forwarded to the builder only when they are present in {@code config}. Keys starting with
 * {@code PROPERTIES_PREFIX} are passed through as JDBC properties and keys starting with
 * {@code DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX} as Debezium properties, both with the prefix
 * stripped.
 *
 * @param env the execution environment the source is registered with
 * @return the data stream source named "MySQL Source", created without watermarks
 * @throws NullPointerException if the database name is missing, or if the startup mode is
 *     {@code timestamp} and no startup timestamp is configured
 */
public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
    MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();

    String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
    Preconditions.checkNotNull(databaseName, "database-name in mysql is required");
    String tableName = config.get(MySqlSourceOptions.TABLE_NAME);
    sourceBuilder
            .hostname(config.get(MySqlSourceOptions.HOSTNAME))
            .port(config.get(MySqlSourceOptions.PORT))
            .username(config.get(MySqlSourceOptions.USERNAME))
            .password(config.get(MySqlSourceOptions.PASSWORD))
            .databaseList(databaseName)
            .tableList(tableName);

    // Optional tuning knobs: only touch the builder when the user actually set the option,
    // so the builder's own defaults stay in effect otherwise.
    config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
    config.getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
            .ifPresent(sourceBuilder::serverTimeZone);
    config.getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
            .ifPresent(sourceBuilder::fetchSize);
    config.getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
            .ifPresent(sourceBuilder::connectTimeout);
    config.getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
            .ifPresent(sourceBuilder::connectMaxRetries);
    config.getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
            .ifPresent(sourceBuilder::connectionPoolSize);
    config.getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
            .ifPresent(sourceBuilder::heartbeatInterval);
    config.getOptional(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED)
            .ifPresent(sourceBuilder::scanNewlyAddedTableEnabled);
    config.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
            .ifPresent(sourceBuilder::splitSize);
    config.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED)
            .ifPresent(sourceBuilder::closeIdleReaders);

    // NOTE(review): presumably configures per-table chunk key columns on the builder;
    // the helper is defined outside this block - confirm there.
    setChunkColumns(sourceBuilder);

    // Map the textual startup mode onto StartupOptions. A mode that matches none of the
    // branches below results in no startupOptions(...) call, i.e. the builder default is used.
    String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
    if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL.equalsIgnoreCase(startupMode)) {
        sourceBuilder.startupOptions(StartupOptions.initial());
    } else if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_EARLIEST_OFFSET.equalsIgnoreCase(
            startupMode)) {
        sourceBuilder.startupOptions(StartupOptions.earliest());
    } else if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET.equalsIgnoreCase(
            startupMode)) {
        sourceBuilder.startupOptions(StartupOptions.latest());
    } else if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET.equalsIgnoreCase(
            startupMode)) {
        BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
        String file = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
        Long pos = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
        // The binlog file/position pair is applied only when BOTH are configured; if just
        // one of them is set it is ignored here.
        if (file != null && pos != null) {
            offsetBuilder.setBinlogFilePosition(file, pos);
        }
        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
                .ifPresent(offsetBuilder::setGtidSet);
        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
                .ifPresent(offsetBuilder::setSkipEvents);
        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
                .ifPresent(offsetBuilder::setSkipRows);
        sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
    } else if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_TIMESTAMP.equalsIgnoreCase(
            startupMode)) {
        // Validate explicitly: passing a missing (null) value straight into
        // StartupOptions.timestamp(...) would fail on auto-unboxing with a message-less
        // NullPointerException that does not tell the user which option is missing.
        Long startupTimestampMillis =
                config.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
        Preconditions.checkNotNull(
                startupTimestampMillis,
                MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key()
                        + " in mysql is required when the startup mode is timestamp");
        sourceBuilder.startupOptions(StartupOptions.timestamp(startupTimestampMillis));
    }

    Properties jdbcProperties = new Properties();
    Properties debeziumProperties = new Properties();
    // Seed with the date-to-string converter defaults first; user-supplied Debezium options
    // are put afterwards and therefore override a default with the same key.
    debeziumProperties.putAll(DateToStringConverter.DEFAULT_PROPS);
    for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
        String key = entry.getKey();
        String value = entry.getValue();
        if (key.startsWith(PROPERTIES_PREFIX)) {
            jdbcProperties.put(key.substring(PROPERTIES_PREFIX.length()), value);
        } else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
            debeziumProperties.put(
                    key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
        }
    }
    sourceBuilder.jdbcProperties(jdbcProperties);
    sourceBuilder.debeziumProperties(debeziumProperties);

    // Choose the deserializer: the Doris-specific schema when default values are ignored,
    // otherwise the stock JSON schema with decimals emitted in "numeric" format.
    DebeziumDeserializationSchema<String> schema;
    if (ignoreDefaultValue) {
        schema = new DorisJsonDebeziumDeserializationSchema();
    } else {
        Map<String, Object> customConverterConfigs = new HashMap<>();
        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
        schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
    }

    MySqlSource<String> mySqlSource =
            sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
    return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
}