public SinkRuntimeProvider getSinkRuntimeProvider()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java [129:207]


    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        int[] targetColumnIndexes = null;
        // skip applying partial-updates for UPDATE command as the Context#targetColumns
        // is not correct, see FLINK-36736
        if (!appliedUpdates
                && context.getTargetColumns().isPresent()
                // when no columns specified in insert into, the length of target columns
                // is 0, when no column specified, it's not partial update
                // see FLINK-36000
                && context.getTargetColumns().get().length != 0) {
            // is partial update, check whether partial update is supported or not
            if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
                if (primaryKeyIndexes.length == 0) {
                    throw new ValidationException(
                            "Fluss table sink does not support partial updates for table without primary key. Please make sure the "
                                    + "number of specified columns in INSERT INTO matches columns of the Fluss table.");
                }
                if (mergeEngineType != null) {
                    throw new ValidationException(
                            String.format(
                                    "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
                                            + "number of specified columns in INSERT INTO matches columns of the Fluss table.",
                                    tablePath, mergeEngineType));
                }
                int[][] targetColumns = context.getTargetColumns().get();
                targetColumnIndexes = new int[targetColumns.length];
                for (int i = 0; i < targetColumns.length; i++) {
                    int[] column = targetColumns[i];
                    if (column.length != 1) {
                        throw new ValidationException(
                                "Fluss sink table doesn't support partial updates for nested columns.");
                    }
                    targetColumnIndexes[i] = column[0];
                }
                // check the target column contains the primary key columns
                for (int primaryKeyIndex : primaryKeyIndexes) {
                    if (Arrays.stream(targetColumnIndexes)
                            .noneMatch(targetColumIndex -> targetColumIndex == primaryKeyIndex)) {
                        throw new ValidationException(
                                String.format(
                                        "Fluss table sink does not support partial updates without fully specifying the primary key columns. "
                                                + "The insert columns are %s, but the primary key columns are %s. "
                                                + "Please make sure the specified columns in INSERT INTO contains "
                                                + "the primary key columns.",
                                        columns(targetColumnIndexes), columns(primaryKeyIndexes)));
                    }
                }
            }
            // else, it's full update, ignore the given target columns as we don't care the order
        }

        FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter> flinkSinkWriterBuilder =
                (primaryKeyIndexes.length > 0)
                        ? new FlinkSink.UpsertSinkWriterBuilder(
                                tablePath,
                                flussConfig,
                                tableRowType,
                                targetColumnIndexes,
                                ignoreDelete,
                                numBucket,
                                bucketKeys,
                                partitionKeys,
                                lakeFormat,
                                shuffleByBucketId)
                        : new FlinkSink.AppendSinkWriterBuilder(
                                tablePath,
                                flussConfig,
                                tableRowType,
                                ignoreDelete,
                                numBucket,
                                bucketKeys,
                                partitionKeys,
                                lakeFormat,
                                shuffleByBucketId);

        FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder);

        return SinkV2Provider.of(flinkSink);
    }