in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java [92:133]
/**
 * Prepares the target Doris database/tables and wires the CDC source to one Doris sink per table.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Fetch the source schemas and fail fast if there are none.</li>
 *   <li>Create the target database if {@code dorisSystem.databaseExists} reports it missing.</li>
 *   <li>For every source schema, convert the table name with {@code converter} and create the
 *       Doris table when {@code dorisSystem.tableExists} reports it missing.</li>
 *   <li>If {@code createTableOnly} is set, print a message and terminate the JVM.</li>
 *   <li>Otherwise store the source table names as a {@code (a|b|c)} alternation under
 *       {@code TABLE_NAME_OPTIONS}, build the CDC source, and attach one sink per Doris table
 *       to the matching side output of the parsing function.</li>
 * </ol>
 *
 * @throws Exception if any of the called helpers fails, or if the
 *     {@code Preconditions.checkState} check rejects an empty schema list
 */
public void build() throws Exception {
    DorisConnectionOptions options = getDorisConnectionOptions();
    DorisSystem dorisSystem = new DorisSystem(options);

    List<SourceSchema> schemaList = getSchemaList();
    // Validate before any side effect: previously this check ran only after the database (and
    // tables) had been created, and was skipped entirely in createTableOnly mode, which then
    // reported success although nothing had been matched.
    Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized.");

    if (!dorisSystem.databaseExists(database)) {
        LOG.info("database {} not exist, created", database);
        dorisSystem.createDatabase(database);
    }

    // syncTables holds the source-side names (used for the CDC table filter);
    // dorisTables holds the converted target-side names (used for side outputs and sinks).
    List<String> syncTables = new ArrayList<>(schemaList.size());
    List<String> dorisTables = new ArrayList<>(schemaList.size());
    for (SourceSchema schema : schemaList) {
        syncTables.add(schema.getTableName());
        String dorisTable = converter.convert(schema.getTableName());
        if (!dorisSystem.tableExists(database, dorisTable)) {
            TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
            // Point the converted schema at the Doris target database and table.
            dorisSchema.setDatabase(database);
            dorisSchema.setTable(dorisTable);
            dorisSystem.createTable(dorisSchema);
        }
        dorisTables.add(dorisTable);
    }

    if (createTableOnly) {
        System.out.println("Create table finished.");
        // NOTE(review): System.exit ends the whole JVM; kept as-is because callers may rely on
        // the process terminating here instead of continuing to job submission.
        System.exit(0);
    }

    // Restrict the CDC source to the discovered tables via a "(t1|t2|...)" pattern.
    config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");

    DataStreamSource<String> streamSource = buildCdcSource(env);
    SingleOutputStreamOperator<Void> parsedStream =
            streamSource.process(new ParsingProcessFunction(converter));

    for (String table : dorisTables) {
        OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
        DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);

        // Fall back to the side output's own parallelism when no sink parallelism is configured.
        int sinkParallel =
                sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
        // The Doris table name doubles as the sink operator's display name and uid.
        sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
    }
}