public DataStreamSource buildCdcSource()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java [160:256]


    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
        Properties debeziumProperties = new Properties();
        String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
        Preconditions.checkNotNull(databaseName, "database-name in oracle is required");
        Preconditions.checkNotNull(schemaName, "schema-name in oracle is required");
        String tableName = config.get(OracleSourceOptions.TABLE_NAME);
        // LogMinerQueryBuilder.buildTablePredicate is separated by commas to avoid
        // the error ORA-12733 when the regexp_like regular expression exceeds 512 characters
        if (!singleSink && tableName.length() > 256) {
            // todo: Make the length of a single regular expression as long as possible.
            tableName = tableName.replace("|", ",");
        }

        String url = config.get(OracleSourceOptions.URL);
        String hostname = config.get(OracleSourceOptions.HOSTNAME);
        Integer port = config.get(OracleSourceOptions.PORT);
        String username = config.get(OracleSourceOptions.USERNAME);
        String password = config.get(OracleSourceOptions.PASSWORD);

        StartupOptions startupOptions = StartupOptions.initial();
        String startupMode = config.get(OracleSourceOptions.SCAN_STARTUP_MODE);
        if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL.equalsIgnoreCase(startupMode)) {
            startupOptions = StartupOptions.initial();
        } else if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET.equalsIgnoreCase(
                startupMode)) {
            startupOptions = StartupOptions.latest();
        }

        // debezium properties set
        debeziumProperties.put(DatabaseSyncConfig.DECIMAL_HANDLING_MODE, "string");
        // date to string
        debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);

        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
                debeziumProperties.put(
                        key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
            }
        }

        DebeziumDeserializationSchema<String> schema;
        if (ignoreDefaultValue) {
            schema = new DorisJsonDebeziumDeserializationSchema();
        } else {
            Map<String, Object> customConverterConfigs = new HashMap<>();
            schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
        }

        if (config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
            JdbcIncrementalSource<String> incrSource =
                    OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                            .hostname(hostname)
                            .url(url)
                            .port(port)
                            .databaseList(databaseName)
                            .schemaList(schemaName)
                            .tableList(tableName)
                            .username(username)
                            .password(password)
                            .includeSchemaChanges(true)
                            .startupOptions(startupOptions)
                            .deserializer(schema)
                            .debeziumProperties(debeziumProperties)
                            .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                            .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                            .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                            .connectTimeout(config.get(CONNECT_TIMEOUT))
                            .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                            .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                            .distributionFactorUpper(
                                    config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                            .distributionFactorLower(
                                    config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                            .build();
            return env.fromSource(
                    incrSource, WatermarkStrategy.noWatermarks(), "Oracle IncrSource");
        } else {
            DebeziumSourceFunction<String> oracleSource =
                    OracleSource.<String>builder()
                            .url(url)
                            .hostname(hostname)
                            .port(port)
                            .username(username)
                            .password(password)
                            .database(databaseName)
                            .schemaList(schemaName)
                            .tableList(tableName)
                            .debeziumProperties(debeziumProperties)
                            .startupOptions(startupOptions)
                            .deserializer(schema)
                            .build();
            return env.addSource(oracleSource, "Oracle Source");
        }
    }