in fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java [114:193]
public boolean tryComplete() {
    // Decides whether this delayed fetch-log operation can be completed now.
    //
    // Every bucket in fetchBucketStatusMap is inspected. The method returns the result of
    // forceComplete() as soon as one bucket hits an early-exit condition (replica lookup or
    // snapshot failure, or the fetch offset and the end offset lying on different segments).
    // Otherwise the bytes readable from each bucket are summed (each capped by the bucket's
    // max fetch bytes) and the operation is completed only if the sum reaches
    // params.minFetchBytes(); if not, false is returned.
    //
    // The sum is kept in a long: each per-bucket contribution is bounded only by that
    // bucket's max bytes, so with many buckets an int sum could wrap negative and the
    // threshold check below would wrongly fail.
    long accumulatedSize = 0L;
    for (Map.Entry<TableBucket, FetchBucketStatus> entry : fetchBucketStatusMap.entrySet()) {
        TableBucket tb = entry.getKey();
        FetchBucketStatus fetchBucketStatus = entry.getValue();
        LogOffsetMetadata fetchOffset = fetchBucketStatus.startOffsetMetadata;
        try {
            // Buckets whose previous result is flagged fetchFromRemote(), or whose start
            // offset metadata is the UNKNOWN_OFFSET_METADATA constant, contribute nothing.
            // NOTE(review): this is a reference comparison; it presumably relies on
            // UNKNOWN_OFFSET_METADATA being a shared singleton - confirm.
            if (!fetchBucketStatus.previousFetchLogResultForBucket.fetchFromRemote()
                    && fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
                Replica replica = replicaManager.getReplicaOrException(tb);
                LogOffsetSnapshot logOffsetSnapshot =
                        replica.fetchOffsetSnapshot(params.fetchOnlyLeader());

                // The fetch isolation selects which offset bounds the readable range.
                LogOffsetMetadata endOffset;
                if (params.isolation() == FetchIsolation.LOG_END) {
                    endOffset = logOffsetSnapshot.logEndOffset;
                } else if (params.isolation() == FetchIsolation.HIGH_WATERMARK) {
                    endOffset = logOffsetSnapshot.highWatermark;
                } else {
                    throw new FlussRuntimeException("Unknown fetch isolation.");
                }

                // Go directly to the check for Case E if the log offsets are the same. If
                // the log segment has just rolled, then the high watermark offset will remain
                // the same but be on the old segment, which would incorrectly be seen as an
                // instance of Case D.
                // NOTE(review): the final accumulated-bytes check below is labelled
                // "Case F" while this comment calls it "Case E" - confirm which label
                // matches the class-level case list.
                if (endOffset.getMessageOffset() != fetchOffset.getMessageOffset()) {
                    if (endOffset.onOlderSegment(fetchOffset)) {
                        // Case D, this can happen when the new fetch log operation is on a
                        // truncated leader.
                        LOG.debug(
                                "Satisfying delayed fetch log since it is fetching later segments of bucket {}.",
                                tb);
                        return forceComplete();
                    } else if (fetchOffset.onOlderSegment(endOffset)) {
                        // Case D, this can happen when the fetch operation is falling behind
                        // the current segment or the bucket has just rolled a new segment.
                        LOG.debug(
                                "Satisfying delayed fetch log since it is fetching older segments of bucket {}.",
                                tb);
                        return forceComplete();
                    } else if (fetchOffset.getMessageOffset() < endOffset.getMessageOffset()) {
                        // We take the bucket fetch size as upper bound when accumulating the
                        // bytes.
                        int bytesAvailable =
                                Math.min(
                                        endOffset.positionDiff(fetchOffset),
                                        fetchBucketStatus.fetchData.getMaxBytes());
                        accumulatedSize += bytesAvailable;
                    }
                }
            }
        } catch (NotLeaderOrFollowerException e) {
            // case A and B.
            LOG.debug(
                    "TabletServer is no longer the leader or follower of table-bucket {}, satisfy delayFetchLog immediately.",
                    tb);
            return forceComplete();
        } catch (UnknownTableOrBucketException e) {
            // case C
            LOG.debug(
                    "TabletServer no longer knows of table-bucket {}, satisfy delayFetchLog immediately.",
                    tb);
            return forceComplete();
        } catch (IOException e) {
            // The exception is passed as the trailing argument so the logger records it
            // together with the message.
            LOG.debug(
                    "There is a storage exception for table-bucket {}, satisfy delayFetchLog immediately.",
                    tb,
                    e);
            return forceComplete();
        }
    }

    // Case F.
    if (accumulatedSize >= params.minFetchBytes()) {
        return forceComplete();
    } else {
        return false;
    }
}