in flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java [95:175]
/**
 * Plans the data file rewrite groups for the current table state and emits one {@link
 * PlannedGroup} per selected {@link RewriteFileGroup}.
 *
 * <p>Groups are accepted in planner order until accepting the next group would push the total
 * input size past {@code maxRewriteBytes}; oversized groups are skipped but iteration continues,
 * since a later, smaller group may still fit. Any failure is reported on the {@link
 * TaskResultAggregator#ERROR_STREAM} side output (and counted) rather than failing the operator.
 *
 * @param value the trigger message (content unused; its arrival drives planning)
 * @param ctx processing context, used for the event timestamp and side output
 * @param out collector receiving the planned groups
 * @throws Exception never thrown directly; planning errors are routed to the error stream
 */
public void processElement(Trigger value, Context ctx, Collector<PlannedGroup> out)
    throws Exception {
  LOG.info(
      DataFileRewritePlanner.MESSAGE_PREFIX + "Creating rewrite plan",
      tableName,
      taskName,
      taskIndex,
      ctx.timestamp());
  try {
    // Snapshot the table into a serializable form so downstream operators can use it.
    SerializableTable table =
        (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
    if (table.currentSnapshot() == null) {
      // Empty table: nothing to rewrite.
      LOG.info(
          DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp());
      return;
    }

    BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table);
    planner.init(rewriterOptions);

    FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
        plan = planner.plan();

    long rewriteBytes = 0;
    List<RewriteFileGroup> groups = Lists.newArrayList();
    // FIX: the planner's group iterator is a CloseableIterator backed by scan resources; the
    // original loop never closed it. try-with-resources guarantees release on every exit path.
    try (CloseableIterator<RewriteFileGroup> groupIterator = plan.groups().iterator()) {
      while (groupIterator.hasNext()) {
        RewriteFileGroup group = groupIterator.next();
        if (rewriteBytes + group.inputFilesSizeInBytes() > maxRewriteBytes) {
          // Keep going, maybe some other group might fit in
          LOG.info(
              DataFileRewritePlanner.MESSAGE_PREFIX
                  + "Skipping group as max rewrite size reached {}",
              tableName,
              taskName,
              taskIndex,
              ctx.timestamp(),
              group);
        } else {
          rewriteBytes += group.inputFilesSizeInBytes();
          groups.add(group);
        }
      }
    }

    // Spread the accepted groups evenly across the configured number of partial-progress
    // commits, rounding up so every group is assigned to some commit.
    int groupsPerCommit =
        IntMath.divide(groups.size(), partialProgressMaxCommits, RoundingMode.CEILING);

    LOG.info(
        DataFileRewritePlanner.MESSAGE_PREFIX + "Rewrite plan created {}",
        tableName,
        taskName,
        taskIndex,
        ctx.timestamp(),
        groups);

    for (RewriteFileGroup group : groups) {
      LOG.info(
          DataFileRewritePlanner.MESSAGE_PREFIX + "Emitting {}",
          tableName,
          taskName,
          taskIndex,
          ctx.timestamp(),
          group);
      out.collect(new PlannedGroup(table, groupsPerCommit, group));
    }
  } catch (Exception e) {
    // Planning failures are reported to the error side output instead of failing the job;
    // the cause is preserved both in the log and on the error stream.
    LOG.warn(
        DataFileRewritePlanner.MESSAGE_PREFIX + "Failed to plan data file rewrite groups",
        tableName,
        taskName,
        taskIndex,
        ctx.timestamp(),
        e);
    ctx.output(TaskResultAggregator.ERROR_STREAM, e);
    errorCounter.inc();
  }
}