public DataStreamSource buildCdcSource()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java [136:227]


    /**
     * Builds the PostgreSQL change-data-capture source and registers it on the given environment.
     *
     * <p>All settings are read from {@code config}. The database name, schema name and slot name
     * are mandatory. Every option whose key starts with
     * {@code DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX} is forwarded to Debezium with the prefix
     * removed, on top of the date-converter defaults and a {@code "string"} decimal handling mode.
     *
     * <p>When {@code SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED} is set, an incremental
     * snapshot source is attached through {@code env.fromSource}; otherwise a
     * {@code DebeziumSourceFunction} is attached through {@code env.addSource}.
     *
     * @param env the execution environment the source is added to
     * @return the stream of change events produced by the configured deserialization schema
     * @throws NullPointerException if the database name, schema name or slot name is missing, as
     *     raised by {@code Preconditions.checkNotNull}
     */
    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
        // Mandatory settings: fetched together, then validated in a fixed order.
        final String database = config.get(PostgresSourceOptions.DATABASE_NAME);
        final String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
        final String replicationSlot = config.get(SLOT_NAME);
        Preconditions.checkNotNull(database, "database-name in postgres is required");
        Preconditions.checkNotNull(schemaName, "schema-name in postgres is required");
        Preconditions.checkNotNull(replicationSlot, "slot.name in postgres is required");

        // Connection coordinates and credentials.
        final String tables = config.get(PostgresSourceOptions.TABLE_NAME);
        final String host = config.get(PostgresSourceOptions.HOSTNAME);
        final Integer pgPort = config.get(PostgresSourceOptions.PG_PORT);
        final String user = config.get(PostgresSourceOptions.USERNAME);
        final String secret = config.get(PostgresSourceOptions.PASSWORD);

        // Only the latest-offset mode switches away from the initial startup options; every
        // other value (including an unrecognised one) keeps them.
        final String requestedMode = config.get(PostgresSourceOptions.SCAN_STARTUP_MODE);
        final StartupOptions startupOptions =
                !DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL.equalsIgnoreCase(requestedMode)
                                && DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET
                                        .equalsIgnoreCase(requestedMode)
                        ? StartupOptions.latest()
                        : StartupOptions.initial();

        // Debezium properties: built-in defaults first, so user-supplied "debezium."-style
        // options copied afterwards can override them.
        final Properties dbzProps = new Properties();
        dbzProps.putAll(PostgresDateConverter.DEFAULT_PROPS);
        dbzProps.put(DatabaseSyncConfig.DECIMAL_HANDLING_MODE, "string");
        final int prefixLength = DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length();
        config.toMap()
                .forEach(
                        (optionKey, optionValue) -> {
                            if (optionKey.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
                                dbzProps.put(optionKey.substring(prefixLength), optionValue);
                            }
                        });

        // Pick the deserializer according to the ignoreDefaultValue flag.
        final DebeziumDeserializationSchema<String> deserializer =
                ignoreDefaultValue
                        ? new DorisJsonDebeziumDeserializationSchema()
                        : new JsonDebeziumDeserializationSchema(
                                false, new HashMap<String, Object>());

        final String decodingPlugin = config.get(DECODING_PLUGIN_NAME);

        if (!config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
            // NOTE(review): this source is built without startupOptions, so the configured
            // startup mode has no effect on this path — confirm that is intended.
            DebeziumSourceFunction<String> legacySource =
                    PostgreSQLSource.<String>builder()
                            .hostname(host)
                            .port(pgPort)
                            .database(database)
                            .schemaList(schemaName)
                            .tableList(tables)
                            .username(user)
                            .password(secret)
                            .debeziumProperties(dbzProps)
                            .deserializer(deserializer)
                            .slotName(replicationSlot)
                            .decodingPluginName(decodingPlugin)
                            .build();
            return env.addSource(legacySource, "Postgres Source");
        }

        // Incremental snapshot source: additionally carries the startup options and the
        // chunking, fetch, connection and heartbeat settings from the configuration.
        JdbcIncrementalSource<String> incrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname(host)
                        .port(pgPort)
                        .database(database)
                        .schemaList(schemaName)
                        .tableList(tables)
                        .username(user)
                        .password(secret)
                        .deserializer(deserializer)
                        .slotName(replicationSlot)
                        .decodingPluginName(decodingPlugin)
                        .debeziumProperties(dbzProps)
                        .startupOptions(startupOptions)
                        .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                        .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                        .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                        .connectTimeout(config.get(CONNECT_TIMEOUT))
                        .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                        .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                        .distributionFactorUpper(
                                config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                        .distributionFactorLower(
                                config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                        .heartbeatInterval(config.get(HEARTBEAT_INTERVAL))
                        .build();
        return env.fromSource(
                incrementalSource, WatermarkStrategy.noWatermarks(), "Postgres IncrSource");
    }