public SinkRuntimeProvider getSinkRuntimeProvider()

in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java [86:140]


    /**
     * Assembles the Doris sink for this table and wraps it in a runtime provider.
     *
     * <p>The method performs the following steps, in order:
     *
     * <ol>
     *   <li>Decides whether delete handling is enabled: the execution options must request it
     *       and {@code RestService.isUniqueKeyType} must confirm it for the target table.
     *   <li>If the stream-load properties carry no {@code COLUMNS_KEY} entry, derives one from
     *       the table schema: every field name is trimmed, stripped of backticks, re-quoted
     *       with backticks and joined with commas; {@code DORIS_DELETE_SIGN} is appended when
     *       delete handling is enabled. The entry is stored in the properties object returned
     *       by {@code executionOptions.getStreamLoadProp()}.
     *   <li>Configures the row serializer and the sink builder, then builds the sink.
     *   <li>When {@code overwrite} is set, rejects unbounded input; for bounded input it
     *       requires a JDBC URL and calls {@code truncateTable()}.
     * </ol>
     *
     * @param context planner-supplied context; only {@code isBounded()} is consulted here
     * @return a provider created from the built sink and {@code sinkParallelism}
     * @throws IllegalStateException if {@code overwrite} is set and the input is not bounded
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        Properties streamLoadProps = executionOptions.getStreamLoadProp();

        // '&&' short-circuits: the key-type lookup only runs when deletes were requested.
        boolean deleteEnabled =
                executionOptions.getDeletable()
                        && RestService.isUniqueKeyType(options, readOptions, LOG);

        // A user-supplied column list always wins; otherwise derive it from the schema.
        if (!streamLoadProps.containsKey(COLUMNS_KEY)) {
            String[] schemaFields = tableSchema.getFieldNames();
            Preconditions.checkState(schemaFields != null && schemaFields.length > 0);

            // Normalise each name to exactly one surrounding pair of backticks.
            String quotedFields =
                    Arrays.stream(schemaFields)
                            .map(name -> "`" + name.trim().replace("`", "") + "`")
                            .collect(Collectors.joining(","));

            String columnList =
                    deleteEnabled ? quotedFields + "," + DORIS_DELETE_SIGN : quotedFields;
            streamLoadProps.put(COLUMNS_KEY, columnList);
        }

        // Serializer settings fall back to the defaults when the properties omit them.
        String format = streamLoadProps.getProperty(FORMAT_KEY, CSV);
        String fieldDelimiter =
                streamLoadProps.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);

        RowDataSerializer.Builder rowSerializerBuilder = RowDataSerializer.builder();
        rowSerializerBuilder
                .setFieldNames(tableSchema.getFieldNames())
                .setFieldType(tableSchema.getFieldDataTypes())
                .setType(format)
                .enableDelete(deleteEnabled)
                .setFieldDelimiter(fieldDelimiter);

        DorisSink.Builder<RowData> sinkBuilder = DorisSink.builder();
        sinkBuilder
                .setDorisOptions(options)
                .setDorisReadOptions(readOptions)
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(rowSerializerBuilder.build());
        DorisSink<RowData> sink = sinkBuilder.build();

        // INSERT OVERWRITE handling: only bounded input is accepted.
        if (overwrite) {
            if (!context.isBounded()) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            // The table is emptied through a JDBC statement, so a JDBC URL is mandatory.
            Preconditions.checkArgument(
                    options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode.");
            // todo: should be written to a temporary table first,
            // and then use GlobalCommitter to perform the rename.
            truncateTable();
        }

        return SinkV2Provider.of(sink, sinkParallelism);
    }