public Map<TableBucket, LogReadResult> readFromLog(FetchParams, Map<TableBucket, FetchData>)

in fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java [976:1050]


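Reads log records for a set of buckets in one pass: each bucket's read is capped by a shared request-level byte budget, per-bucket metrics are updated, and failures are converted into per-bucket error results rather than failing the whole request.
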
    public Map<TableBucket, LogReadResult> readFromLog(
            FetchParams fetchParams, Map<TableBucket, FetchData> bucketFetchInfo) {
        Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
        boolean isFromFollower = fetchParams.isFromFollower();
        int limitBytes = fetchParams.maxFetchBytes();
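        // Iterate the requested buckets, charging each read against the remaining
        // request-level byte budget held in limitBytes.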
        for (Map.Entry<TableBucket, FetchData> entry : bucketFetchInfo.entrySet()) {
            TableBucket tb = entry.getKey();
            PhysicalTableMetricGroup tableMetrics = null;
            Replica replica = null;
            FetchData fetchData = entry.getValue();
            long fetchOffset = fetchData.getFetchOffset();
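            // Cap this bucket's read at both its own max and the remaining request budget.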
            int adjustedMaxBytes = Math.min(limitBytes, fetchData.getMaxBytes());
            try {
                replica = getReplicaOrException(tb);
                tableMetrics = replica.tableMetrics();
                tableMetrics.totalFetchLogRequests().inc();
                LOG.trace(
                        "Fetching log records for replica {}, offset {}", tb, fetchOffset);
                replica.checkProjection(fetchData.getProjectFields());
                fetchParams.setCurrentFetch(
                        tb.getTableId(),
                        fetchOffset,
                        adjustedMaxBytes,
                        replica.getRowType(),
                        replica.getArrowCompressionInfo(),
                        fetchData.getProjectFields());
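                // Perform the actual read; the log honors the limits carried on fetchParams.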
                LogReadInfo readInfo = replica.fetchRecords(fetchParams);

                // Once we read from a non-empty bucket, we stop ignoring the request-level
                // and bucket-level size limits.
                FetchDataInfo fetchedData = readInfo.getFetchedData();
                int recordBatchSize = fetchedData.getRecords().sizeInBytes();
                if (recordBatchSize > 0) {
                    fetchParams.markReadOneMessage();
                }
                limitBytes = Math.max(0, limitBytes - recordBatchSize);

                logReadResult.put(
                        tb,
                        new LogReadResult(
                                new FetchLogResultForBucket(
                                        tb, fetchedData.getRecords(), readInfo.getHighWatermark()),
                                fetchedData.getFetchOffsetMetadata()));

                // update metrics
                if (isFromFollower) {
                    serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
                } else {
                    tableMetrics.logBytesOut().inc(recordBatchSize);
                }
            } catch (Exception e) {
                if (isUnexpectedException(e)) {
                    LOG.error("Error processing log fetch operation on replica {}", tb, e);
                    // NOTE: the failed-fetch-requests metric is not incremented for known
                    // exceptions, since it is supposed to indicate an unexpected failure
                    // of the server while handling a fetch request.
                    if (tableMetrics != null) {
                        tableMetrics.failedFetchLogRequests().inc();
                    }
                }

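                // An out-of-range offset gets a dedicated recovery path; any other
                // exception is surfaced to the client as an ApiError for this bucket.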
                FetchLogResultForBucket result;
                if (replica != null && e instanceof LogOffsetOutOfRangeException) {
                    result = handleFetchOutOfRangeException(replica, fetchOffset, e);
                } else {
                    result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e));
                }
                logReadResult.put(
                        tb, new LogReadResult(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA));
            }
        }
        return logReadResult;
    }
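
To make the budget accounting above easier to follow, here is a self-contained sketch (not Fluss code; all names and values are hypothetical) of how limitBytes shrinks across buckets. In the real method the log itself bounds each read via the limits set on fetchParams, and the first non-empty read may exceed them (the markReadOneMessage() exemption); the sketch simulates the bound with Math.min and ignores that exemption.

    public class FetchBudgetSketch {
        public static void main(String[] args) {
            int limitBytes = 1024;                  // request-level budget (hypothetical)
            int[] perBucketMax = {512, 512, 512};   // per-bucket caps (hypothetical)
            int[] available = {400, 700, 300};      // bytes each bucket could return

            for (int i = 0; i < perBucketMax.length; i++) {
                // Mirrors: adjustedMaxBytes = Math.min(limitBytes, fetchData.getMaxBytes())
                int adjustedMaxBytes = Math.min(limitBytes, perBucketMax[i]);
                // The real log bounds the batch itself; simulated here with min(...)
                int recordBatchSize = Math.min(available[i], adjustedMaxBytes);
                // Mirrors: limitBytes = Math.max(0, limitBytes - recordBatchSize)
                limitBytes = Math.max(0, limitBytes - recordBatchSize);
                System.out.printf("bucket %d: cap=%d read=%d budgetLeft=%d%n",
                        i, adjustedMaxBytes, recordBatchSize, limitBytes);
            }
        }
    }

Run standalone, this prints cap=512/read=400/left=624, then cap=512/read=512/left=112, then cap=112/read=112/left=0: a later bucket can be starved once earlier buckets have consumed most of the request budget.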