public DataStreamSource buildCdcSource()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java [185:227]


    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
        String hosts = config.get(MongoDBSourceOptions.HOSTS);
        String username = config.get(MongoDBSourceOptions.USERNAME);
        String password = config.get(MongoDBSourceOptions.PASSWORD);
        String database = config.get(MongoDBSourceOptions.DATABASE);
        // note: just to unify job name, no other use.
        config.setString(DatabaseSyncConfig.DATABASE_NAME, database);
        String collection = config.get(MongoDBSourceOptions.COLLECTION);
        if (StringUtils.isBlank(collection)) {
            collection = config.get(TABLE_NAME);
        }
        MongoDBSourceBuilder<String> mongoDBSourceBuilder = MongoDBSource.builder();
        Map<String, Object> customConverterConfigs = new HashMap<>();
        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
        JsonDebeziumDeserializationSchema schema =
                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);

        mongoDBSourceBuilder
                .hosts(hosts)
                .username(username)
                .password(password)
                .databaseList(database)
                .collectionList(collection);

        String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
        switch (startupMode.toLowerCase()) {
            case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL:
                mongoDBSourceBuilder.startupOptions(StartupOptions.initial());
                break;
            case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET:
                mongoDBSourceBuilder.startupOptions(StartupOptions.latest());
                break;
            case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
                mongoDBSourceBuilder.startupOptions(
                        StartupOptions.timestamp(
                                config.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
                break;
            default:
                throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
        }
        MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
        return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
    }