private DataStream distributeDataStream()

in flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java [561:674]


    /**
     * Re-distributes the input stream to the downstream writer operators according to the
     * configured {@link DistributionMode}:
     *
     * <ul>
     *   <li>{@code NONE}: pass the stream through unchanged, unless equality fields are set — then
     *       key by the equality fields so all rows for the same key reach the same writer.
     *   <li>{@code HASH}: key by the partition key; when equality fields are set, every partition
     *       source field must be one of the equality fields (enforced below), and an unpartitioned
     *       table falls back to keying by equality fields.
     *   <li>{@code RANGE}: range-distribute by the table sort order (derived from the partition
     *       spec when the table is unsorted) using a data-statistics operator plus a custom range
     *       partitioner. With equality fields set, falls back to keying by equality fields for
     *       backward compatibility.
     * </ul>
     *
     * @param input upstream stream of rows to write
     * @param equalityFieldIds ids of the equality (upsert key) fields; may be empty
     * @param flinkRowType Flink row type corresponding to the table schema
     * @param writerParallelism parallelism of the downstream writer operator
     * @return the stream, re-distributed as dictated by the write distribution mode
     * @throws IllegalArgumentException if the distribution mode is unrecognized
     * @throws IllegalStateException if the mode's preconditions are violated (see Preconditions
     *     checks below)
     */
    private DataStream<RowData> distributeDataStream(
        DataStream<RowData> input,
        List<Integer> equalityFieldIds,
        RowType flinkRowType,
        int writerParallelism) {
      DistributionMode writeMode = flinkWriteConf.distributionMode();
      LOG.info("Write distribution mode is '{}'", writeMode.modeName());

      Schema iSchema = table.schema();
      PartitionSpec partitionSpec = table.spec();
      SortOrder sortOrder = table.sortOrder();

      switch (writeMode) {
        case NONE:
          if (equalityFieldIds.isEmpty()) {
            return input;
          } else {
            // Equality deletes require all rows of a key on the same writer subtask.
            LOG.info("Distribute rows by equality fields, because there are equality fields set");
            return input.keyBy(
                new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
          }

        case HASH:
          if (equalityFieldIds.isEmpty()) {
            if (partitionSpec.isUnpartitioned()) {
              // Nothing to hash by: no partition key and no equality key.
              LOG.warn(
                  "Fallback to use 'none' distribution mode, because there are no equality fields set "
                      + "and table is unpartitioned");
              return input;
            } else {
              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
            }
          } else {
            if (partitionSpec.isUnpartitioned()) {
              LOG.info(
                  "Distribute rows by equality fields, because there are equality fields set "
                      + "and table is unpartitioned");
              return input.keyBy(
                  new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
            } else {
              // Hashing by partition key is only safe for upserts when the partition fields are a
              // subset of the equality fields; otherwise rows for one key could land on different
              // writers and delete files would miss matching data files.
              for (PartitionField partitionField : partitionSpec.fields()) {
                Preconditions.checkState(
                    equalityFieldIds.contains(partitionField.sourceId()),
                    "In 'hash' distribution mode with equality fields set, partition field '%s' "
                        + "should be included in equality fields: '%s'",
                    partitionField,
                    equalityFieldColumns);
              }
              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
            }
          }

        case RANGE:
          // Ideally, exception should be thrown in the combination of range distribution and
          // equality fields. Primary key case should use hash distribution mode.
          // Keep the current behavior of falling back to keyBy for backward compatibility.
          if (!equalityFieldIds.isEmpty()) {
            LOG.warn(
                "Hash distribute rows by equality fields, even though {}=range is set. "
                    + "Range distribution for primary keys are not always safe in "
                    + "Flink streaming writer.",
                WRITE_DISTRIBUTION_MODE);
            return input.keyBy(
                new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
          }

          // range distribute by partition key or sort key if table has an SortOrder
          Preconditions.checkState(
              sortOrder.isSorted() || partitionSpec.isPartitioned(),
              "Invalid write distribution mode: range. Need to define sort order or partition spec.");
          if (sortOrder.isUnsorted()) {
            // An unsorted partitioned table still gets a usable range key from its partition spec.
            sortOrder = Partitioning.sortOrderFor(partitionSpec);
            LOG.info("Construct sort order from partition spec");
          }

          LOG.info("Range distribute rows by sort order: {}", sortOrder);
          StatisticsOrRecordTypeInformation statisticsOrRecordTypeInformation =
              new StatisticsOrRecordTypeInformation(flinkRowType, iSchema, sortOrder);
          StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
          // The statistics operator samples sort-key values so the RangePartitioner can pick
          // balanced range boundaries; its output interleaves statistics with the data records.
          SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
              input
                  .transform(
                      operatorName("range-shuffle"),
                      statisticsOrRecordTypeInformation,
                      new DataStatisticsOperatorFactory(
                          iSchema,
                          sortOrder,
                          writerParallelism,
                          statisticsType,
                          flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
                  // Set the parallelism same as input operator to encourage chaining
                  .setParallelism(input.getParallelism());
          if (uidPrefix != null) {
            shuffleStream = shuffleStream.uid(uidPrefix + "-shuffle");
          }

          return shuffleStream
              .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
              // Statistics markers served their purpose in partitionCustom; only forward records.
              .flatMap(
                  (FlatMapFunction<StatisticsOrRecord, RowData>)
                      (statisticsOrRecord, out) -> {
                        if (statisticsOrRecord.hasRecord()) {
                          out.collect(statisticsOrRecord.record());
                        }
                      })
              // Set the parallelism same as writerParallelism to
              // promote operator chaining with the downstream writer operator
              .setParallelism(writerParallelism)
              .returns(RowData.class);

        default:
          // Unreachable for current DistributionMode values; IllegalArgumentException (a
          // RuntimeException subtype, so backward-compatible) is the idiomatic type for an
          // unrecognized configuration value.
          throw new IllegalArgumentException(
              "Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
      }
    }