in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java [160:256]
// Builds the Flink source that captures change data from Oracle and registers it on env.
//
// Connection settings, the startup mode and every option whose key starts with
// DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX are read from the config field. Depending on
// OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED (treated as false when unset) the
// result is backed either by the incremental snapshot source or by the
// DebeziumSourceFunction-based OracleSource.
//
// database-name, schema-name and table-name are mandatory; each is rejected through
// Preconditions.checkNotNull with a message naming the missing option.
public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
    String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
    String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
    Preconditions.checkNotNull(databaseName, "database-name in oracle is required");
    Preconditions.checkNotNull(schemaName, "schema-name in oracle is required");

    String tableName = config.get(OracleSourceOptions.TABLE_NAME);
    // table-name is dereferenced right below and handed to tableList(...), so fail fast with
    // a descriptive message instead of an anonymous NullPointerException.
    Preconditions.checkNotNull(tableName, "table-name in oracle is required");
    // LogMinerQueryBuilder.buildTablePredicate separates the table list by commas, so switching
    // the '|' separators to ',' avoids the error ORA-12733, which is raised when a regexp_like
    // regular expression exceeds 512 characters.
    if (!singleSink && tableName.length() > 256) {
        // TODO: make the length of a single regular expression as long as possible.
        tableName = tableName.replace("|", ",");
    }

    String url = config.get(OracleSourceOptions.URL);
    String hostname = config.get(OracleSourceOptions.HOSTNAME);
    Integer port = config.get(OracleSourceOptions.PORT);
    String username = config.get(OracleSourceOptions.USERNAME);
    String password = config.get(OracleSourceOptions.PASSWORD);

    // StartupOptions.initial() is the default; only an explicit latest-offset setting
    // replaces it. Any other (or missing) value keeps the default.
    StartupOptions startupOptions = StartupOptions.initial();
    String startupMode = config.get(OracleSourceOptions.SCAN_STARTUP_MODE);
    if (DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET.equalsIgnoreCase(startupMode)) {
        startupOptions = StartupOptions.latest();
    }

    // Debezium properties: connector defaults first, so that user-supplied pass-through
    // options copied afterwards can override them.
    Properties debeziumProperties = new Properties();
    debeziumProperties.put(DatabaseSyncConfig.DECIMAL_HANDLING_MODE, "string");
    // date to string
    debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);
    int prefixLength = DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length();
    for (Map.Entry<String, String> option : config.toMap().entrySet()) {
        String optionKey = option.getKey();
        if (optionKey.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
            // Strip the prefix so the remainder of the key is what gets stored.
            debeziumProperties.put(optionKey.substring(prefixLength), option.getValue());
        }
    }

    // Pick the deserializer according to the ignoreDefaultValue flag.
    DebeziumDeserializationSchema<String> schema;
    if (ignoreDefaultValue) {
        schema = new DorisJsonDebeziumDeserializationSchema();
    } else {
        Map<String, Object> customConverterConfigs = new HashMap<>();
        schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
    }

    boolean incrementalSnapshot =
            config.getBoolean(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false);
    if (incrementalSnapshot) {
        JdbcIncrementalSource<String> incrSource =
                OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                        .hostname(hostname)
                        .url(url)
                        .port(port)
                        .databaseList(databaseName)
                        .schemaList(schemaName)
                        .tableList(tableName)
                        .username(username)
                        .password(password)
                        .includeSchemaChanges(true)
                        .startupOptions(startupOptions)
                        .deserializer(schema)
                        .debeziumProperties(debeziumProperties)
                        .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
                        .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
                        .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
                        .connectTimeout(config.get(CONNECT_TIMEOUT))
                        .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
                        .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
                        .distributionFactorUpper(
                                config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
                        .distributionFactorLower(
                                config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
                        .build();
        return env.fromSource(
                incrSource, WatermarkStrategy.noWatermarks(), "Oracle IncrSource");
    }

    DebeziumSourceFunction<String> oracleSource =
            OracleSource.<String>builder()
                    .url(url)
                    .hostname(hostname)
                    .port(port)
                    .username(username)
                    .password(password)
                    .database(databaseName)
                    .schemaList(schemaName)
                    .tableList(tableName)
                    .debeziumProperties(debeziumProperties)
                    .startupOptions(startupOptions)
                    .deserializer(schema)
                    .build();
    return env.addSource(oracleSource, "Oracle Source");
}