in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java [116:191]
public boolean build() throws Exception {
DorisConnectionOptions options = getDorisConnectionOptions();
DorisSystem dorisSystem = new DorisSystem(options);
List<SourceSchema> schemaList = getSchemaList();
Preconditions.checkState(
!schemaList.isEmpty(),
"No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema.");
if (!StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(database)) {
LOG.info("database {} not exist, created", database);
dorisSystem.createDatabase(database);
}
List<String> syncTables = new ArrayList<>();
List<Tuple2<String, String>> dorisTables = new ArrayList<>();
Set<String> targetDbSet = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
String targetDb = database;
// Synchronize multiple databases using the src database name
if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
targetDb = schema.getDatabaseName();
targetDbSet.add(targetDb);
}
if (StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(targetDb)) {
LOG.info("database {} not exist, created", targetDb);
dorisSystem.createDatabase(targetDb);
}
String dorisTable = converter.convert(schema.getTableName());
// Calculate the mapping relationship between upstream and downstream tables
tableMapping.put(
schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable));
DorisTableUtil.tryCreateTableIfAbsent(
dorisSystem,
targetDb,
dorisTable,
schema,
dorisTableConfig,
ignoreIncompatible);
if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
dorisTables.add(Tuple2.of(targetDb, dorisTable));
}
}
if (createTableOnly) {
System.out.println("Create table finished.");
return false;
}
LOG.info("table mapping: {}", tableMapping);
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
DataStreamSource<String> streamSource = buildCdcSource(env);
if (singleSink) {
streamSource.sinkTo(buildDorisSink());
} else {
SingleOutputStreamOperator<Void> parsedStream =
streamSource.process(buildProcessFunction());
for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(dbTbl.f0, dbTbl.f1);
DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
int sinkParallel =
sinkConfig.getInteger(
DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
String uidName = getUidName(targetDbSet, dbTbl);
sideOutput
.sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
.setParallelism(sinkParallel)
.name(uidName)
.uid(uidName);
}
}
return true;
}