in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java [90:324]
public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
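// Run the clean as a bounded batch job with a fixed parallelism; input sorting,
// the batch state backend and adaptive auto-parallelism are disabled.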
Configuration flinkConf = new Configuration();
flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
flinkConf.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
if (parallelism != null) {
flinkConf.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
}
// Flink 1.17 introduced this config; set it by its string key to stay compatible with older Flink versions
flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", "false");
env.configure(flinkConf);
LOG.info("Starting orphan files clean for table {}", table.name());
long start = System.currentTimeMillis();
List<String> branches = validBranches();
LOG.info(
"Listed valid branches for orphan files clean in [{}] ms",
System.currentTimeMillis() - start);
// Snapshot and changelog files are the root of everything, so they are handled
// specially here; their orphan files are not counted again in the steps below.
DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
env.fromCollection(branches)
.process(
new ProcessFunction<String, Tuple2<Long, Long>>() {
@Override
public void processElement(
String branch,
ProcessFunction<String, Tuple2<Long, Long>>.Context ctx,
Collector<Tuple2<Long, Long>> out) {
AtomicLong deletedFilesCount = new AtomicLong(0);
AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
cleanBranchSnapshotDir(
branch,
path -> deletedFilesCount.incrementAndGet(),
deletedFilesLenInBytes::addAndGet);
out.collect(
new Tuple2<>(
deletedFilesCount.get(),
deletedFilesLenInBytes.get()));
}
})
.keyBy(tuple -> 1)
.reduce(
(ReduceFunction<Tuple2<Long, Long>>)
(value1, value2) ->
new Tuple2<>(
value1.f0 + value2.f0,
value1.f1 + value2.f1))
.setParallelism(1)
.map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
// For every branch, traverse all snapshots: used meta files go to the main output,
// while (branch, manifest) pairs go to a side output for data-file extraction.
final OutputTag<Tuple2<String, String>> manifestOutputTag =
new OutputTag<Tuple2<String, String>>("manifest-output") {};
SingleOutputStreamOperator<String> usedManifestFiles =
env.fromCollection(branches)
.process(
new ProcessFunction<String, Tuple2<String, String>>() {
@Override
public void processElement(
String branch,
ProcessFunction<String, Tuple2<String, String>>.Context
ctx,
Collector<Tuple2<String, String>> out)
throws Exception {
for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
out.collect(new Tuple2<>(branch, snapshot.toJson()));
}
}
})
.rebalance()
.process(
new ProcessFunction<Tuple2<String, String>, String>() {
@Override
public void processElement(
Tuple2<String, String> branchAndSnapshot,
ProcessFunction<Tuple2<String, String>, String>.Context
ctx,
Collector<String> out)
throws Exception {
String branch = branchAndSnapshot.f0;
Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
Consumer<String> manifestConsumer =
manifest -> {
Tuple2<String, String> tuple2 =
new Tuple2<>(branch, manifest);
ctx.output(manifestOutputTag, tuple2);
};
collectWithoutDataFile(
branch, snapshot, out::collect, manifestConsumer);
}
});
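// Route the (branch, manifest) pairs by key, de-duplicate them, then read each
// manifest and emit every data file name (plus extra files) it references as a used file.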
DataStream<String> usedFiles =
usedManifestFiles
.getSideOutput(manifestOutputTag)
.keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
.transform(
"datafile-reader",
STRING_TYPE_INFO,
new BoundedOneInputOperator<Tuple2<String, String>, String>() {
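// Buffer the (branch, manifest) pairs and read them only once all input has arrived.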
private final Set<Tuple2<String, String>> manifests =
new HashSet<>();
@Override
public void processElement(
StreamRecord<Tuple2<String, String>> element) {
manifests.add(element.getValue());
}
@Override
public void endInput() throws IOException {
Map<String, ManifestFile> branchManifests = new HashMap<>();
for (Tuple2<String, String> tuple2 : manifests) {
ManifestFile manifestFile =
branchManifests.computeIfAbsent(
tuple2.f0,
key ->
table.switchToBranch(key)
.store()
.manifestFileFactory()
.create());
retryReadingFiles(
() ->
manifestFile
.readWithIOException(
tuple2.f1),
Collections.<ManifestEntry>emptyList())
.forEach(
f -> {
List<String> files =
new ArrayList<>();
files.add(f.fileName());
files.addAll(f.file().extraFiles());
files.forEach(
file ->
output.collect(
new StreamRecord<>(
file)));
});
}
}
});
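// The complete set of used files: meta files collected while traversing snapshots
// plus the data files extracted from manifests above.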
usedFiles = usedFiles.union(usedManifestFiles);
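// Candidate files: every (path, size) found when listing the table's directories,
// produced by a single-parallelism source.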
DataStream<Tuple2<String, Long>> candidates =
env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
.process(
new ProcessFunction<Integer, Tuple2<String, Long>>() {
@Override
public void processElement(
Integer i,
ProcessFunction<Integer, Tuple2<String, Long>>.Context
ctx,
Collector<Tuple2<String, Long>> out) {
listPaimonFilesForTable(out);
}
})
.setParallelism(1);
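// Join candidates against used files by file name: first build a set of used names
// (input 1), then probe each candidate (input 2) and clean the ones never referenced.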
DataStream<CleanOrphanFilesResult> deleted =
usedFiles
.keyBy(f -> f)
.connect(
candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
.transform(
"files_join",
TypeInformation.of(CleanOrphanFilesResult.class),
new BoundedTwoInputOperator<
String, Tuple2<String, Long>, CleanOrphanFilesResult>() {
private boolean buildEnd;
private long emittedFilesCount;
private long emittedFilesLen;
private final Set<String> used = new HashSet<>();
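// Consume all used file names (input 1) before probing any candidate (input 2).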
@Override
public InputSelection nextSelection() {
return buildEnd
? InputSelection.SECOND
: InputSelection.FIRST;
}
@Override
public void endInput(int inputId) {
switch (inputId) {
case 1:
checkState(!buildEnd, "Build phase should not have ended.");
LOG.info("Finish build phase.");
buildEnd = true;
break;
case 2:
checkState(buildEnd, "Build phase should have ended.");
LOG.info("Finish probe phase.");
LOG.info("Cleaned files count: {}", emittedFilesCount);
LOG.info("Cleaned files length in bytes: {}", emittedFilesLen);
output.collect(
new StreamRecord<>(
new CleanOrphanFilesResult(
emittedFilesCount,
emittedFilesLen)));
break;
}
}
@Override
public void processElement1(StreamRecord<String> element) {
used.add(element.getValue());
}
@Override
public void processElement2(
StreamRecord<Tuple2<String, Long>> element) {
checkState(buildEnd, "Build phase should have ended.");
Tuple2<String, Long> fileInfo = element.getValue();
String value = fileInfo.f0;
Path path = new Path(value);
if (!used.contains(path.getName())) {
emittedFilesCount++;
emittedFilesLen += fileInfo.f1;
cleanFile(path);
LOG.info("Dry clean: {}", path);
}
}
});
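// Also report the snapshot/changelog files already removed from the branch directories.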
deleted = deleted.union(branchSnapshotDirDeleted);
return deleted;
}