private StarRocksSinkOptions buildSinkConnectorOptions()

in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java [62:147]


    private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
        org.apache.flink.configuration.Configuration sinkConfig =
                new org.apache.flink.configuration.Configuration();
        // required sink configurations
        sinkConfig.set(
                StarRocksSinkOptions.JDBC_URL, cdcConfig.get(StarRocksDataSinkOptions.JDBC_URL));
        sinkConfig.set(
                StarRocksSinkOptions.LOAD_URL, cdcConfig.get(StarRocksDataSinkOptions.LOAD_URL));
        sinkConfig.set(
                StarRocksSinkOptions.USERNAME, cdcConfig.get(StarRocksDataSinkOptions.USERNAME));
        sinkConfig.set(
                StarRocksSinkOptions.PASSWORD, cdcConfig.get(StarRocksDataSinkOptions.PASSWORD));
        // optional sink configurations
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_LABEL_PREFIX)
                .ifPresent(
                        config -> sinkConfig.set(StarRocksSinkOptions.SINK_LABEL_PREFIX, config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_CONNECT_TIMEOUT)
                .ifPresent(
                        config ->
                                sinkConfig.set(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT, config));

        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_SOCKET_TIMEOUT)
                .ifPresent(
                        config -> sinkConfig.set(StarRocksSinkOptions.SINK_SOCKET_TIMEOUT, config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT)
                .ifPresent(
                        config ->
                                sinkConfig.set(
                                        StarRocksSinkOptions.SINK_WAIT_FOR_CONTINUE_TIMEOUT,
                                        config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_BATCH_MAX_SIZE)
                .ifPresent(
                        config -> sinkConfig.set(StarRocksSinkOptions.SINK_BATCH_MAX_SIZE, config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_BATCH_FLUSH_INTERVAL)
                .ifPresent(
                        config ->
                                sinkConfig.set(
                                        StarRocksSinkOptions.SINK_BATCH_FLUSH_INTERVAL, config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_SCAN_FREQUENCY)
                .ifPresent(
                        config -> sinkConfig.set(StarRocksSinkOptions.SINK_SCAN_FREQUENCY, config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_IO_THREAD_COUNT)
                .ifPresent(
                        config ->
                                sinkConfig.set(StarRocksSinkOptions.SINK_IO_THREAD_COUNT, config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD)
                .ifPresent(
                        config ->
                                sinkConfig.set(
                                        StarRocksSinkOptions
                                                .SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD,
                                        config));
        cdcConfig
                .getOptional(StarRocksDataSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE)
                .ifPresent(
                        config ->
                                sinkConfig.set(
                                        StarRocksSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE,
                                        config));
        // specified sink configurations for cdc scenario
        sinkConfig.set(StarRocksSinkOptions.DATABASE_NAME, "*");
        sinkConfig.set(StarRocksSinkOptions.TABLE_NAME, "*");
        sinkConfig.set(StarRocksSinkOptions.SINK_USE_NEW_SINK_API, true);
        // currently cdc framework only supports at-least-once
        sinkConfig.set(StarRocksSinkOptions.SINK_SEMANTIC, "at-least-once");

        Map<String, String> streamProperties =
                getPrefixConfigs(cdcConfig.toMap(), SINK_PROPERTIES_PREFIX);
        // force to use json format for stream load to simplify the configuration,
        // such as there is no need to reconfigure the "columns" property after
        // schema change. csv format can be supported in the future if needed
        streamProperties.put("sink.properties.format", "json");
        streamProperties.put("sink.properties.strip_outer_array", "true");
        streamProperties.put("sink.properties.ignore_json_size", "true");

        return new StarRocksSinkOptions(sinkConfig, streamProperties);
    }