public DorisSink buildDorisSink()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java [241:336]


    public DorisSink<String> buildDorisSink(String tableIdentifier) {
        String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
        String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
        String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
        String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
        String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);

        DorisSink.Builder<String> builder = DorisSink.builder();
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder
                .setJdbcUrl(jdbcUrl)
                .setFenodes(fenodes)
                .setBenodes(benodes)
                .setUsername(user)
                .setPassword(passwd);
        sinkConfig
                .getOptional(DorisConfigOptions.AUTO_REDIRECT)
                .ifPresent(dorisBuilder::setAutoRedirect);

        // single sink not need table identifier
        if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(tableIdentifier)) {
            dorisBuilder.setTableIdentifier(tableIdentifier);
        }

        Properties pro = new Properties();
        // default json data format
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        // customer stream load properties
        Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
        pro.putAll(streamLoadProp);
        DorisExecutionOptions.Builder executionBuilder =
                DorisExecutionOptions.builder().setStreamLoadProp(pro);

        sinkConfig
                .getOptional(DorisConfigOptions.SINK_LABEL_PREFIX)
                .ifPresent(executionBuilder::setLabelPrefix);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_ENABLE_DELETE)
                .ifPresent(executionBuilder::setDeletable);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_BUFFER_COUNT)
                .ifPresent(executionBuilder::setBufferCount);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
                .ifPresent(v -> executionBuilder.setBufferSize((int) v.getBytes()));
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
                .ifPresent(v -> executionBuilder.setCheckInterval((int) v.toMillis()));
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
                .ifPresent(executionBuilder::setMaxRetries);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE)
                .ifPresent(executionBuilder::setIgnoreUpdateBefore);

        if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
            executionBuilder.disable2PC();
        } else if (sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) {
            // force open 2pc
            executionBuilder.enable2PC();
        }

        sinkConfig
                .getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)
                .ifPresent(executionBuilder::setBatchMode);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE)
                .ifPresent(executionBuilder::setFlushQueueSize);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS)
                .ifPresent(executionBuilder::setBufferFlushMaxRows);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES)
                .ifPresent(v -> executionBuilder.setBufferFlushMaxBytes((int) v.getBytes()));
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL)
                .ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));

        sinkConfig
                .getOptional(DorisConfigOptions.SINK_USE_CACHE)
                .ifPresent(executionBuilder::setUseCache);
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_WRITE_MODE)
                .ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v)));
        sinkConfig
                .getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
                .ifPresent(executionBuilder::setIgnoreCommitError);

        DorisExecutionOptions executionOptions = executionBuilder.build();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions))
                .setDorisOptions(dorisBuilder.build());
        return builder.build();
    }