in fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java [976:1050]
public Map<TableBucket, LogReadResult> readFromLog(
FetchParams fetchParams, Map<TableBucket, FetchData> bucketFetchInfo) {
Map<TableBucket, LogReadResult> logReadResult = new HashMap<>();
boolean isFromFollower = fetchParams.isFromFollower();
int limitBytes = fetchParams.maxFetchBytes();
for (Map.Entry<TableBucket, FetchData> entry : bucketFetchInfo.entrySet()) {
TableBucket tb = entry.getKey();
PhysicalTableMetricGroup tableMetrics = null;
Replica replica = null;
FetchData fetchData = entry.getValue();
long fetchOffset = fetchData.getFetchOffset();
int adjustedMaxBytes = Math.min(limitBytes, fetchData.getMaxBytes());
try {
replica = getReplicaOrException(tb);
tableMetrics = replica.tableMetrics();
tableMetrics.totalFetchLogRequests().inc();
LOG.trace(
"Fetching log record for replica {}, offset {}",
tb,
fetchData.getFetchOffset());
replica.checkProjection(fetchData.getProjectFields());
fetchParams.setCurrentFetch(
tb.getTableId(),
fetchOffset,
adjustedMaxBytes,
replica.getRowType(),
replica.getArrowCompressionInfo(),
fetchData.getProjectFields());
LogReadInfo readInfo = replica.fetchRecords(fetchParams);
// Once we read from a non-empty bucket, we stop ignoring request and bucket
// level size limits.
FetchDataInfo fetchedData = readInfo.getFetchedData();
int recordBatchSize = fetchedData.getRecords().sizeInBytes();
if (recordBatchSize > 0) {
fetchParams.markReadOneMessage();
}
limitBytes = Math.max(0, limitBytes - recordBatchSize);
logReadResult.put(
tb,
new LogReadResult(
new FetchLogResultForBucket(
tb, fetchedData.getRecords(), readInfo.getHighWatermark()),
fetchedData.getFetchOffsetMetadata()));
// update metrics
if (isFromFollower) {
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
} else {
tableMetrics.logBytesOut().inc(recordBatchSize);
}
} catch (Exception e) {
if (isUnexpectedException(e)) {
LOG.error("Error processing log fetch operation on replica {}", tb, e);
// NOTE: Failed fetch requests metric is not incremented for known exceptions
// since it is supposed to indicate un-expected failure of a server in
// handling a fetch request
if (tableMetrics != null) {
tableMetrics.failedFetchLogRequests().inc();
}
}
FetchLogResultForBucket result;
if (replica != null && e instanceof LogOffsetOutOfRangeException) {
result = handleFetchOutOfRangeException(replica, fetchOffset, e);
} else {
result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e));
}
logReadResult.put(
tb, new LogReadResult(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA));
}
}
return logReadResult;
}