public void executeMigrate()

in paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java [157:277]


    /**
     * Migrates the iceberg table described by {@code icebergMetadata} into a newly created paimon
     * table.
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>Convert all iceberg schemas to paimon schemas, create the target database (if absent)
     *       and create the target table from the schema with id 0.
     *   <li>Commit the remaining schemas through the table's {@link SchemaManager}.
     *   <li>Read the manifest list of the current iceberg snapshot, keep the live manifest
     *       entries and tag every data file with the schema id of its manifest file.
     *   <li>Build the migrate tasks, run them on {@code executor} and commit the resulting
     *       commit messages to the paimon table.
     *   <li>If {@code deleteOriginTable} is set, delete the origin iceberg table.
     * </ol>
     *
     * <p>If the current snapshot has no live data files the method returns right after the schemas
     * were committed; in that case the origin table is not deleted.
     *
     * <p>On any failure after the paimon table was created, the table is dropped again. If the
     * failure happens while running the migrate tasks or while committing, every path recorded
     * in the rollback map is renamed back to its origin before the table is dropped.
     *
     * @throws RuntimeException if the migration fails after the paimon table was created; the
     *     cause carries the original failure, with rollback / drop failures attached as suppressed
     * @throws Exception if a precondition check, the database / table creation or the deletion of
     *     the origin table fails
     */
    public void executeMigrate() throws Exception {
        List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
        Preconditions.checkArgument(
                !paimonSchemas.isEmpty(),
                "paimon schemas transformed from iceberg table is empty.");
        Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);

        paimonCatalog.createDatabase(paimonDatabaseName, true);
        TableSchema firstSchema = paimonSchemas.get(0);
        Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0.");
        paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);

        // Set when this thread is interrupted during migration. The interrupt status is only
        // re-asserted after all cleanup has run, so that cleanup I/O is not disturbed by it.
        boolean interrupted = false;
        try {
            FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
            FileIO fileIO = paimonTable.fileIO();
            SchemaManager schemaManager = paimonTable.schemaManager();
            // commit all the iceberg schemas (schema 0 was already used to create the table)
            for (int i = 1; i < paimonSchemas.size(); i++) {
                LOG.info(
                        "commit new schema from iceberg, new schema id:{}",
                        paimonSchemas.get(i).id());
                schemaManager.commit(paimonSchemas.get(i));
            }

            IcebergManifestFile manifestFile =
                    IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
            IcebergManifestList manifestList =
                    IcebergManifestList.create(paimonTable, icebergMetaPathFactory);

            List<IcebergManifestFileMeta> icebergManifestFileMetas =
                    manifestList.read(icebergMetadata.currentSnapshot().manifestList());

            // check manifest file with 'DELETE' kind
            checkAndFilterManifestFiles(icebergManifestFileMetas);

            // group the live manifest entries by the schema id of the manifest file they come from
            Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
            for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
                long schemaId =
                        getSchemaIdFromIcebergManifestFile(
                                new Path(icebergManifestFileMeta.manifestPath()), fileIO);
                List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
                icebergEntries
                        .computeIfAbsent(schemaId, v -> new ArrayList<>())
                        .addAll(
                                entries.stream()
                                        .filter(IcebergManifestEntry::isLive)
                                        .collect(Collectors.toList()));
            }

            List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
            // write schema id to IcebergDataFileMeta
            for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
                icebergDataFileMetas.addAll(
                        kv.getValue().stream()
                                .map(entry -> entry.file().withSchemaId(kv.getKey()))
                                .collect(Collectors.toList()));
            }

            if (icebergDataFileMetas.isEmpty()) {
                LOG.info(
                        "No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.",
                        icebergMetadata.currentSnapshotId(),
                        icebergLatestMetadataLocation);
                return;
            }
            // Again, check if delete File exists
            checkAndFilterDataFiles(icebergDataFileMetas);

            LOG.info(
                    "Begin to create Migrate Task, the number of iceberg data files is {}",
                    icebergDataFileMetas.size());

            List<MigrateTask> tasks = new ArrayList<>();
            // new path -> origin path; handed to the migrate tasks and used below to undo renames
            Map<Path, Path> rollback = new ConcurrentHashMap<>();
            if (paimonTable.partitionKeys().isEmpty()) {
                tasks.add(importUnPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
            } else {
                tasks.addAll(importPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
            }

            List<Future<CommitMessage>> futures =
                    tasks.stream().map(executor::submit).collect(Collectors.toList());
            try {
                List<CommitMessage> commitMessages = new ArrayList<>();
                for (Future<CommitMessage> future : futures) {
                    commitMessages.add(future.get());
                }
                // The commit is part of the rollback-protected region: if it fails, the renamed
                // files must be moved back before the paimon table is dropped by the outer catch.
                try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
                    commit.commit(new ArrayList<>(commitMessages));
                    LOG.info(
                            "paimon commit success! Iceberg data files have been migrated to paimon.");
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    interrupted = true;
                }
                futures.forEach(f -> f.cancel(true));
                for (Future<?> future : futures) {
                    // wait all task cancelled or finished
                    while (!future.isDone()) {
                        try {
                            //noinspection BusyWait
                            Thread.sleep(100);
                        } catch (InterruptedException interruption) {
                            // Keep waiting: rolling back while a task is still renaming files
                            // would race with it. The interrupt is re-asserted in the finally.
                            interrupted = true;
                        }
                    }
                }

                RuntimeException failure =
                        new RuntimeException("Migrating failed because exception happens", e);
                // roll back all renamed path; keep going on errors so that one bad path does not
                // prevent the remaining files from being restored
                for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
                    Path newPath = entry.getKey();
                    Path origin = entry.getValue();
                    try {
                        if (fileIO.exists(newPath)) {
                            fileIO.rename(newPath, origin);
                        }
                    } catch (Exception rollbackFailure) {
                        failure.addSuppressed(rollbackFailure);
                    }
                }

                throw failure;
            }
        } catch (Exception e) {
            try {
                paimonCatalog.dropTable(paimonIdentifier, true);
            } catch (Exception dropFailure) {
                // do not let a failing cleanup hide the reason the migration failed
                e.addSuppressed(dropFailure);
            }
            throw new RuntimeException("Migrating failed", e);
        } finally {
            if (interrupted) {
                // restore the interrupt status that was consumed while migrating / cleaning up
                Thread.currentThread().interrupt();
            }
        }

        // if all success, drop the origin table according the delete field
        if (deleteOriginTable) {
            icebergMigrateMetadata.deleteOriginTable();
        }
    }