public DataStream doOrphanClean()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java [90:324]


    /**
     * Builds the Flink pipeline that finds and cleans the orphan files of {@code table}.
     *
     * <p>The supplied environment is reconfigured for batch execution: sorted inputs, the batch
     * state backend and adaptive auto-parallelism are switched off, and the default parallelism
     * is overridden when {@code parallelism} is non-null. The topology then consists of four
     * parts:
     *
     * <ol>
     *   <li>for every branch returned by {@code validBranches()}, call {@code
     *       cleanBranchSnapshotDir} and sum the reported file count and byte length into a
     *       single {@link CleanOrphanFilesResult};
     *   <li>collect the names of files that are still in use: everything {@code
     *       collectWithoutDataFile} reports for each snapshot, plus the file name and extra
     *       files of every entry read from the manifests it reports;
     *   <li>list the candidate files through {@code listPaimonFilesForTable};
     *   <li>join both sides on the file name and hand every candidate whose name is not in use
     *       to {@code cleanFile}, counting the files and bytes handled that way.
     * </ol>
     *
     * <p>{@code validBranches()} is invoked eagerly while the pipeline is being assembled. This
     * method does not call {@code execute} on the environment itself.
     *
     * @param env the execution environment that is configured and on which the pipeline is built
     * @return the union of the join operator's result stream and the snapshot-directory cleanup
     *     result stream
     */
    public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
        // Apply the batch-mode settings to the caller's environment before any source is created.
        Configuration flinkConf = new Configuration();
        flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
        flinkConf.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        if (parallelism != null) {
            flinkConf.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        }
        // Flink 1.17 introduced this config, use string to keep compatibility
        flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", "false");
        env.configure(flinkConf);
        LOG.info("Starting orphan files clean for table {}", table.name());
        // Branch discovery runs here, at pipeline-construction time; its duration is logged.
        long start = System.currentTimeMillis();
        List<String> branches = validBranches();
        LOG.info(
                "End orphan files validBranches: spend [{}] ms",
                System.currentTimeMillis() - start);

        // snapshot and changelog files are the root of everything, so they are handled specially
        // here, and subsequently, we will not count their orphan files.
        DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
                env.fromCollection(branches)
                        .process(
                                new ProcessFunction<String, Tuple2<Long, Long>>() {
                                    @Override
                                    public void processElement(
                                            String branch,
                                            ProcessFunction<String, Tuple2<Long, Long>>.Context ctx,
                                            Collector<Tuple2<Long, Long>> out) {
                                        // The two callbacks accumulate, per branch, how many
                                        // files were reported and their total length in bytes.
                                        AtomicLong deletedFilesCount = new AtomicLong(0);
                                        AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
                                        cleanBranchSnapshotDir(
                                                branch,
                                                path -> deletedFilesCount.incrementAndGet(),
                                                deletedFilesLenInBytes::addAndGet);
                                        // One (count, bytes) tuple is emitted per branch.
                                        out.collect(
                                                new Tuple2<>(
                                                        deletedFilesCount.get(),
                                                        deletedFilesLenInBytes.get()));
                                    }
                                })
                        // Constant key: every per-branch tuple goes to the same reducer so the
                        // counts and sizes are summed into one total.
                        .keyBy(tuple -> 1)
                        .reduce(
                                (ReduceFunction<Tuple2<Long, Long>>)
                                        (value1, value2) ->
                                                new Tuple2<>(
                                                        value1.f0 + value2.f0,
                                                        value1.f1 + value2.f1))
                        .setParallelism(1)
                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));

        // branch and manifest file
        // Side output that carries the (branch, manifest) pairs produced by the manifest
        // consumer in the second process function below.
        final OutputTag<Tuple2<String, String>> manifestOutputTag =
                new OutputTag<Tuple2<String, String>>("manifest-output") {};

        // Main output: the strings that collectWithoutDataFile passes to out::collect.
        // Side output (manifestOutputTag): the manifests whose entries still have to be read.
        SingleOutputStreamOperator<String> usedManifestFiles =
                env.fromCollection(branches)
                        .process(
                                new ProcessFunction<String, Tuple2<String, String>>() {
                                    @Override
                                    public void processElement(
                                            String branch,
                                            ProcessFunction<String, Tuple2<String, String>>.Context
                                                    ctx,
                                            Collector<Tuple2<String, String>> out)
                                            throws Exception {
                                        // Emit one (branch, snapshot-as-JSON) record per snapshot
                                        // returned for the branch.
                                        for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
                                            out.collect(new Tuple2<>(branch, snapshot.toJson()));
                                        }
                                    }
                                })
                        .rebalance()
                        .process(
                                new ProcessFunction<Tuple2<String, String>, String>() {

                                    @Override
                                    public void processElement(
                                            Tuple2<String, String> branchAndSnapshot,
                                            ProcessFunction<Tuple2<String, String>, String>.Context
                                                    ctx,
                                            Collector<String> out)
                                            throws Exception {
                                        String branch = branchAndSnapshot.f0;
                                        // Rebuild the snapshot from the JSON produced upstream.
                                        Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
                                        // Manifests are tagged with their branch and routed to
                                        // the side output instead of the main output.
                                        Consumer<String> manifestConsumer =
                                                manifest -> {
                                                    Tuple2<String, String> tuple2 =
                                                            new Tuple2<>(branch, manifest);
                                                    ctx.output(manifestOutputTag, tuple2);
                                                };
                                        collectWithoutDataFile(
                                                branch, snapshot, out::collect, manifestConsumer);
                                    }
                                });

        // Read the entries of every reported manifest and emit the file names they reference.
        DataStream<String> usedFiles =
                usedManifestFiles
                        .getSideOutput(manifestOutputTag)
                        // Key by "branch:manifest" so equal pairs reach the same operator
                        // instance, where the set below de-duplicates them.
                        .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
                        .transform(
                                "datafile-reader",
                                STRING_TYPE_INFO,
                                new BoundedOneInputOperator<Tuple2<String, String>, String>() {

                                    // Distinct (branch, manifest) pairs seen by this operator;
                                    // they are only read once the input has ended.
                                    private final Set<Tuple2<String, String>> manifests =
                                            new HashSet<>();

                                    @Override
                                    public void processElement(
                                            StreamRecord<Tuple2<String, String>> element) {
                                        manifests.add(element.getValue());
                                    }

                                    @Override
                                    public void endInput() throws IOException {
                                        // One ManifestFile reader is created per branch and then
                                        // reused for all manifests of that branch.
                                        Map<String, ManifestFile> branchManifests = new HashMap<>();
                                        for (Tuple2<String, String> tuple2 : manifests) {
                                            ManifestFile manifestFile =
                                                    branchManifests.computeIfAbsent(
                                                            tuple2.f0,
                                                            key ->
                                                                    table.switchToBranch(key)
                                                                            .store()
                                                                            .manifestFileFactory()
                                                                            .create());
                                            // The empty list is passed as the second argument of
                                            // retryReadingFiles - presumably the fallback result
                                            // when the manifest cannot be read; confirm there.
                                            retryReadingFiles(
                                                            () ->
                                                                    manifestFile
                                                                            .readWithIOException(
                                                                                    tuple2.f1),
                                                            Collections.<ManifestEntry>emptyList())
                                                    .forEach(
                                                            f -> {
                                                                // Emit the entry's own file name
                                                                // followed by its extra files.
                                                                List<String> files =
                                                                        new ArrayList<>();
                                                                files.add(f.fileName());
                                                                files.addAll(f.file().extraFiles());
                                                                files.forEach(
                                                                        file ->
                                                                                output.collect(
                                                                                        new StreamRecord<>(
                                                                                                file)));
                                                            });
                                        }
                                    }
                                });

        // Used files = names read from manifests + names reported directly per snapshot.
        usedFiles = usedFiles.union(usedManifestFiles);
        // A single dummy element triggers exactly one listing call, at parallelism 1.
        DataStream<Tuple2<String, Long>> candidates =
                env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
                        .process(
                                new ProcessFunction<Integer, Tuple2<String, Long>>() {
                                    @Override
                                    public void processElement(
                                            Integer i,
                                            ProcessFunction<Integer, Tuple2<String, Long>>.Context
                                                    ctx,
                                            Collector<Tuple2<String, Long>> out) {
                                        // Emits (path, length) tuples for the candidate files.
                                        listPaimonFilesForTable(out);
                                    }
                                })
                        .setParallelism(1);

        // Join on the file name: the first input carries used file names (build side), the
        // second input carries candidate paths keyed by their last path component (probe side).
        DataStream<CleanOrphanFilesResult> deleted =
                usedFiles
                        .keyBy(f -> f)
                        .connect(
                                candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
                        .transform(
                                "files_join",
                                TypeInformation.of(CleanOrphanFilesResult.class),
                                new BoundedTwoInputOperator<
                                        String, Tuple2<String, Long>, CleanOrphanFilesResult>() {

                                    // True once the first (used files) input has ended.
                                    private boolean buildEnd;
                                    // Number and total length of the candidates passed to
                                    // cleanFile by this operator.
                                    private long emittedFilesCount;
                                    private long emittedFilesLen;

                                    // Names of all used files received on the first input.
                                    private final Set<String> used = new HashSet<>();

                                    @Override
                                    public InputSelection nextSelection() {
                                        // Read only the first input until it has ended, then
                                        // only the second, so the used set is complete before
                                        // any candidate is checked against it.
                                        return buildEnd
                                                ? InputSelection.SECOND
                                                : InputSelection.FIRST;
                                    }

                                    @Override
                                    public void endInput(int inputId) {
                                        switch (inputId) {
                                            case 1:
                                                checkState(!buildEnd, "Should not build ended.");
                                                LOG.info("Finish build phase.");
                                                buildEnd = true;
                                                break;
                                            case 2:
                                                checkState(buildEnd, "Should build ended.");
                                                LOG.info("Finish probe phase.");
                                                LOG.info(
                                                        "Clean files count : {}",
                                                        emittedFilesCount);
                                                LOG.info("Clean files size : {}", emittedFilesLen);
                                                // Emit one summary record with the accumulated
                                                // totals when the probe side has ended.
                                                output.collect(
                                                        new StreamRecord<>(
                                                                new CleanOrphanFilesResult(
                                                                        emittedFilesCount,
                                                                        emittedFilesLen)));
                                                break;
                                        }
                                    }

                                    @Override
                                    public void processElement1(StreamRecord<String> element) {
                                        used.add(element.getValue());
                                    }

                                    @Override
                                    public void processElement2(
                                            StreamRecord<Tuple2<String, Long>> element) {
                                        checkState(buildEnd, "Should build ended.");
                                        Tuple2<String, Long> fileInfo = element.getValue();
                                        String value = fileInfo.f0;
                                        Path path = new Path(value);
                                        // A candidate is an orphan when its file name was never
                                        // reported as used; only the name is compared, not the
                                        // full path.
                                        if (!used.contains(path.getName())) {
                                            emittedFilesCount++;
                                            emittedFilesLen += fileInfo.f1;
                                            cleanFile(path);
                                            // NOTE(review): this message says "Dry clean" for
                                            // every orphan, right after cleanFile was called.
                                            // Whether cleanFile deletes or only simulates is not
                                            // visible here - confirm the wording is intended.
                                            LOG.info("Dry clean: {}", path);
                                        }
                                    }
                                });
        // Add the snapshot-directory cleanup result to the join result.
        deleted = deleted.union(branchSnapshotDirDeleted);

        return deleted;
    }