in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java [92:240]
public void processElement(
        Tuple2<String, String> tuple,
        ProcessFunction<Tuple2<String, String>, Void>.Context context,
        Collector<Void> collector)
        throws Exception {
    // Copies the metadata files of one table from the source catalog to the target catalog.
    //
    // tuple.f0 is the source table identifier string, tuple.f1 the target table identifier
    // string. Schema files, the latest snapshot file, its manifest lists, its index manifest
    // and its statistics file are copied right here. Index files and the files returned by
    // CopyFilesUtil.getManifestUsedFilesForSnapshot are not copied here; they are only
    // described as CopyFileInfo records and emitted through the INDEX_FILES_TAG and
    // DATA_MANIFEST_FILES_TAG side outputs. The main collector is never written to.
    String sourceIdentifierStr = tuple.f0;
    Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
    String targetIdentifierStr = tuple.f1;
    Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
    FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);

    // 1. create target table
    // NOTE(review): the trailing `true` flags are presumably "ignore if exists" - confirm
    // against the Catalog API.
    targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
    targetCatalog.createTable(
            targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);
    FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);

    // 2. copy all schema files
    SchemaManager sourceSchemaManager = sourceTable.schemaManager();
    SchemaManager targetSchemaManager = targetTable.schemaManager();
    FileIO sourceTableFileIO = sourceTable.fileIO();
    FileIO targetTableFileIO = targetTable.fileIO();
    for (long schemaId : sourceSchemaManager.listAllIds()) {
        copyMetaFile(
                sourceTableFileIO,
                sourceSchemaManager.toSchemaPath(schemaId),
                targetTableFileIO,
                targetSchemaManager.toSchemaPath(schemaId));
    }

    // 3. copy latest snapshot file
    FileStore<?> sourceStore = sourceTable.store();
    FileStore<?> targetStore = targetTable.store();
    SnapshotManager sourceSnapshotManager = sourceStore.snapshotManager();
    SnapshotManager targetSnapshotManager = targetStore.snapshotManager();
    Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
    if (latestSnapshot != null) {
        long snapshotId = latestSnapshot.id();
        copyMetaFile(
                sourceTableFileIO,
                sourceSnapshotManager.snapshotPath(snapshotId),
                targetTableFileIO,
                targetSnapshotManager.snapshotPath(snapshotId));
    }

    FileStorePathFactory sourcePathFactory = sourceStore.pathFactory();
    FileStorePathFactory targetPathFactory = targetStore.pathFactory();

    // Both lists stay empty when the source table has no snapshot; they are still reported
    // by the debug log at the end.
    List<CopyFileInfo> indexFiles = new ArrayList<>();
    List<CopyFileInfo> dataManifestFiles = new ArrayList<>();

    if (latestSnapshot != null) {
        // 4. copy manifest list files
        String baseManifestList = latestSnapshot.baseManifestList();
        copyMetaFile(
                sourceTableFileIO,
                sourcePathFactory.toManifestListPath(baseManifestList),
                targetTableFileIO,
                targetPathFactory.toManifestListPath(baseManifestList));
        String deltaManifestList = latestSnapshot.deltaManifestList();
        copyMetaFile(
                sourceTableFileIO,
                sourcePathFactory.toManifestListPath(deltaManifestList),
                targetTableFileIO,
                targetPathFactory.toManifestListPath(deltaManifestList));
        // only the changelog manifest list is treated as optional here
        String changelogManifestList = latestSnapshot.changelogManifestList();
        if (changelogManifestList != null) {
            copyMetaFile(
                    sourceTableFileIO,
                    sourcePathFactory.toManifestListPath(changelogManifestList),
                    targetTableFileIO,
                    targetPathFactory.toManifestListPath(changelogManifestList));
        }

        // 5. copy index manifest file
        IndexFileHandler indexFileHandler = sourceStore.newIndexFileHandler();
        String indexManifest = latestSnapshot.indexManifest();
        if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
            copyMetaFile(
                    sourceTableFileIO,
                    sourcePathFactory.indexManifestFileFactory().toPath(indexManifest),
                    targetTableFileIO,
                    targetPathFactory.indexManifestFileFactory().toPath(indexManifest));

            // read index files
            List<IndexManifestEntry> indexManifestEntries =
                    CopyFilesUtil.retryReadingFiles(
                            () -> indexFileHandler.readManifestWithIOException(indexManifest));
            List<Path> indexFileList = new ArrayList<>();
            // retryReadingFiles may hand back null; that is treated as "no index files"
            if (indexManifestEntries != null) {
                for (IndexManifestEntry entry : indexManifestEntries) {
                    indexFileList.add(indexFileHandler.filePath(entry.indexFile()));
                }
            }
            indexFiles =
                    CopyFilesUtil.toCopyFileInfos(
                            indexFileList,
                            sourceTable.location(),
                            sourceIdentifierStr,
                            targetIdentifierStr);
            for (CopyFileInfo info : indexFiles) {
                context.output(INDEX_FILES_TAG, info);
            }
        }

        // 6. copy statistics file
        String statistics = latestSnapshot.statistics();
        if (statistics != null) {
            copyMetaFile(
                    sourceTableFileIO,
                    sourcePathFactory.statsFileFactory().toPath(statistics),
                    targetTableFileIO,
                    targetPathFactory.statsFileFactory().toPath(statistics));
        }

        // pick manifest files
        List<Path> list =
                CopyFilesUtil.getManifestUsedFilesForSnapshot(sourceTable, latestSnapshot.id());
        dataManifestFiles =
                CopyFilesUtil.toCopyFileInfos(
                        list, sourceTable.location(), sourceIdentifierStr, targetIdentifierStr);
    }

    for (CopyFileInfo info : dataManifestFiles) {
        context.output(DATA_MANIFEST_FILES_TAG, info);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "The CopyFileInfo of table {} is: indexFiles={}, dataManifestFiles={}",
                sourceTable.location(),
                indexFiles,
                dataManifestFiles);
    }
}

/**
 * Copies a single file from {@code sourcePath} to {@code targetPath}.
 *
 * <p>The input stream is opened before the output stream. If opening the output stream
 * fails, the input stream is closed here, because {@code IOUtils.copyBytes} is never reached
 * in that case. Once both streams are open they are handed to {@code IOUtils.copyBytes},
 * which is relied on to close them (NOTE(review): confirm that the two-argument overload
 * closes both streams).
 *
 * @param sourceFileIO file IO used to read {@code sourcePath}
 * @param sourcePath file to read
 * @param targetFileIO file IO used to write {@code targetPath}
 * @param targetPath file to write; opened with the flag {@code true}, presumably "overwrite"
 * @throws java.io.IOException if opening either stream or copying the bytes fails
 */
private static void copyMetaFile(
        FileIO sourceFileIO, Path sourcePath, FileIO targetFileIO, Path targetPath)
        throws java.io.IOException {
    java.io.InputStream in = sourceFileIO.newInputStream(sourcePath);
    java.io.OutputStream out;
    try {
        out = targetFileIO.newOutputStream(targetPath, true);
    } catch (Throwable failure) {
        // nobody else holds a reference to the input stream yet, so release it here and
        // keep the original failure as the primary exception
        try {
            in.close();
        } catch (Throwable closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        throw failure;
    }
    IOUtils.copyBytes(in, out);
}