public DataStreamSource buildCdcSource()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java [109:186]


    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
        String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
        Preconditions.checkNotNull(databaseName, "database-name in oracle is required");
        Preconditions.checkNotNull(schemaName, "schema-name in oracle is required");
        String tableName = config.get(OracleSourceOptions.TABLE_NAME);
        String url = config.get(OracleSourceOptions.URL);
        String hostname = config.get(OracleSourceOptions.HOSTNAME);
        Integer port = config.get(OracleSourceOptions.PORT);
        String username = config.get(OracleSourceOptions.USERNAME);
        String password = config.get(OracleSourceOptions.PASSWORD);

        StartupOptions startupOptions = StartupOptions.initial();
        String startupMode = config.get(OracleSourceOptions.SCAN_STARTUP_MODE);
        if ("initial".equalsIgnoreCase(startupMode)) {
            startupOptions = StartupOptions.initial();
        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
            startupOptions = StartupOptions.latest();
        }

        //debezium properties set
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("decimal.handling.mode", "string");
        //date to string
        debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);

        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
                debeziumProperties.put(
                        key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
            }
        }

        Map<String, Object> customConverterConfigs = new HashMap<>();
        JsonDebeziumDeserializationSchema schema =
                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);

        if(config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){
            JdbcIncrementalSource<String> incrSource = OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                    .hostname(hostname)
                    .url(url)
                    .port(port)
                    .databaseList(databaseName)
                    .schemaList(schemaName)
                    .tableList(schemaName + "." + tableName)
                    .username(username)
                    .password(password)
                    .startupOptions(startupOptions)
                    .deserializer(schema)
                    .debeziumProperties(debeziumProperties)
                    .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                    .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                    .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                    .connectTimeout(config.get(CONNECT_TIMEOUT))
                    .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                    .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                    .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                    .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                    .build();
            return env.fromSource(incrSource,  WatermarkStrategy.noWatermarks(), "Oracle IncrSource");
        }else{
            DebeziumSourceFunction<String> oracleSource = OracleSource.<String>builder().url(url)
                    .hostname(hostname)
                    .port(port)
                    .username(username)
                    .password(password)
                    .database(databaseName)
                    .schemaList(schemaName)
                    .tableList(schemaName + "." + tableName)
                    .debeziumProperties(debeziumProperties)
                    .startupOptions(startupOptions)
                    .deserializer(schema)
                    .build();
            return env.addSource(oracleSource, "Oracle Source");
        }
    }