in flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java [88:170]
/**
 * Executes one planned rewrite group: streams every row out of the group's input data files
 * through a fresh {@code TaskWriter}, records the newly written files on the group, and emits an
 * {@code ExecutedGroup} for the downstream commit stage.
 *
 * <p>Failures do not fail the operator: any exception is logged, routed to
 * {@code TaskResultAggregator.ERROR_STREAM} as a side output, and counted via
 * {@code errorCounter}, so a single bad group is skipped rather than crashing the job.
 *
 * @param value the planned group: source table, files to rewrite, and commit batching info
 * @param ctx processing context; supplies the element timestamp and the error side output
 * @param out collector receiving the successfully executed group
 * @throws Exception declared by the Flink ProcessFunction contract; the rewrite and close paths
 *     below handle their own exceptions internally
 */
public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGroup> out)
throws Exception {
// Log the full file list only at debug level; at info, log just the count to keep logs compact.
// NOTE(review): MESSAGE_PREFIX evidently contains the placeholders consumed by
// tableName/taskName/taskIndex/ctx.timestamp() before the message's own {} args —
// confirm against DataFileRewritePlanner.
if (LOG.isDebugEnabled()) {
LOG.debug(
DataFileRewritePlanner.MESSAGE_PREFIX + "Rewriting files for group {} with files: {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group().info(),
value.group().rewrittenFiles());
} else {
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX
+ "Rewriting files for group {} with {} number of files",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group().info(),
value.group().rewrittenFiles().size());
}
// try-with-resources guarantees both the writer and the reader are closed on every path.
// Note: per Java semantics, the outer catch below also receives exceptions thrown by
// writer.close(), not only by writerFor(value).
try (TaskWriter<RowData> writer = writerFor(value)) {
try (DataIterator<RowData> iterator = readerFor(value)) {
// Copy every row from the group's input files into the new (compacted) output files.
while (iterator.hasNext()) {
writer.write(iterator.next());
}
// Record the freshly written files on the group before handing it downstream.
Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
value.group().setOutputFiles(dataFiles);
// Emit the executed group tagged with the table's current snapshot id — presumably the
// snapshot the rewrite was planned against, for validation at commit time (verify with
// the committer).
out.collect(
new ExecutedGroup(
value.table().currentSnapshot().snapshotId(),
value.groupsPerCommit(),
value.group()));
// Success logging mirrors the entry logging: full file sets at debug, counts at info.
if (LOG.isDebugEnabled()) {
LOG.debug(
DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group().info(),
value.group().rewrittenFiles(),
value.group().addedFiles());
} else {
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten {} files to {} files",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group().rewrittenFiles().size(),
value.group().addedFiles().size());
}
} catch (Exception ex) {
// Read/write failure for this group: report it on the error side output and abort the
// writer — presumably discarding partially written files (confirm in abort()).
// NOTE(review): logged at info level despite being a failure path — consider warn.
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group(),
ex);
ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
errorCounter.inc();
abort(writer, ctx.timestamp());
}
} catch (Exception ex) {
// Writer creation (or writer close) failed: same error routing, but there is no writer
// to abort. No ExecutedGroup is emitted for this group.
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX
+ "Exception creating compaction writer for group {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group(),
ex);
ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
errorCounter.inc();
}
}