in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java [129:207]
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // Resolves which columns the statement writes (validating partial updates), then
    // builds the Fluss sink: an upsert writer builder when the table has primary key
    // columns, an append writer builder otherwise.

    // Target columns are not consulted for the UPDATE command because
    // Context#targetColumns is not correct in that case, see FLINK-36736.
    final int[][] requestedColumns =
            appliedUpdates ? null : context.getTargetColumns().orElse(null);

    // A zero-length array means INSERT INTO specified no columns, which is not a
    // partial update (see FLINK-36000). When the number of target columns equals the
    // table's field count it is a full update and the given columns are ignored, as
    // their order is irrelevant.
    final boolean partialUpdate =
            requestedColumns != null
                    && requestedColumns.length != 0
                    && requestedColumns.length != tableRowType.getFieldCount();

    // Remains null unless a valid partial update was requested.
    int[] targetColumnIndexes = null;
    if (partialUpdate) {
        // Partial updates require a primary key ...
        if (primaryKeyIndexes.length == 0) {
            throw new ValidationException(
                    "Fluss table sink does not support partial updates for table without primary key. Please make sure the "
                            + "number of specified columns in INSERT INTO matches columns of the Fluss table.");
        }
        // ... and are rejected whenever a merge engine is configured.
        if (mergeEngineType != null) {
            throw new ValidationException(
                    String.format(
                            "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
                                    + "number of specified columns in INSERT INTO matches columns of the Fluss table.",
                            tablePath, mergeEngineType));
        }

        // Flatten the column paths into top-level indexes; a path with any length
        // other than 1 is treated as a nested column and is not supported.
        targetColumnIndexes = new int[requestedColumns.length];
        int cursor = 0;
        for (int[] columnPath : requestedColumns) {
            if (columnPath.length != 1) {
                throw new ValidationException(
                        "Fluss sink table doesn't support partial updates for nested columns.");
            }
            targetColumnIndexes[cursor++] = columnPath[0];
        }

        // Every primary key column must be among the target columns.
        for (int pkIndex : primaryKeyIndexes) {
            boolean pkIsTargeted =
                    Arrays.stream(targetColumnIndexes).anyMatch(index -> index == pkIndex);
            if (!pkIsTargeted) {
                throw new ValidationException(
                        String.format(
                                "Fluss table sink does not support partial updates without fully specifying the primary key columns. "
                                        + "The insert columns are %s, but the primary key columns are %s. "
                                        + "Please make sure the specified columns in INSERT INTO contains "
                                        + "the primary key columns.",
                                columns(targetColumnIndexes), columns(primaryKeyIndexes)));
            }
        }
    }

    // Only the upsert builder receives the target column indexes; the append builder
    // has no such argument.
    final FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter> writerBuilder;
    if (primaryKeyIndexes.length > 0) {
        writerBuilder =
                new FlinkSink.UpsertSinkWriterBuilder(
                        tablePath,
                        flussConfig,
                        tableRowType,
                        targetColumnIndexes,
                        ignoreDelete,
                        numBucket,
                        bucketKeys,
                        partitionKeys,
                        lakeFormat,
                        shuffleByBucketId);
    } else {
        writerBuilder =
                new FlinkSink.AppendSinkWriterBuilder(
                        tablePath,
                        flussConfig,
                        tableRowType,
                        ignoreDelete,
                        numBucket,
                        bucketKeys,
                        partitionKeys,
                        lakeFormat,
                        shuffleByBucketId);
    }

    FlinkSink sink = new FlinkSink(writerBuilder);
    return SinkV2Provider.of(sink);
}