public BytesViewLogRecords project()

in fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java [160:272]


    public BytesViewLogRecords project(FileChannel channel, int start, int end, int maxBytes)
            throws IOException {
        checkNotNull(currentProjection, "There is no projection registered yet.");
        MultiBytesView.Builder builder = MultiBytesView.builder();
        int position = start;
        while (maxBytes > RECORD_BATCH_HEADER_SIZE) {
            if (position >= end - RECORD_BATCH_HEADER_SIZE) {
                // the remaining bytes in the file are not enough to read a batch header
                return new BytesViewLogRecords(builder.build());
            }

            // read log header
            logHeaderBuffer.rewind();
            readFullyOrFail(channel, logHeaderBuffer, position, "log header");

            logHeaderBuffer.rewind();
            int batchSizeInBytes = LOG_OVERHEAD + logHeaderBuffer.getInt(LENGTH_OFFSET);
            if (position > end - batchSizeInBytes) {
                // the remaining bytes in the file are not enough to read a full batch
                return new BytesViewLogRecords(builder.build());
            }

            // Skip empty batch. The empty batch was generated when build cdc log batch when there
            // is no cdc log generated for this kv batch. See the comments about the field
            // 'lastOffsetDelta' in DefaultLogRecordBatch.
            if (batchSizeInBytes == RECORD_BATCH_HEADER_SIZE) {
                position += batchSizeInBytes;
                continue;
            }

            boolean isAppendOnly =
                    (logHeaderBuffer.get(ATTRIBUTES_OFFSET) & APPEND_ONLY_FLAG_MASK) > 0;

            final int changeTypeBytes;
            final long arrowHeaderOffset;
            if (isAppendOnly) {
                changeTypeBytes = 0;
                arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE;
            } else {
                changeTypeBytes = logHeaderBuffer.getInt(RECORDS_COUNT_OFFSET);
                arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE + changeTypeBytes;
            }

            // read arrow header
            arrowHeaderBuffer.rewind();
            readFullyOrFail(channel, arrowHeaderBuffer, arrowHeaderOffset, "arrow header");
            arrowHeaderBuffer.position(ARROW_IPC_METADATA_SIZE_OFFSET);
            int arrowMetadataSize = arrowHeaderBuffer.getInt();

            resizeArrowMetadataBuffer(arrowMetadataSize);
            arrowMetadataBuffer.rewind();
            readFullyOrFail(
                    channel,
                    arrowMetadataBuffer,
                    arrowHeaderOffset + ARROW_HEADER_SIZE,
                    "arrow metadata");

            arrowMetadataBuffer.rewind();
            Message metadata = Message.getRootAsMessage(arrowMetadataBuffer);
            ProjectedArrowBatch projectedArrowBatch =
                    projectArrowBatch(
                            metadata,
                            currentProjection.nodesProjection,
                            currentProjection.buffersProjection,
                            currentProjection.bufferCount);
            long arrowBodyLength = projectedArrowBatch.bodyLength();

            int newBatchSizeInBytes =
                    RECORD_BATCH_HEADER_SIZE
                            + changeTypeBytes
                            + currentProjection.arrowMetadataLength
                            + (int) arrowBodyLength; // safe to cast to int
            if (newBatchSizeInBytes > maxBytes) {
                // the remaining bytes in the file are not enough to read a full batch
                return new BytesViewLogRecords(builder.build());
            }

            // 3. create new arrow batch metadata which already projected.
            byte[] headerMetadata =
                    serializeArrowRecordBatchMetadata(
                            projectedArrowBatch,
                            arrowBodyLength,
                            currentProjection.bodyCompression);
            checkState(
                    headerMetadata.length == currentProjection.arrowMetadataLength,
                    "Invalid metadata length");

            // 4. update and copy log batch header
            logHeaderBuffer.position(LENGTH_OFFSET);
            logHeaderBuffer.putInt(newBatchSizeInBytes - LOG_OVERHEAD);
            logHeaderBuffer.rewind();
            // the logHeader can't be reused, as it will be sent to network
            byte[] logHeader = new byte[RECORD_BATCH_HEADER_SIZE];
            logHeaderBuffer.get(logHeader);

            // 5. build log records
            builder.addBytes(logHeader);
            if (!isAppendOnly) {
                builder.addBytes(channel, position + ARROW_CHANGETYPE_OFFSET, changeTypeBytes);
            }
            builder.addBytes(headerMetadata);
            final long bufferOffset = arrowHeaderOffset + ARROW_HEADER_SIZE + arrowMetadataSize;
            projectedArrowBatch.buffers.forEach(
                    b ->
                            builder.addBytes(
                                    channel, bufferOffset + b.getOffset(), (int) b.getSize()));

            maxBytes -= newBatchSizeInBytes;
            position += batchSizeInBytes;
        }

        return new BytesViewLogRecords(builder.build());
    }