public SinkRuntimeProvider getSinkRuntimeProvider()

in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java [81:116]


    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        Properties loadProperties = executionOptions.getStreamLoadProp();
        boolean deletable = executionOptions.getDeletable() && RestService.isUniqueKeyType(options, readOptions, LOG);
        if (!loadProperties.containsKey(COLUMNS_KEY)) {
            String[] fieldNames = tableSchema.getFieldNames();
            Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
            String columns = String.join(",", Arrays.stream(fieldNames).map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList()));
            if (deletable) {
                columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
            }
            loadProperties.put(COLUMNS_KEY, columns);
        }

        RowDataSerializer.Builder serializerBuilder = RowDataSerializer.builder();
        serializerBuilder.setFieldNames(tableSchema.getFieldNames())
                .setFieldType(tableSchema.getFieldDataTypes())
                .setType(loadProperties.getProperty(FORMAT_KEY, CSV))
                .enableDelete(deletable)
                .setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT));

        if(!executionOptions.enableBatchMode()){
            DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
            dorisSinkBuilder.setDorisOptions(options)
                    .setDorisReadOptions(readOptions)
                    .setDorisExecutionOptions(executionOptions)
                    .setSerializer(serializerBuilder.build());
            return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
        }else{
            DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = DorisBatchSink.builder();
            dorisBatchSinkBuilder.setDorisOptions(options)
                    .setDorisReadOptions(readOptions)
                    .setDorisExecutionOptions(executionOptions)
                    .setSerializer(serializerBuilder.build());
            return SinkV2Provider.of(dorisBatchSinkBuilder.build(), sinkParallelism);
        }
    }