in flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java [575:688]
/**
 * Redistributes the input stream according to the configured {@link DistributionMode} before it
 * reaches the downstream writer operators.
 *
 * <ul>
 *   <li>{@code NONE}: pass-through, unless equality fields are set, in which case rows are hashed
 *       by the equality key so updates/deletes for the same key land on the same writer.
 *   <li>{@code HASH}: key by partition key when the table is partitioned, otherwise by equality
 *       fields; falls back to pass-through when neither is available. When both are present,
 *       every partition source field must be covered by the equality fields so that rows with
 *       the same key cannot be routed to different partitions/writers.
 *   <li>{@code RANGE}: statistics-driven range shuffle by the table sort order (derived from the
 *       partition spec when the table is unsorted). With equality fields set, falls back to
 *       hash-by-equality-key for backward compatibility, since range distribution is not always
 *       safe for primary-key streams.
 * </ul>
 *
 * @param input the upstream row stream to redistribute
 * @param equalityFieldIds field ids forming the equality (upsert/delete) key; may be empty
 * @param flinkRowType Flink row type matching the table schema projection being written
 * @param writerParallelism parallelism of the downstream writer, used to size range statistics
 *     and the post-shuffle flatMap so it can chain with the writer
 * @return the redistributed stream
 * @throws IllegalArgumentException if the distribution mode is unrecognized
 * @throws IllegalStateException if RANGE mode is requested without a sort order or partition
 *     spec, or if HASH mode's partition fields are not covered by the equality fields
 */
private DataStream<RowData> distributeDataStream(
    DataStream<RowData> input,
    List<Integer> equalityFieldIds,
    RowType flinkRowType,
    int writerParallelism) {
  DistributionMode writeMode = flinkWriteConf.distributionMode();
  LOG.info("Write distribution mode is '{}'", writeMode.modeName());

  Schema iSchema = table.schema();
  PartitionSpec partitionSpec = table.spec();
  SortOrder sortOrder = table.sortOrder();

  switch (writeMode) {
    case NONE:
      if (equalityFieldIds.isEmpty()) {
        return input;
      } else {
        // Even with no explicit distribution, rows sharing an equality key must be processed
        // by the same writer to keep insert/delete ordering correct.
        LOG.info("Distribute rows by equality fields, because there are equality fields set");
        return input.keyBy(
            new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
      }

    case HASH:
      if (equalityFieldIds.isEmpty()) {
        if (partitionSpec.isUnpartitioned()) {
          LOG.warn(
              "Fallback to use 'none' distribution mode, because there are no equality fields set "
                  + "and table is unpartitioned");
          return input;
        } else {
          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
        }
      } else {
        if (partitionSpec.isUnpartitioned()) {
          LOG.info(
              "Distribute rows by equality fields, because there are equality fields set "
                  + "and table is unpartitioned");
          return input.keyBy(
              new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
        } else {
          // If a partition source field were outside the equality key, two rows with the same
          // equality key could hash to different writers, breaking upsert semantics.
          for (PartitionField partitionField : partitionSpec.fields()) {
            Preconditions.checkState(
                equalityFieldIds.contains(partitionField.sourceId()),
                "In 'hash' distribution mode with equality fields set, partition field '%s' "
                    + "should be included in equality fields: '%s'",
                partitionField,
                equalityFieldColumns);
          }
          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
        }
      }

    case RANGE:
      // Ideally, exception should be thrown in the combination of range distribution and
      // equality fields. Primary key case should use hash distribution mode.
      // Keep the current behavior of falling back to keyBy for backward compatibility.
      if (!equalityFieldIds.isEmpty()) {
        LOG.warn(
            "Hash distribute rows by equality fields, even though {}=range is set. "
                + "Range distribution for primary keys are not always safe in "
                + "Flink streaming writer.",
            WRITE_DISTRIBUTION_MODE);
        return input.keyBy(
            new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
      }

      // range distribute by partition key or sort key if table has an SortOrder
      Preconditions.checkState(
          sortOrder.isSorted() || partitionSpec.isPartitioned(),
          "Invalid write distribution mode: range. Need to define sort order or partition spec.");
      if (sortOrder.isUnsorted()) {
        sortOrder = Partitioning.sortOrderFor(partitionSpec);
        LOG.info("Construct sort order from partition spec");
      }

      LOG.info("Range distribute rows by sort order: {}", sortOrder);
      StatisticsOrRecordTypeInformation statisticsOrRecordTypeInformation =
          new StatisticsOrRecordTypeInformation(flinkRowType, iSchema, sortOrder);
      StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
      SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
          input
              .transform(
                  operatorName("range-shuffle"),
                  statisticsOrRecordTypeInformation,
                  new DataStatisticsOperatorFactory(
                      iSchema,
                      sortOrder,
                      writerParallelism,
                      statisticsType,
                      flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
              // Set the parallelism same as input operator to encourage chaining
              .setParallelism(input.getParallelism());
      if (uidPrefix != null) {
        shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle");
      }

      return shuffleStream
          .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
          .flatMap(
              (FlatMapFunction<StatisticsOrRecord, RowData>)
                  (statisticsOrRecord, out) -> {
                    // Statistics elements are consumed by the partitioner; only data records
                    // continue downstream to the writer.
                    if (statisticsOrRecord.hasRecord()) {
                      out.collect(statisticsOrRecord.record());
                    }
                  })
          // Set the parallelism same as writerParallelism to
          // promote operator chaining with the downstream writer operator
          .setParallelism(writerParallelism)
          .returns(RowData.class);

    default:
      // IllegalArgumentException (a RuntimeException subtype, so backward-compatible for
      // callers) is the idiomatic type for an unrecognized configuration value.
      throw new IllegalArgumentException(
          "Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
  }
}