in paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java [157:277]
/**
 * Migrates the current snapshot of the iceberg table into a newly created paimon table.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Convert the iceberg schemas to paimon schemas, create the paimon database (if absent)
 *       and the paimon table from the schema with id 0, then commit the remaining schemas.
 *   <li>Read the manifest list of the current iceberg snapshot, keep only the live manifest
 *       entries and tag every data file with the schema id of the manifest it came from.
 *   <li>Build the migrate tasks, run them on {@code executor} and collect their commit messages.
 *   <li>Commit the collected messages to the paimon table.
 *   <li>Delete the origin table when {@code deleteOriginTable} is set.
 * </ol>
 *
 * <p>If no live data file is found the method returns right after logging; the created paimon
 * table is kept and the origin table is not deleted.
 *
 * <p>Failure handling: when a task or the final commit fails, every path recorded in the
 * rollback map is moved back to its original location (best effort), the paimon table is
 * dropped and the failure is rethrown wrapped in a {@link RuntimeException}. Secondary failures
 * raised during cleanup are attached as suppressed exceptions instead of replacing the root
 * cause. If the calling thread was interrupted, its interrupt status is restored before the
 * exception is thrown.
 *
 * @throws RuntimeException if migrating fails after the paimon table has been created
 * @throws Exception if schema conversion, a precondition check, or creating the paimon database
 *     or table fails
 */
public void executeMigrate() throws Exception {
    List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
    Preconditions.checkArgument(
            !paimonSchemas.isEmpty(),
            "paimon schemas transformed from iceberg table is empty.");
    Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);

    paimonCatalog.createDatabase(paimonDatabaseName, true);
    TableSchema firstSchema = paimonSchemas.get(0);
    Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0.");
    // createTable is deliberately outside the try block below: if it fails, the dropTable
    // cleanup is not executed for a table this call did not create.
    paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);

    // Remembers that an InterruptedException was consumed, so the interrupt status can be
    // restored once all cleanup I/O has finished.
    boolean interrupted = false;
    try {
        FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
        FileIO fileIO = paimonTable.fileIO();
        SchemaManager schemaManager = paimonTable.schemaManager();

        // commit all the iceberg schemas (schema 0 was already used to create the table)
        for (int i = 1; i < paimonSchemas.size(); i++) {
            LOG.info(
                    "commit new schema from iceberg, new schema id:{}",
                    paimonSchemas.get(i).id());
            schemaManager.commit(paimonSchemas.get(i));
        }

        IcebergManifestFile manifestFile =
                IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
        IcebergManifestList manifestList =
                IcebergManifestList.create(paimonTable, icebergMetaPathFactory);

        List<IcebergManifestFileMeta> icebergManifestFileMetas =
                manifestList.read(icebergMetadata.currentSnapshot().manifestList());

        // check manifest file with 'DELETE' kind
        checkAndFilterManifestFiles(icebergManifestFileMetas);

        // live manifest entries grouped by the schema id of the manifest file they were read from
        Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
        for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
            long schemaId =
                    getSchemaIdFromIcebergManifestFile(
                            new Path(icebergManifestFileMeta.manifestPath()), fileIO);
            List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
            icebergEntries
                    .computeIfAbsent(schemaId, v -> new ArrayList<>())
                    .addAll(
                            entries.stream()
                                    .filter(IcebergManifestEntry::isLive)
                                    .collect(Collectors.toList()));
        }

        List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
        // write schema id to IcebergDataFileMeta
        for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
            icebergDataFileMetas.addAll(
                    kv.getValue().stream()
                            .map(entry -> entry.file().withSchemaId(kv.getKey()))
                            .collect(Collectors.toList()));
        }

        if (icebergDataFileMetas.isEmpty()) {
            LOG.info(
                    "No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.",
                    icebergMetadata.currentSnapshotId(),
                    icebergLatestMetadataLocation);
            return;
        }

        // Again, check if delete File exists
        checkAndFilterDataFiles(icebergDataFileMetas);

        LOG.info(
                "Begin to create Migrate Task, the number of iceberg data files is {}",
                icebergDataFileMetas.size());

        List<MigrateTask> tasks = new ArrayList<>();
        // new path -> original path; handed to the import tasks and read back on failure.
        // Concurrent map because the tasks are submitted to the executor.
        Map<Path, Path> rollback = new ConcurrentHashMap<>();
        if (paimonTable.partitionKeys().isEmpty()) {
            tasks.add(importUnPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
        } else {
            tasks.addAll(importPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
        }

        List<Future<CommitMessage>> futures =
                tasks.stream().map(executor::submit).collect(Collectors.toList());
        List<CommitMessage> commitMessages = new ArrayList<>();
        try {
            for (Future<CommitMessage> future : futures) {
                commitMessages.add(future.get());
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                interrupted = true;
            }
            futures.forEach(f -> f.cancel(true));
            try {
                for (Future<?> future : futures) {
                    // wait all task cancelled or finished
                    // NOTE(review): Future#cancel documents that isDone() returns true once
                    // cancel has returned, so this loop may not actually wait for a running
                    // task's thread to stop - confirm whether a stronger barrier is needed
                    // before rolling back.
                    while (!future.isDone()) {
                        //noinspection BusyWait
                        Thread.sleep(100);
                    }
                }
            } catch (InterruptedException interruption) {
                // Stop waiting, but still fall through to the rollback below: skipping it
                // would leave the moved files inside the table that is about to be dropped.
                interrupted = true;
                e.addSuppressed(interruption);
            }
            // roll back all renamed path
            rollbackRenamedFiles(fileIO, rollback, e);
            throw new RuntimeException("Migrating failed because exception happens", e);
        }

        try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
            commit.commit(new ArrayList<>(commitMessages));
            LOG.info("paimon commit success! Iceberg data files have been migrated to paimon.");
        } catch (Exception e) {
            // The tasks have already moved the files; restore them before the outer handler
            // drops the paimon table.
            rollbackRenamedFiles(fileIO, rollback, e);
            throw e;
        }
    } catch (Exception e) {
        try {
            paimonCatalog.dropTable(paimonIdentifier, true);
        } catch (Exception dropFailure) {
            // keep the original failure as the root cause
            e.addSuppressed(dropFailure);
        }
        if (interrupted) {
            // Restored only after the cleanup above so that the cleanup I/O does not run
            // with the interrupt flag set.
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException("Migrating failed", e);
    }

    // if all success, drop the origin table according the delete field
    if (deleteOriginTable) {
        icebergMigrateMetadata.deleteOriginTable();
    }
}

/**
 * Moves every path recorded in {@code renamedToOrigin} back to its original location.
 *
 * <p>Best effort: a failure on one entry is logged and attached to {@code failure} as a
 * suppressed exception, and the remaining entries are still processed. Entries whose new path
 * does not exist are skipped.
 *
 * @param fileIO file system access used for the existence check and the rename
 * @param renamedToOrigin map from the new path to the original path
 * @param failure the exception that triggered the rollback; receives rollback errors as
 *     suppressed exceptions
 */
private void rollbackRenamedFiles(
        FileIO fileIO, Map<Path, Path> renamedToOrigin, Exception failure) {
    for (Map.Entry<Path, Path> entry : renamedToOrigin.entrySet()) {
        Path newPath = entry.getKey();
        Path origin = entry.getValue();
        try {
            if (fileIO.exists(newPath) && !fileIO.rename(newPath, origin)) {
                LOG.warn(
                        "Failed to roll back {} to {}: rename returned false.",
                        newPath,
                        origin);
            }
        } catch (Exception rollbackFailure) {
            LOG.warn("Failed to roll back {} to {}.", newPath, origin, rollbackFailure);
            failure.addSuppressed(rollbackFailure);
        }
    }
}