in fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java [160:272]
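/**
 * Projects the log batches stored in the file channel between {@code start} and {@code end}
 * onto the registered column projection, returning at most {@code maxBytes} of projected data.
 */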
public BytesViewLogRecords project(FileChannel channel, int start, int end, int maxBytes)
throws IOException {
checkNotNull(currentProjection, "There is no projection registered yet.");
MultiBytesView.Builder builder = MultiBytesView.builder();
int position = start;
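// scan batch by batch: each iteration reads one batch header from the file,
// projects the arrow content onto the registered columns, and appends the
// result to the builder until the byte budget is exhausted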
while (maxBytes > RECORD_BATCH_HEADER_SIZE) {
if (position >= end - RECORD_BATCH_HEADER_SIZE) {
// the remaining bytes in the file are not enough to read a batch header
return new BytesViewLogRecords(builder.build());
}
// 1. read the log batch header
logHeaderBuffer.rewind();
readFullyOrFail(channel, logHeaderBuffer, position, "log header");
logHeaderBuffer.rewind();
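// the stored length field excludes the LOG_OVERHEAD bytes, so add them back
// to get the full on-disk batch size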
int batchSizeInBytes = LOG_OVERHEAD + logHeaderBuffer.getInt(LENGTH_OFFSET);
if (position > end - batchSizeInBytes) {
// the remaining bytes in the file are not enough to read a full batch
return new BytesViewLogRecords(builder.build());
}
// Skip empty batches. An empty batch is generated when building the CDC log
// batch for a kv batch that produces no CDC logs. See the comments on the
// 'lastOffsetDelta' field in DefaultLogRecordBatch.
if (batchSizeInBytes == RECORD_BATCH_HEADER_SIZE) {
position += batchSizeInBytes;
continue;
}
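// append-only batches carry no change-type vector, so the arrow header
// directly follows the record batch header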
boolean isAppendOnly =
(logHeaderBuffer.get(ATTRIBUTES_OFFSET) & APPEND_ONLY_FLAG_MASK) > 0;
final int changeTypeBytes;
final long arrowHeaderOffset;
if (isAppendOnly) {
changeTypeBytes = 0;
arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE;
} else {
changeTypeBytes = logHeaderBuffer.getInt(RECORDS_COUNT_OFFSET);
arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE + changeTypeBytes;
}
// 2. read the arrow header and metadata, then compute the projection
arrowHeaderBuffer.rewind();
readFullyOrFail(channel, arrowHeaderBuffer, arrowHeaderOffset, "arrow header");
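// the arrow header records the size of the FlatBuffers metadata that follows it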
arrowHeaderBuffer.position(ARROW_IPC_METADATA_SIZE_OFFSET);
int arrowMetadataSize = arrowHeaderBuffer.getInt();
resizeArrowMetadataBuffer(arrowMetadataSize);
arrowMetadataBuffer.rewind();
readFullyOrFail(
channel,
arrowMetadataBuffer,
arrowHeaderOffset + ARROW_HEADER_SIZE,
"arrow metadata");
arrowMetadataBuffer.rewind();
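// parse the FlatBuffers-encoded Arrow IPC message describing the record batch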
Message metadata = Message.getRootAsMessage(arrowMetadataBuffer);
ProjectedArrowBatch projectedArrowBatch =
projectArrowBatch(
metadata,
currentProjection.nodesProjection,
currentProjection.buffersProjection,
currentProjection.bufferCount);
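// the projected batch retains only the field nodes and buffers of the selected columns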
long arrowBodyLength = projectedArrowBatch.bodyLength();
int newBatchSizeInBytes =
RECORD_BATCH_HEADER_SIZE
+ changeTypeBytes
+ currentProjection.arrowMetadataLength
+ (int) arrowBodyLength; // safe: the projected body never exceeds the original batch size (an int)
if (newBatchSizeInBytes > maxBytes) {
// the projected batch does not fit into the remaining byte budget,
// so return what has been collected so far
return new BytesViewLogRecords(builder.build());
}
// 3. create the new arrow batch metadata with the projection applied
byte[] headerMetadata =
serializeArrowRecordBatchMetadata(
projectedArrowBatch,
arrowBodyLength,
currentProjection.bodyCompression);
checkState(
headerMetadata.length == currentProjection.arrowMetadataLength,
"Invalid metadata length");
// 4. update the length field in the log batch header and copy it out
logHeaderBuffer.position(LENGTH_OFFSET);
logHeaderBuffer.putInt(newBatchSizeInBytes - LOG_OVERHEAD);
logHeaderBuffer.rewind();
// copy the header into a fresh array: the buffer is reused for the next batch
// while these bytes are sent over the network
byte[] logHeader = new byte[RECORD_BATCH_HEADER_SIZE];
logHeaderBuffer.get(logHeader);
// 5. build log records
builder.addBytes(logHeader);
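// for non-append-only batches, copy the change-type vector verbatim from the file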
if (!isAppendOnly) {
builder.addBytes(channel, position + ARROW_CHANGETYPE_OFFSET, changeTypeBytes);
}
builder.addBytes(headerMetadata);
final long bufferOffset = arrowHeaderOffset + ARROW_HEADER_SIZE + arrowMetadataSize;
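// append only the projected buffers; their offsets are relative to the start of the arrow body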
projectedArrowBatch.buffers.forEach(
b ->
builder.addBytes(
channel, bufferOffset + b.getOffset(), (int) b.getSize()));
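// charge the projected size against the byte budget, but advance the file
// position by the original (unprojected) batch size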
maxBytes -= newBatchSizeInBytes;
position += batchSizeInBytes;
}
return new BytesViewLogRecords(builder.build());
}