public DataFileMeta process()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java [179:284]


        /**
         * Rebuilds the file index of a single data file.
         *
         * <p>If the file already has an index extra file (name ending with {@link
         * DataFilePathFactory#INDEX_PATH_SUFFIX}), its content is loaded first. Entries for columns
         * that are not in {@code schemaInfo.projectedColFullNames} are dropped. The indexes produced
         * by a fresh {@link DataFileIndexWriter} pass over the data file are then merged on top.
         * The merged index is serialized and either written to a new index file (when larger than
         * {@code sizeInMeta}) or embedded into the returned meta.
         *
         * @param partition partition the data file belongs to
         * @param bucket bucket the data file belongs to
         * @param manifestEntry manifest entry describing the data file to re-index
         * @return a copy of the data file meta that references the rewritten index
         * @throws IOException on failure while reading the existing index or the data file, or
         *     while writing the new index file
         */
        public DataFileMeta process(BinaryRow partition, int bucket, ManifestEntry manifestEntry)
                throws IOException {
            DataFileMeta dataFileMeta = manifestEntry.file();
            DataFilePathFactory dataFilePathFactory = pathFactories.get(partition, bucket);
            SchemaInfo schemaInfo = schemaInfoCache.schemaInfo(dataFileMeta.schemaId());

            // Split extra files into index files (which are replaced) and all others (kept).
            List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
            List<String> indexFiles =
                    dataFileMeta.extraFiles().stream()
                            .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
                            .collect(Collectors.toList());
            extras.removeAll(indexFiles);

            Path newIndexPath;
            Map<String, Map<String, byte[]>> maintainers;
            // load
            if (!indexFiles.isEmpty()) {
                // NOTE(review): only the first index file is read; any further index files are
                // removed from extras without being merged — confirm at most one can exist.
                Path oldIndexPath =
                        dataFilePathFactory.toAlignedPath(indexFiles.get(0), dataFileMeta);
                try (FileIndexFormat.Reader indexReader =
                        FileIndexFormat.createReader(
                                fileIO.newInputStream(oldIndexPath), schemaInfo.fileSchema)) {
                    maintainers = indexReader.readAll();
                }
                newIndexPath = createNewFileIndexFilePath(oldIndexPath);
            } else {
                maintainers = new HashMap<>();
                newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
            }

            // remove unnecessary: drop indexes of columns that are no longer indexed.
            // NOTE(review): index types are not filtered per column. Types regenerated below
            // overwrite the loaded bytes, but a type removed from the options for a column that
            // is still indexed is kept as-is — confirm whether it should be dropped.
            maintainers
                    .keySet()
                    .removeIf(name -> !schemaInfo.projectedColFullNames.contains(name));

            // ignore close, do not close to write file, only collect serialized maintainers
            @SuppressWarnings("resource")
            DataFileIndexWriter dataFileIndexWriter =
                    DataFileIndexWriter.create(
                            fileIO,
                            newIndexPath,
                            schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols),
                            fileIndexOptions,
                            schemaInfo.colNameMapping);
            if (dataFileIndexWriter != null) {
                // Read back only the indexed columns of this single data file and feed every row
                // to the index writer.
                try (RecordReader<InternalRow> reader =
                        table.newReadBuilder()
                                .withProjection(schemaInfo.projectedIndexCols)
                                .newRead()
                                .createReader(
                                        DataSplit.builder()
                                                .withPartition(partition)
                                                .withBucket(bucket)
                                                .withBucketPath(
                                                        pathFactory
                                                                .bucketPath(partition, bucket)
                                                                .toString())
                                                .withTotalBuckets(manifestEntry.totalBuckets())
                                                .withDataFiles(
                                                        Collections.singletonList(dataFileMeta))
                                                .rawConvertible(true)
                                                .build())) {
                    reader.forEachRemaining(dataFileIndexWriter::write);
                }

                // Freshly built indexes override loaded ones with the same column/index type.
                dataFileIndexWriter
                        .serializeMaintainers()
                        .forEach(
                                (key, value) ->
                                        maintainers
                                                .computeIfAbsent(key, k -> new HashMap<>())
                                                .putAll(value));
            }

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (FileIndexFormat.Writer indexWriter = FileIndexFormat.createWriter(baos)) {
                if (!maintainers.isEmpty()) {
                    indexWriter.writeColumnIndexes(maintainers);
                }
            }
            byte[] serializedIndex = baos.toByteArray();

            if (serializedIndex.length > sizeInMeta) {
                // Too large to embed: write a standalone index file and reference it in extras.
                try (OutputStream outputStream = fileIO.newOutputStream(newIndexPath, true)) {
                    outputStream.write(serializedIndex);
                }
                extras.add(newIndexPath.getName());
                return dataFileMeta.copy(extras);
            } else if (serializedIndex.length == 0) {
                // Nothing serialized: only the old index file references are removed.
                return dataFileMeta.copy(extras);
            } else {
                // Small enough to embed into the meta.
                // NOTE(review): confirm whether copy(byte[]) keeps the original extra files
                // (including the old index file name).
                return dataFileMeta.copy(serializedIndex);
            }
        }