public void processElement()

in flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java [88:170]


  public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out)
      throws Exception {
    // Rewrites a single planned file group: every row produced by the group's reader is copied
    // into a fresh writer, the files produced by the writer are recorded on the group, and the
    // finished group is emitted downstream. Failures are not rethrown from the catch clauses
    // below; they are forwarded to TaskResultAggregator.ERROR_STREAM and counted in errorCounter.

    // At debug level the full list of files to rewrite is logged, otherwise only their count.
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          DataFileRewritePlanner.MESSAGE_PREFIX + "Rewriting files for group {} with files: {}",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp(),
          value.group().info(),
          value.group().rewrittenFiles());
    } else {
      LOG.info(
          DataFileRewritePlanner.MESSAGE_PREFIX
              + "Rewriting files for group {} with {} number of files",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp(),
          value.group().info(),
          value.group().rewrittenFiles().size());
    }

    try (TaskWriter<RowData> writer = writerFor(value)) {
      try (DataIterator<RowData> iterator = readerFor(value)) {
        // Copy every row of the group from the reader into the writer.
        while (iterator.hasNext()) {
          writer.write(iterator.next());
        }

        // Record the files produced by the writer as the output of the group, then emit the
        // group together with the table's current snapshot id and the groups-per-commit value.
        Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
        value.group().setOutputFiles(dataFiles);
        out.collect(
            new ExecutedGroup(
                value.table().currentSnapshot().snapshotId(),
                value.groupsPerCommit(),
                value.group()));
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
              tableName,
              taskName,
              taskIndex,
              ctx.timestamp(),
              value.group().info(),
              value.group().rewrittenFiles(),
              value.group().addedFiles());
        } else {
          LOG.info(
              DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten {} files to {} files",
              tableName,
              taskName,
              taskIndex,
              ctx.timestamp(),
              value.group().rewrittenFiles().size(),
              value.group().addedFiles().size());
        }
      } catch (Exception ex) {
        // This clause covers creating the reader, copying the rows, emitting the result and
        // closing the reader. The writer already exists at this point, so it is aborted after
        // the failure has been reported.
        // Logged at WARN (not INFO) so a failed rewrite remains visible when INFO is filtered out.
        LOG.warn(
            DataFileRewritePlanner.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
            tableName,
            taskName,
            taskIndex,
            ctx.timestamp(),
            value.group(),
            ex);
        ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
        errorCounter.inc();
        abort(writer, ctx.timestamp());
      }
    } catch (Exception ex) {
      // This clause covers a failure of writerFor(value). Because of try-with-resources it is
      // also reached when closing the writer throws, even though the message only mentions
      // writer creation. There is no usable writer to abort here.
      // Logged at WARN (not INFO) so the failure remains visible when INFO is filtered out.
      LOG.warn(
          DataFileRewritePlanner.MESSAGE_PREFIX
              + "Exception creating compaction writer for group {}",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp(),
          value.group(),
          ex);
      ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
      errorCounter.inc();
    }
  }