in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java [179:284]
public DataFileMeta process(BinaryRow partition, int bucket, ManifestEntry manifestEntry)
        throws IOException {
    DataFileMeta dataFileMeta = manifestEntry.file();
    DataFilePathFactory dataFilePathFactory = pathFactories.get(partition, bucket);
    SchemaInfo schemaInfo = schemaInfoCache.schemaInfo(dataFileMeta.schemaId());
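
    // separate the existing file-index files from the other extra files; the
    // index files are about to be rewritten, everything else is carried over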
    List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
    List<String> indexFiles =
            dataFileMeta.extraFiles().stream()
                    .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
                    .collect(Collectors.toList());
    extras.removeAll(indexFiles);

    Path newIndexPath;
    Map<String, Map<String, byte[]>> maintainers;
    // load: read all existing column indexes from the old index file if the
    // data file has one; otherwise start from an empty maintainer map
    if (!indexFiles.isEmpty()) {
        String indexFile = indexFiles.get(0);
        try (FileIndexFormat.Reader indexReader =
                FileIndexFormat.createReader(
                        fileIO.newInputStream(
                                dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta)),
                        schemaInfo.fileSchema)) {
            maintainers = indexReader.readAll();
        }
        newIndexPath =
                createNewFileIndexFilePath(
                        dataFilePathFactory.toAlignedPath(indexFile, dataFileMeta));
    } else {
        maintainers = new HashMap<>();
        newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
    }

    // remove unnecessary entries: drop every column that is no longer part of
    // the projected column set
    for (Map.Entry<String, Map<String, byte[]>> entry :
            new HashSet<>(maintainers.entrySet())) {
        String name = entry.getKey();
        if (!schemaInfo.projectedColFullNames.contains(name)) {
            maintainers.remove(name);
        } else {
            Map<String, byte[]> indexTypeBytes = maintainers.get(name);
            // note: indexTypeBytes is the same map instance as entry.getValue(),
            // so this containsKey check never fails and nothing is removed here;
            // regenerated index types instead overwrite the old bytes when the
            // rebuilt maintainers are merged in below
            for (String indexType : entry.getValue().keySet()) {
                if (!indexTypeBytes.containsKey(indexType)) {
                    indexTypeBytes.remove(indexType);
                }
            }
        }
    }
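
    // create a writer that rebuilds the indexes for the projected columns
    // according to the current file-index options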
    // deliberately left unclosed: closing the writer would write the index
    // file, while here it is only used to collect the serialized maintainers
    @SuppressWarnings("resource")
    DataFileIndexWriter dataFileIndexWriter =
            DataFileIndexWriter.create(
                    fileIO,
                    newIndexPath,
                    schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols),
                    fileIndexOptions,
                    schemaInfo.colNameMapping);
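
    // a null writer means there is nothing to index for the projected columns;
    // otherwise re-read the data file and feed every row through the writer to
    // rebuild the configured indexes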
    if (dataFileIndexWriter != null) {
        try (RecordReader<InternalRow> reader =
                table.newReadBuilder()
                        .withProjection(schemaInfo.projectedIndexCols)
                        .newRead()
                        .createReader(
                                DataSplit.builder()
                                        .withPartition(partition)
                                        .withBucket(bucket)
                                        .withBucketPath(
                                                pathFactory
                                                        .bucketPath(partition, bucket)
                                                        .toString())
                                        .withTotalBuckets(manifestEntry.totalBuckets())
                                        .withDataFiles(
                                                Collections.singletonList(dataFileMeta))
                                        .rawConvertible(true)
                                        .build())) {
            reader.forEachRemaining(dataFileIndexWriter::write);
        }
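
        // merge the freshly rebuilt indexes into the loaded ones; a regenerated
        // index type overwrites the old serialized bytes for its column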
        dataFileIndexWriter
                .serializeMaintainers()
                .forEach(
                        (key, value) ->
                                maintainers
                                        .computeIfAbsent(key, k -> new HashMap<>())
                                        .putAll(value));
    }
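
    // serialize the merged indexes into memory first, so the result can either
    // be embedded into the data file meta or spilled to a standalone index file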
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (FileIndexFormat.Writer indexWriter = FileIndexFormat.createWriter(baos)) {
        if (!maintainers.isEmpty()) {
            indexWriter.writeColumnIndexes(maintainers);
        }
    }
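
    // choose how to attach the rebuilt index based on its serialized size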
    if (baos.size() > sizeInMeta) {
        // too large to embed: write a standalone index file and register it in
        // the extra files of the data file meta
        try (OutputStream outputStream = fileIO.newOutputStream(newIndexPath, true)) {
            outputStream.write(baos.toByteArray());
        }
        extras.add(newIndexPath.getName());
        return dataFileMeta.copy(extras);
    } else if (baos.size() == 0) {
        // nothing was indexed: return the meta with the old index files dropped
        return dataFileMeta.copy(extras);
    } else {
        // small enough: embed the serialized index bytes directly into the meta
        return dataFileMeta.copy(baos.toByteArray());
    }
}