public void processElement()

in flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java [95:175]


  public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> out)
      throws Exception {
    LOG.info(
        DataFileRewritePlanner.MESSAGE_PREFIX + "Creating rewrite plan",
        tableName,
        taskName,
        taskIndex,
        ctx.timestamp());
    try {
      SerializableTable table =
          (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
      if (table.currentSnapshot() == null) {
        LOG.info(
            DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
            tableName,
            taskName,
            taskIndex,
            ctx.timestamp());
        return;
      }

      BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table);
      planner.init(rewriterOptions);

      FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
          plan = planner.plan();

      long rewriteBytes = 0;
      List<RewriteFileGroup> groups = Lists.newArrayList();
      for (CloseableIterator<RewriteFileGroup> groupIterator = plan.groups().iterator();
          groupIterator.hasNext(); ) {
        RewriteFileGroup group = groupIterator.next();
        if (rewriteBytes + group.inputFilesSizeInBytes() > maxRewriteBytes) {
          // Keep going, maybe some other group might fit in
          LOG.info(
              DataFileRewritePlanner.MESSAGE_PREFIX
                  + "Skipping group as max rewrite size reached {}",
              tableName,
              taskName,
              taskIndex,
              ctx.timestamp(),
              group);
        } else {
          rewriteBytes += group.inputFilesSizeInBytes();
          groups.add(group);
        }
      }

      int groupsPerCommit =
          IntMath.divide(groups.size(), partialProgressMaxCommits, RoundingMode.CEILING);

      LOG.info(
          DataFileRewritePlanner.MESSAGE_PREFIX + "Rewrite plan created {}",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp(),
          groups);

      for (RewriteFileGroup group : groups) {
        LOG.info(
            DataFileRewritePlanner.MESSAGE_PREFIX + "Emitting {}",
            tableName,
            taskName,
            taskIndex,
            ctx.timestamp(),
            group);
        out.collect(new PlannedGroup(table, groupsPerCommit, group));
      }
    } catch (Exception e) {
      LOG.warn(
          DataFileRewritePlanner.MESSAGE_PREFIX + "Failed to plan data file rewrite groups",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp(),
          e);
      ctx.output(TaskResultAggregator.ERROR_STREAM, e);
      errorCounter.inc();
    }
  }