public void processElement()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java [92:240]


    public void processElement(
            Tuple2<String, String> tuple,
            ProcessFunction<Tuple2<String, String>, Void>.Context context,
            Collector<Void> collector)
            throws Exception {
        String sourceIdentifierStr = tuple.f0;
        Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
        String targetIdentifierStr = tuple.f1;
        Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);

        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);

        // 1. create target table
        targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
        targetCatalog.createTable(
                targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);
        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);

        // 2. copy all schema files
        SchemaManager sourceSchemaManager = sourceTable.schemaManager();
        SchemaManager targetSchemaManager = targetTable.schemaManager();
        FileIO sourceTableFileIO = sourceTable.fileIO();
        FileIO targetTableFileIO = targetTable.fileIO();
        for (long schemaId : sourceSchemaManager.listAllIds()) {
            IOUtils.copyBytes(
                    sourceTableFileIO.newInputStream(sourceSchemaManager.toSchemaPath(schemaId)),
                    targetTableFileIO.newOutputStream(
                            targetSchemaManager.toSchemaPath(schemaId), true));
        }

        // 3. copy latest snapshot files
        FileStore<?> sourceStore = sourceTable.store();
        FileStore<?> targetStore = targetTable.store();
        SnapshotManager sourceSnapshotManager = sourceStore.snapshotManager();
        SnapshotManager targetSnapshotManager = targetStore.snapshotManager();
        Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
        if (latestSnapshot != null) {
            long snapshotId = latestSnapshot.id();
            IOUtils.copyBytes(
                    sourceTableFileIO.newInputStream(
                            sourceSnapshotManager.snapshotPath(snapshotId)),
                    targetTableFileIO.newOutputStream(
                            targetSnapshotManager.snapshotPath(snapshotId), true));
        }

        FileStorePathFactory sourcePathFactory = sourceStore.pathFactory();
        FileStorePathFactory targetPathFactory = targetStore.pathFactory();
        // 4. copy manifest list files
        if (latestSnapshot != null) {
            IOUtils.copyBytes(
                    sourceTableFileIO.newInputStream(
                            sourcePathFactory.toManifestListPath(
                                    latestSnapshot.baseManifestList())),
                    targetTableFileIO.newOutputStream(
                            targetPathFactory.toManifestListPath(latestSnapshot.baseManifestList()),
                            true));

            IOUtils.copyBytes(
                    sourceTableFileIO.newInputStream(
                            sourcePathFactory.toManifestListPath(
                                    latestSnapshot.deltaManifestList())),
                    targetTableFileIO.newOutputStream(
                            targetPathFactory.toManifestListPath(
                                    latestSnapshot.deltaManifestList()),
                            true));

            String changelogManifestList = latestSnapshot.changelogManifestList();
            if (changelogManifestList != null) {
                IOUtils.copyBytes(
                        sourceTableFileIO.newInputStream(
                                sourcePathFactory.toManifestListPath(changelogManifestList)),
                        targetTableFileIO.newOutputStream(
                                targetPathFactory.toManifestListPath(changelogManifestList), true));
            }
        }

        // 5. copy index manifest files
        List<CopyFileInfo> indexFiles = new ArrayList<>();
        if (latestSnapshot != null) {
            IndexFileHandler indexFileHandler = sourceStore.newIndexFileHandler();
            String indexManifest = latestSnapshot.indexManifest();
            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
                IOUtils.copyBytes(
                        sourceTableFileIO.newInputStream(
                                sourcePathFactory.indexManifestFileFactory().toPath(indexManifest)),
                        targetTableFileIO.newOutputStream(
                                targetPathFactory.indexManifestFileFactory().toPath(indexManifest),
                                true));

                // read index files
                List<IndexManifestEntry> indexManifestEntries =
                        CopyFilesUtil.retryReadingFiles(
                                () -> indexFileHandler.readManifestWithIOException(indexManifest));

                List<Path> indexFileList = new ArrayList<>();
                if (indexManifestEntries != null) {
                    indexManifestEntries.stream()
                            .map(IndexManifestEntry::indexFile)
                            .map(indexFileHandler::filePath)
                            .forEach(indexFileList::add);
                }

                indexFiles =
                        CopyFilesUtil.toCopyFileInfos(
                                indexFileList,
                                sourceTable.location(),
                                sourceIdentifierStr,
                                targetIdentifierStr);
                for (CopyFileInfo info : indexFiles) {
                    context.output(INDEX_FILES_TAG, info);
                }
            }
        }

        // 6. copy statistics file
        if (latestSnapshot != null && latestSnapshot.statistics() != null) {
            IOUtils.copyBytes(
                    sourceTableFileIO.newInputStream(
                            sourcePathFactory
                                    .statsFileFactory()
                                    .toPath(latestSnapshot.statistics())),
                    targetTableFileIO.newOutputStream(
                            targetPathFactory
                                    .statsFileFactory()
                                    .toPath(latestSnapshot.statistics()),
                            true));
        }

        // pick manifest files
        List<CopyFileInfo> dataManifestFiles = new ArrayList<>();
        if (latestSnapshot != null) {
            List<Path> list =
                    CopyFilesUtil.getManifestUsedFilesForSnapshot(sourceTable, latestSnapshot.id());
            dataManifestFiles =
                    CopyFilesUtil.toCopyFileInfos(
                            list, sourceTable.location(), sourceIdentifierStr, targetIdentifierStr);
        }

        for (CopyFileInfo info : dataManifestFiles) {
            context.output(DATA_MANIFEST_FILES_TAG, info);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "The CopyFileInfo of table {} is: indexFiles={}, dataManifestFiles={}",
                    sourceTable.location(),
                    indexFiles,
                    dataManifestFiles);
        }
    }