spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java [81:461]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
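/**
 * An action that rewrites the data manifests of the selected partition spec so that their sizes
 * approach the table's target manifest size, optionally limited to manifests matching a
 * user-supplied predicate. The {@code use-caching} option controls whether the manifest-entry
 * dataset is cached while the new manifests are written (see withReusableDS below).
 *
 * <p>Typical invocation through the public {@code SparkActions} entry point (an illustrative
 * sketch; loading {@code spark} and {@code table} is omitted):
 *
 * <pre>{@code
 * SparkActions.get(spark)
 *     .rewriteManifests(table)
 *     .rewriteIf(manifest -> manifest.length() < 10 * 1024 * 1024) // only rewrite small manifests
 *     .execute();
 * }</pre>
 */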
public class RewriteManifestsSparkAction
    extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests {

  public static final String USE_CACHING = "use-caching";
  public static final boolean USE_CACHING_DEFAULT = true;

  private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);

  private final Encoder<ManifestFile> manifestEncoder;
  private final Table table;
  private final int formatVersion;
  private final long targetManifestSizeBytes;

  private PartitionSpec spec = null;
  private Predicate<ManifestFile> predicate = manifest -> true;
  private String stagingLocation = null;

  RewriteManifestsSparkAction(SparkSession spark, Table table) {
    super(spark);
    this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
    this.table = table;
    this.spec = table.spec();
    this.targetManifestSizeBytes =
        PropertyUtil.propertyAsLong(
            table.properties(),
            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
            TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);

    // default the staging location to the metadata location
    TableOperations ops = ((HasTableOperations) table).operations();
    Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
    this.stagingLocation = metadataFilePath.getParent().toString();

    // use the current table format version for new manifests
    this.formatVersion = ops.current().formatVersion();
  }

  @Override
  protected RewriteManifestsSparkAction self() {
    return this;
  }

  @Override
  public RewriteManifestsSparkAction specId(int specId) {
    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %s", specId);
    this.spec = table.specs().get(specId);
    return this;
  }

  @Override
  public RewriteManifestsSparkAction rewriteIf(Predicate<ManifestFile> newPredicate) {
    this.predicate = newPredicate;
    return this;
  }

  @Override
  public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) {
    this.stagingLocation = newStagingLocation;
    return this;
  }

  @Override
  public RewriteManifests.Result execute() {
    String desc =
        String.format(
            "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name());
    JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
    return withJobGroupInfo(info, this::doExecute);
  }

  private RewriteManifests.Result doExecute() {
    List<ManifestFile> matchingManifests = findMatchingManifests();
    if (matchingManifests.isEmpty()) {
      return ImmutableRewriteManifests.Result.builder()
          .addedManifests(ImmutableList.of())
          .rewrittenManifests(ImmutableList.of())
          .build();
    }

    long totalSizeBytes = 0L;
    int numEntries = 0;

    for (ManifestFile manifest : matchingManifests) {
      ValidationException.check(
          hasFileCounts(manifest), "No file counts in manifest: %s", manifest.path());

      totalSizeBytes += manifest.length();
      numEntries +=
          manifest.addedFilesCount() + manifest.existingFilesCount() + manifest.deletedFilesCount();
    }

    int targetNumManifests = targetNumManifests(totalSizeBytes);
    int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);
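    // e.g. 20 MB of matching manifests with an 8 MB target yields ceil(20 / 8) = 3 output
    // manifests, each targeted to hold roughly a third of the live entries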

    if (targetNumManifests == 1 && matchingManifests.size() == 1) {
      return ImmutableRewriteManifests.Result.builder()
          .addedManifests(ImmutableList.of())
          .rewrittenManifests(ImmutableList.of())
          .build();
    }

    Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);

    List<ManifestFile> newManifests;
    if (spec.fields().size() < 1) {
      newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
    } else {
      newManifests =
          writeManifestsForPartitionedTable(
              manifestEntryDF, targetNumManifests, targetNumManifestEntries);
    }

    replaceManifests(matchingManifests, newManifests);

    return ImmutableRewriteManifests.Result.builder()
        .rewrittenManifests(matchingManifests)
        .addedManifests(newManifests)
        .build();
  }

  private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
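    // Restrict the live entries of the "entries" metadata table to the manifests being rewritten
    // by semi-joining on the manifest path reported by input_file_name().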
    Dataset<Row> manifestDF =
        spark()
            .createDataset(Lists.transform(manifests, ManifestFile::path), Encoders.STRING())
            .toDF("manifest");

    Dataset<Row> manifestEntryDF =
        loadMetadataTable(table, ENTRIES)
            .filter("status < 2") // select only live entries
            .selectExpr(
                "input_file_name() as manifest",
                "snapshot_id",
                "sequence_number",
                "file_sequence_number",
                "data_file");

    Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
    return manifestEntryDF
        .join(manifestDF, joinCond, "left_semi")
        .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
  }

  private List<ManifestFile> writeManifestsForUnpartitionedTable(
      Dataset<Row> manifestEntryDF, int numManifests) {
    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
    Types.StructType combinedPartitionType = Partitioning.partitionType(table);

    // for unpartitioned tables we rely only on the target number of manifests,
    // since there is no per-partition metadata to balance
    long maxNumManifestEntries = Long.MAX_VALUE;

    return manifestEntryDF
        .repartition(numManifests)
        .mapPartitions(
            toManifests(
                tableBroadcast,
                maxNumManifestEntries,
                stagingLocation,
                formatVersion,
                combinedPartitionType,
                spec,
                sparkType),
            manifestEncoder)
        .collectAsList();
  }

  private List<ManifestFile> writeManifestsForPartitionedTable(
      Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
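    // Cluster entries by partition value (range partition + sort within partitions) so that each
    // output manifest covers a narrow, contiguous range of partitions.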

    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
    Types.StructType combinedPartitionType = Partitioning.partitionType(table);

    // we allow the actual size of manifests to be 10% higher if the estimation is not precise
    // enough
    long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);

    return withReusableDS(
        manifestEntryDF,
        df -> {
          Column partitionColumn = df.col("data_file.partition");
          return df.repartitionByRange(numManifests, partitionColumn)
              .sortWithinPartitions(partitionColumn)
              .mapPartitions(
                  toManifests(
                      tableBroadcast,
                      maxNumManifestEntries,
                      stagingLocation,
                      formatVersion,
                      combinedPartitionType,
                      spec,
                      sparkType),
                  manifestEncoder)
              .collectAsList();
        });
  }

  private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
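    // Run func against a reusable form of the dataset: cached when use-caching is enabled,
    // otherwise repartitioned through an identity map to the default shuffle parallelism.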
    Dataset<T> reusableDS;
    boolean useCaching =
        PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
    if (useCaching) {
      reusableDS = ds.cache();
    } else {
      int parallelism = SQLConf.get().numShufflePartitions();
      reusableDS =
          ds.repartition(parallelism).map((MapFunction<T, T>) value -> value, ds.exprEnc());
    }

    try {
      return func.apply(reusableDS);
    } finally {
      if (useCaching) {
        reusableDS.unpersist(false);
      }
    }
  }

  private List<ManifestFile> findMatchingManifests() {
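    // Only data manifests of the current snapshot that use the selected partition spec and pass
    // the user-supplied predicate are rewritten.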
    Snapshot currentSnapshot = table.currentSnapshot();

    if (currentSnapshot == null) {
      return ImmutableList.of();
    }

    return currentSnapshot.dataManifests(table.io()).stream()
        .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
        .collect(Collectors.toList());
  }

  private int targetNumManifests(long totalSizeBytes) {
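    // (x + y - 1) / y is integer ceiling division: round up so the average output manifest stays
    // at or below the target size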
    return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
  }

  private int targetNumManifestEntries(int numEntries, int numManifests) {
    return (numEntries + numManifests - 1) / numManifests;
  }

  private boolean hasFileCounts(ManifestFile manifest) {
    return manifest.addedFilesCount() != null
        && manifest.existingFilesCount() != null
        && manifest.deletedFilesCount() != null;
  }

  private void replaceManifests(
      Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) {
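    // Swap the matched manifests for the newly written ones in a single commit; staged manifests
    // are cleaned up only if the commit fails with a known outcome.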
    try {
      boolean snapshotIdInheritanceEnabled =
          PropertyUtil.propertyAsBoolean(
              table.properties(),
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

      org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests();
      deletedManifests.forEach(rewriteManifests::deleteManifest);
      addedManifests.forEach(rewriteManifests::addManifest);
      commit(rewriteManifests);

      if (!snapshotIdInheritanceEnabled) {
        // the commit rewrote the staged manifests to assign snapshot IDs explicitly,
        // so the copies written by this action are no longer referenced and can be deleted
        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
      }
    } catch (CommitStateUnknownException commitStateUnknownException) {
      // don't clean up added manifest files, because they may have been successfully committed.
      throw commitStateUnknownException;
    } catch (Exception e) {
      // delete all new manifests because the rewrite failed
      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
      throw e;
    }
  }

  private void deleteFiles(Iterable<String> locations) {
    Tasks.foreach(locations)
        .executeWith(ThreadPools.getWorkerPool())
        .noRetry()
        .suppressFailureWhenFinished()
        .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
        .run(location -> table.io().deleteFile(location));
  }

  private static ManifestFile writeManifest(
      List<Row> rows,
      int startIndex,
      int endIndex,
      Broadcast<Table> tableBroadcast,
      String location,
      int format,
      Types.StructType combinedPartitionType,
      PartitionSpec spec,
      StructType sparkType)
      throws IOException {
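    // Rows carry (snapshot_id, sequence_number, file_sequence_number, data_file) in the order
    // projected by buildManifestEntryDF; every file is written back with "existing" status.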

    String manifestName = "optimized-m-" + UUID.randomUUID();
    Path manifestPath = new Path(location, manifestName);
    OutputFile outputFile =
        tableBroadcast
            .value()
            .io()
            .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));

    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);

    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);

    try {
      for (int index = startIndex; index < endIndex; index++) {
        Row row = rows.get(index);
        long snapshotId = row.getLong(0);
        long sequenceNumber = row.getLong(1);
        Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
        Row file = row.getStruct(3);
        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
      }
    } finally {
      writer.close();
    }

    return writer.toManifestFile();
  }

  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
      Broadcast<Table> tableBroadcast,
      long maxNumManifestEntries,
      String location,
      int format,
      Types.StructType combinedPartitionType,
      PartitionSpec spec,
      StructType sparkType) {
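    // Write the rows of one Spark partition into a single manifest, or split them into two
    // manifests when the partition holds more than maxNumManifestEntries rows.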

    return rows -> {
      List<Row> rowsAsList = Lists.newArrayList(rows);

      if (rowsAsList.isEmpty()) {
        return Collections.emptyIterator();
      }

      List<ManifestFile> manifests = Lists.newArrayList();
      if (rowsAsList.size() <= maxNumManifestEntries) {
        manifests.add(
            writeManifest(
                rowsAsList,
                0,
                rowsAsList.size(),
                tableBroadcast,
                location,
                format,
                combinedPartitionType,
                spec,
                sparkType));
      } else {
        int midIndex = rowsAsList.size() / 2;
        manifests.add(
            writeManifest(
                rowsAsList,
                0,
                midIndex,
                tableBroadcast,
                location,
                format,
                combinedPartitionType,
                spec,
                sparkType));
        manifests.add(
            writeManifest(
                rowsAsList,
                midIndex,
                rowsAsList.size(),
                tableBroadcast,
                location,
                format,
                combinedPartitionType,
                spec,
                sparkType));
      }

      return manifests.iterator();
    };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java [81:461]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
(fragment identical to the spark/v3.3/spark copy shown above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



