in fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java [481:575]
public static FetchLogResponse makeFetchLogResponse(
        Map<TableBucket, FetchLogResultForBucket> fetchLogResult) {
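    // Accumulate per-bucket responses keyed by table id so they can later be
    // wrapped into per-table responses.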
    Map<Long, List<PbFetchLogRespForBucket>> fetchLogRespMap = new HashMap<>();
    for (Map.Entry<TableBucket, FetchLogResultForBucket> entry : fetchLogResult.entrySet()) {
        TableBucket tb = entry.getKey();
        FetchLogResultForBucket bucketResult = entry.getValue();
        PbFetchLogRespForBucket fetchLogRespForBucket =
                new PbFetchLogRespForBucket().setBucketId(tb.getBucket());
        if (tb.getPartitionId() != null) {
            fetchLogRespForBucket.setPartitionId(tb.getPartitionId());
        }
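        // A failed bucket result only carries an error code/message; a successful one
        // carries the high watermark plus either remote log segment metadata or record bytes.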
        if (bucketResult.failed()) {
            fetchLogRespForBucket.setError(
                    bucketResult.getErrorCode(), bucketResult.getErrorMessage());
        } else {
            fetchLogRespForBucket
                    .setHighWatermark(bucketResult.getHighWatermark())
                    // TODO: set the log start offset here once log cleaning is supported.
                    .setLogStartOffset(0L);
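            // For remote fetches, return remote log segment metadata instead of record
            // bytes; otherwise attach the records directly.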
            if (bucketResult.fetchFromRemote()) {
                // set remote log fetch info.
                RemoteLogFetchInfo rlfInfo = bucketResult.remoteLogFetchInfo();
                checkNotNull(rlfInfo, "Remote log fetch info is null.");
                List<PbRemoteLogSegment> remoteLogSegmentList = new ArrayList<>();
                for (RemoteLogSegment logSegment : rlfInfo.remoteLogSegmentList()) {
                    PbRemoteLogSegment pbRemoteLogSegment =
                            new PbRemoteLogSegment()
                                    .setRemoteLogStartOffset(logSegment.remoteLogStartOffset())
                                    .setRemoteLogSegmentId(
                                            logSegment.remoteLogSegmentId().toString())
                                    .setRemoteLogEndOffset(logSegment.remoteLogEndOffset())
                                    .setSegmentSizeInBytes(logSegment.segmentSizeInBytes());
                    remoteLogSegmentList.add(pbRemoteLogSegment);
                }
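                // setRemoteLogFetchInfo() hands back the nested remote log fetch info
                // message, so the fields below are set on that nested message.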
                fetchLogRespForBucket
                        .setRemoteLogFetchInfo()
                        .setRemoteLogTabletDir(rlfInfo.remoteLogTabletDir())
                        .addAllRemoteLogSegments(remoteLogSegmentList)
                        .setFirstStartPos(rlfInfo.firstStartPos());
                if (rlfInfo.partitionName() != null) {
                    fetchLogRespForBucket
                            .setRemoteLogFetchInfo()
                            .setPartitionName(rlfInfo.partitionName());
                }
            } else {
                // set records
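                // Each supported LogRecords implementation is serialized via its cheapest
                // path: a file channel region, a memory segment, or a composite bytes view.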
                LogRecords records = bucketResult.recordsOrEmpty();
                if (records instanceof FileLogRecords) {
                    FileChannelChunk chunk = ((FileLogRecords) records).toChunk();
                    // zero-copy optimization for file channel
                    fetchLogRespForBucket.setRecords(
                            chunk.getFileChannel(), chunk.getPosition(), chunk.getSize());
                } else if (records instanceof MemoryLogRecords) {
                    // this should never happen, but we still support fetching memory log records.
                    if (records == MemoryLogRecords.EMPTY) {
                        fetchLogRespForBucket.setRecords(new byte[0]);
                    } else {
                        MemoryLogRecords logRecords = (MemoryLogRecords) records;
                        fetchLogRespForBucket.setRecords(
                                logRecords.getMemorySegment(),
                                logRecords.getPosition(),
                                logRecords.sizeInBytes());
                    }
                } else if (records instanceof BytesViewLogRecords) {
                    // zero-copy for projection push-down.
                    fetchLogRespForBucket.setRecordsBytesView(
                            ((BytesViewLogRecords) records).getBytesView());
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported log records type: " + records.getClass().getName());
                }
            }
        }
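        // Collect this bucket's response under its table id.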
        fetchLogRespMap
                .computeIfAbsent(tb.getTableId(), k -> new ArrayList<>())
                .add(fetchLogRespForBucket);
    }
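    // Wrap each table's bucket responses into a PbFetchLogRespForTable.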
    List<PbFetchLogRespForTable> fetchLogRespForTables = new ArrayList<>();
    for (Map.Entry<Long, List<PbFetchLogRespForBucket>> entry : fetchLogRespMap.entrySet()) {
        PbFetchLogRespForTable fetchLogRespForTable = new PbFetchLogRespForTable();
        fetchLogRespForTable.setTableId(entry.getKey());
        fetchLogRespForTable.addAllBucketsResps(entry.getValue());
        fetchLogRespForTables.add(fetchLogRespForTable);
    }
    FetchLogResponse fetchLogResponse = new FetchLogResponse();
    fetchLogResponse.addAllTablesResps(fetchLogRespForTables);
    return fetchLogResponse;
}