public boolean build()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java [116:191]


    /**
     * Prepares the Doris side for every source schema and, unless only table creation was
     * requested, attaches the CDC source to the Doris sink(s).
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>Fetch the source schemas via {@code getSchemaList()} and fail if the list is empty.
     *   <li>Make sure the target database exists: the configured {@code database} when it is
     *       set, otherwise one database per source schema, named after the source database.
     *   <li>For each schema, record the upstream-to-downstream mapping in {@code tableMapping}
     *       and call {@code DorisTableUtil.tryCreateTableIfAbsent} for the converted table name.
     *   <li>If {@code createTableOnly} is set, stop here.
     *   <li>Otherwise build the CDC source and either send it to one shared sink ({@code
     *       singleSink}) or route it through the process function and attach one sink per
     *       distinct {@code (database, table)} pair via side outputs.
     * </ol>
     *
     * @return {@code false} when {@code createTableOnly} is set and no stream topology was built;
     *     {@code true} once the source and sink(s) have been attached
     * @throws Exception if the precondition on the schema list fails, or if any of the called
     *     helpers (schema discovery, Doris DDL, source/sink construction) throws
     */
    public boolean build() throws Exception {
        DorisConnectionOptions options = getDorisConnectionOptions();
        DorisSystem dorisSystem = new DorisSystem(options);

        List<SourceSchema> schemaList = getSchemaList();
        Preconditions.checkState(
                !schemaList.isEmpty(),
                "No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema.");

        // Loop-invariant: when no target database is configured, each table is written to a
        // Doris database named after its source database.
        boolean useSourceDbName = StringUtils.isNullOrWhitespaceOnly(database);
        if (!useSourceDbName && !dorisSystem.databaseExists(database)) {
            LOG.info("database {} not exist, created", database);
            dorisSystem.createDatabase(database);
        }
        List<String> syncTables = new ArrayList<>();
        // Ordered list of distinct (database, table) targets; the companion set gives O(1)
        // duplicate detection while the list preserves first-seen order for sink creation.
        List<Tuple2<String, String>> dorisTables = new ArrayList<>();
        Set<Tuple2<String, String>> seenDorisTables = new HashSet<>();

        Set<String> targetDbSet = new HashSet<>();
        for (SourceSchema schema : schemaList) {
            syncTables.add(schema.getTableName());
            String targetDb = database;
            // Synchronize multiple databases using the src database name
            if (useSourceDbName) {
                targetDb = schema.getDatabaseName();
                // add() returns true only the first time a database name is seen, so the
                // existence check and creation happen once per database, not once per table.
                if (targetDbSet.add(targetDb) && !dorisSystem.databaseExists(targetDb)) {
                    LOG.info("database {} not exist, created", targetDb);
                    dorisSystem.createDatabase(targetDb);
                }
            }
            String dorisTable = converter.convert(schema.getTableName());
            // Calculate the mapping relationship between upstream and downstream tables
            tableMapping.put(
                    schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable));
            DorisTableUtil.tryCreateTableIfAbsent(
                    dorisSystem,
                    targetDb,
                    dorisTable,
                    schema,
                    dorisTableConfig,
                    ignoreIncompatible);

            // Several source tables may convert to the same Doris table; keep one entry each.
            Tuple2<String, String> target = Tuple2.of(targetDb, dorisTable);
            if (seenDorisTables.add(target)) {
                dorisTables.add(target);
            }
        }
        if (createTableOnly) {
            // NOTE(review): reported on stdout rather than through LOG — presumably for
            // command-line users; confirm before changing.
            System.out.println("Create table finished.");
            return false;
        }
        LOG.info("table mapping: {}", tableMapping);
        config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
        DataStreamSource<String> streamSource = buildCdcSource(env);
        if (singleSink) {
            // One sink receives the whole change stream.
            streamSource.sinkTo(buildDorisSink());
        } else {
            // The process function emits records on per-table side outputs; attach one sink to
            // each side output, keyed by the same (database, table) pair used to build its tag.
            SingleOutputStreamOperator<Void> parsedStream =
                    streamSource.process(buildProcessFunction());
            for (Tuple2<String, String> dbTbl : dorisTables) {
                OutputTag<String> recordOutputTag =
                        ParsingProcessFunction.createRecordOutputTag(dbTbl.f0, dbTbl.f1);
                DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
                // Fall back to the side output's own parallelism when none is configured.
                int sinkParallel =
                        sinkConfig.getInteger(
                                DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
                String uidName = getUidName(targetDbSet, dbTbl);
                sideOutput
                        .sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
                        .setParallelism(sinkParallel)
                        .name(uidName)
                        .uid(uidName);
            }
        }
        return true;
    }