in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java [185:227]
public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
String hosts = config.get(MongoDBSourceOptions.HOSTS);
String username = config.get(MongoDBSourceOptions.USERNAME);
String password = config.get(MongoDBSourceOptions.PASSWORD);
String database = config.get(MongoDBSourceOptions.DATABASE);
// note: just to unify job name, no other use.
config.setString(DatabaseSyncConfig.DATABASE_NAME, database);
String collection = config.get(MongoDBSourceOptions.COLLECTION);
if (StringUtils.isBlank(collection)) {
collection = config.get(TABLE_NAME);
}
MongoDBSourceBuilder<String> mongoDBSourceBuilder = MongoDBSource.builder();
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema =
new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
mongoDBSourceBuilder
.hosts(hosts)
.username(username)
.password(password)
.databaseList(database)
.collectionList(collection);
String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
switch (startupMode.toLowerCase()) {
case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL:
mongoDBSourceBuilder.startupOptions(StartupOptions.initial());
break;
case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET:
mongoDBSourceBuilder.startupOptions(StartupOptions.latest());
break;
case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
mongoDBSourceBuilder.startupOptions(
StartupOptions.timestamp(
config.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
break;
default:
throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
}
MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
}