spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java [77:178]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exception {
    TaskContext context = TaskContext.get();
    int partitionId = context.partitionId();
    long taskId = context.taskAttemptId();

    Table table = tableBroadcast.value();
    Schema schema = table.schema();
    Map<String, String> properties = table.properties();

    RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);

    StructType structType = SparkSchemaUtil.convert(schema);
    SparkAppenderFactory appenderFactory =
        SparkAppenderFactory.builderFor(table, schema, structType).spec(spec).build();
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, partitionId, taskId)
            .defaultSpec(spec)
            .format(format)
            .build();

    // Pick a writer for the task: unpartitioned tables get a single-file writer; partitioned
    // tables get a fanout writer (one open file per partition key) when the table property
    // enables it, and otherwise a clustered writer that expects rows grouped by partition.
    TaskWriter<InternalRow> writer;
    if (spec.isUnpartitioned()) {
      writer =
          new UnpartitionedWriter<>(
              spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE);
    } else if (PropertyUtil.propertyAsBoolean(
        properties,
        TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
        TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
      writer =
          new SparkPartitionedFanoutWriter(
              spec,
              format,
              appenderFactory,
              fileFactory,
              table.io(),
              Long.MAX_VALUE,
              schema,
              structType);
    } else {
      writer =
          new SparkPartitionedWriter(
              spec,
              format,
              appenderFactory,
              fileFactory,
              table.io(),
              Long.MAX_VALUE,
              schema,
              structType);
    }

    try {
      while (dataReader.next()) {
        InternalRow row = dataReader.get();
        writer.write(row);
      }

      dataReader.close();
      dataReader = null;

      writer.close();
      return Lists.newArrayList(writer.dataFiles());

    } catch (Throwable originalThrowable) {
      // Best-effort cleanup: mark the task failed, close the reader if it is still open, and
      // abort the writer so partially written data files are deleted before rethrowing.
      try {
        LOG.error("Aborting task", originalThrowable);
        context.markTaskFailed(originalThrowable);

        LOG.error(
            "Aborting commit for partition {} (task {}, attempt {}, stage {}.{})",
            partitionId,
            taskId,
            context.attemptNumber(),
            context.stageId(),
            context.stageAttemptNumber());
        if (dataReader != null) {
          dataReader.close();
        }
        writer.abort();
        LOG.error(
            "Aborted commit for partition {} (task {}, attempt {}, stage {}.{})",
            partitionId,
            taskId,
            context.attemptNumber(),
            context.stageId(),
            context.stageAttemptNumber());

      } catch (Throwable inner) {
        if (originalThrowable != inner) {
          originalThrowable.addSuppressed(inner);
          LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner);
        }
      }

      if (originalThrowable instanceof Exception) {
        throw originalThrowable;
      } else {
        throw new RuntimeException(originalThrowable);
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
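
The writer selection above hinges on a single table property. As a minimal sketch (assuming an open org.apache.iceberg.Table handle named table, which is not defined in the excerpt), fanout writes can be enabled so that rewriteDataForTask picks SparkPartitionedFanoutWriter for partitioned tables:

    // Sketch, not part of the excerpt: enable fanout writes (disabled by default).
    // "table" is an assumed Table handle; TableProperties is org.apache.iceberg.TableProperties.
    table
        .updateProperties()
        .set(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
        .commit();

With the property left at its default, SparkPartitionedWriter keeps only one file open at a time, so incoming rows must already be clustered by partition.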



spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java [77:178]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exception {
    TaskContext context = TaskContext.get();
    int partitionId = context.partitionId();
    long taskId = context.taskAttemptId();

    Table table = tableBroadcast.value();
    Schema schema = table.schema();
    Map<String, String> properties = table.properties();

    RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);

    StructType structType = SparkSchemaUtil.convert(schema);
    SparkAppenderFactory appenderFactory =
        SparkAppenderFactory.builderFor(table, schema, structType).spec(spec).build();
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, partitionId, taskId)
            .defaultSpec(spec)
            .format(format)
            .build();

    // Pick a writer for the task: unpartitioned tables get a single-file writer; partitioned
    // tables get a fanout writer (one open file per partition key) when the table property
    // enables it, and otherwise a clustered writer that expects rows grouped by partition.
    TaskWriter<InternalRow> writer;
    if (spec.isUnpartitioned()) {
      writer =
          new UnpartitionedWriter<>(
              spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE);
    } else if (PropertyUtil.propertyAsBoolean(
        properties,
        TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
        TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
      writer =
          new SparkPartitionedFanoutWriter(
              spec,
              format,
              appenderFactory,
              fileFactory,
              table.io(),
              Long.MAX_VALUE,
              schema,
              structType);
    } else {
      writer =
          new SparkPartitionedWriter(
              spec,
              format,
              appenderFactory,
              fileFactory,
              table.io(),
              Long.MAX_VALUE,
              schema,
              structType);
    }

    try {
      while (dataReader.next()) {
        InternalRow row = dataReader.get();
        writer.write(row);
      }

      dataReader.close();
      dataReader = null;

      writer.close();
      return Lists.newArrayList(writer.dataFiles());

    } catch (Throwable originalThrowable) {
      // Best-effort cleanup: mark the task failed, close the reader if it is still open, and
      // abort the writer so partially written data files are deleted before rethrowing.
      try {
        LOG.error("Aborting task", originalThrowable);
        context.markTaskFailed(originalThrowable);

        LOG.error(
            "Aborting commit for partition {} (task {}, attempt {}, stage {}.{})",
            partitionId,
            taskId,
            context.attemptNumber(),
            context.stageId(),
            context.stageAttemptNumber());
        if (dataReader != null) {
          dataReader.close();
        }
        writer.abort();
        LOG.error(
            "Aborted commit for partition {} (task {}, attempt {}, stage {}.{})",
            partitionId,
            taskId,
            context.attemptNumber(),
            context.stageId(),
            context.stageAttemptNumber());

      } catch (Throwable inner) {
        if (originalThrowable != inner) {
          originalThrowable.addSuppressed(inner);
          LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), inner);
        }
      }

      if (originalThrowable instanceof Exception) {
        throw originalThrowable;
      } else {
        throw new RuntimeException(originalThrowable);
      }
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
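
In both excerpts rewriteDataForTask is private, so a public caller must fan it out over the scan tasks and gather the results. A minimal sketch of such a driver, assuming a JavaRDD<CombinedScanTask> of the splits to rewrite (the real entry point in RowDataRewriter may differ in name and shape):

    // Sketch, not part of the excerpts: run rewriteDataForTask on each executor and collect
    // the produced DataFiles on the driver. Assumes imports of java.util.Collection,
    // java.util.List, java.util.stream.Collectors, and org.apache.spark.api.java.JavaRDD,
    // and that the enclosing class is serializable so Spark can ship it to executors.
    public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
      JavaRDD<List<DataFile>> dataFilesRDD = taskRDD.map(this::rewriteDataForTask);
      return dataFilesRDD.collect().stream()
          .flatMap(Collection::stream)
          .collect(Collectors.toList());
    }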



