public static FetchLogResponse makeFetchLogResponse()

in fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java [481:575]


    /**
     * Builds a {@link FetchLogResponse} from the per-bucket fetch results.
     *
     * <p>Each entry is converted into a {@code PbFetchLogRespForBucket} and the bucket responses
     * are grouped by table id, producing one {@code PbFetchLogRespForTable} per distinct table.
     * For every bucket:
     *
     * <ul>
     *   <li>the bucket id is always set, and the partition id is set only when it is non-null;
     *   <li>a failed result carries only the error code and error message;
     *   <li>a successful result carries the high watermark and a log start offset of {@code 0},
     *       followed by either the remote log fetch info (when the result is to be fetched from
     *       remote) or the log records themselves.
     * </ul>
     *
     * @param fetchLogResult the fetch result of each requested table bucket
     * @return the response containing one table entry per table id found in {@code
     *     fetchLogResult}
     * @throws UnsupportedOperationException if a successful, non-remote result holds a {@code
     *     LogRecords} implementation other than {@code FileLogRecords}, {@code MemoryLogRecords}
     *     or {@code BytesViewLogRecords}
     */
    public static FetchLogResponse makeFetchLogResponse(
            Map<TableBucket, FetchLogResultForBucket> fetchLogResult) {
        Map<Long, List<PbFetchLogRespForBucket>> fetchLogRespMap = new HashMap<>();
        for (Map.Entry<TableBucket, FetchLogResultForBucket> entry : fetchLogResult.entrySet()) {
            TableBucket tb = entry.getKey();
            FetchLogResultForBucket bucketResult = entry.getValue();
            PbFetchLogRespForBucket fetchLogRespForBucket =
                    new PbFetchLogRespForBucket().setBucketId(tb.getBucket());
            if (tb.getPartitionId() != null) {
                fetchLogRespForBucket.setPartitionId(tb.getPartitionId());
            }
            if (bucketResult.failed()) {
                fetchLogRespForBucket.setError(
                        bucketResult.getErrorCode(), bucketResult.getErrorMessage());
            } else {
                fetchLogRespForBucket
                        .setHighWatermark(bucketResult.getHighWatermark())
                        // TODO: set log start offset here if we support log clean.
                        .setLogStartOffset(0L);

                if (bucketResult.fetchFromRemote()) {
                    // set remote log fetch info.
                    RemoteLogFetchInfo rlfInfo = bucketResult.remoteLogFetchInfo();
                    checkNotNull(rlfInfo, "Remote log fetch info is null.");
                    List<PbRemoteLogSegment> remoteLogSegmentList = new ArrayList<>();
                    for (RemoteLogSegment logSegment : rlfInfo.remoteLogSegmentList()) {
                        PbRemoteLogSegment pbRemoteLogSegment =
                                new PbRemoteLogSegment()
                                        .setRemoteLogStartOffset(logSegment.remoteLogStartOffset())
                                        .setRemoteLogSegmentId(
                                                logSegment.remoteLogSegmentId().toString())
                                        .setRemoteLogEndOffset(logSegment.remoteLogEndOffset())
                                        .setSegmentSizeInBytes(logSegment.segmentSizeInBytes());
                        remoteLogSegmentList.add(pbRemoteLogSegment);
                    }
                    fetchLogRespForBucket
                            .setRemoteLogFetchInfo()
                            .setRemoteLogTabletDir(rlfInfo.remoteLogTabletDir())
                            .addAllRemoteLogSegments(remoteLogSegmentList)
                            .setFirstStartPos(rlfInfo.firstStartPos());
                    // the partition name is optional and is only set when present.
                    if (rlfInfo.partitionName() != null) {
                        fetchLogRespForBucket
                                .setRemoteLogFetchInfo()
                                .setPartitionName(rlfInfo.partitionName());
                    }
                } else {
                    // set records
                    LogRecords records = bucketResult.recordsOrEmpty();
                    if (records instanceof FileLogRecords) {
                        FileChannelChunk chunk = ((FileLogRecords) records).toChunk();
                        // zero-copy optimization for file channel
                        fetchLogRespForBucket.setRecords(
                                chunk.getFileChannel(), chunk.getPosition(), chunk.getSize());
                    } else if (records instanceof MemoryLogRecords) {
                        // this should never happen, but we still support fetch memory log records.
                        if (records == MemoryLogRecords.EMPTY) {
                            fetchLogRespForBucket.setRecords(new byte[0]);
                        } else {
                            MemoryLogRecords logRecords = (MemoryLogRecords) records;
                            fetchLogRespForBucket.setRecords(
                                    logRecords.getMemorySegment(),
                                    logRecords.getPosition(),
                                    logRecords.sizeInBytes());
                        }
                    } else if (records instanceof BytesViewLogRecords) {
                        // zero-copy for project push down.
                        fetchLogRespForBucket.setRecordsBytesView(
                                ((BytesViewLogRecords) records).getBytesView());
                    } else {
                        throw new UnsupportedOperationException(
                                "Not supported log records type: " + records.getClass().getName());
                    }
                }
            }
            // group the bucket responses by table id, creating the list on first use.
            fetchLogRespMap
                    .computeIfAbsent(tb.getTableId(), tableId -> new ArrayList<>())
                    .add(fetchLogRespForBucket);
        }

        // one table-level response per distinct table id.
        List<PbFetchLogRespForTable> fetchLogRespForTables = new ArrayList<>();
        for (Map.Entry<Long, List<PbFetchLogRespForBucket>> entry : fetchLogRespMap.entrySet()) {
            PbFetchLogRespForTable fetchLogRespForTable = new PbFetchLogRespForTable();
            fetchLogRespForTable.setTableId(entry.getKey());
            fetchLogRespForTable.addAllBucketsResps(entry.getValue());
            fetchLogRespForTables.add(fetchLogRespForTable);
        }

        FetchLogResponse fetchLogResponse = new FetchLogResponse();
        fetchLogResponse.addAllTablesResps(fetchLogRespForTables);
        return fetchLogResponse;
    }