public boolean tryComplete()

in fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java [114:193]


    public boolean tryComplete() {
        int accumulatedSize = 0;

        for (Map.Entry<TableBucket, FetchBucketStatus> entry : fetchBucketStatusMap.entrySet()) {
            TableBucket tb = entry.getKey();
            FetchBucketStatus fetchBucketStatus = entry.getValue();
            LogOffsetMetadata fetchOffset = fetchBucketStatus.startOffsetMetadata;
            try {
                if (!fetchBucketStatus.previousFetchLogResultForBucket.fetchFromRemote()
                        && fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
                    Replica replica = replicaManager.getReplicaOrException(tb);
                    LogOffsetSnapshot logOffsetSnapshot =
                            replica.fetchOffsetSnapshot(params.fetchOnlyLeader());
                    LogOffsetMetadata endOffset;
                    if (params.isolation() == FetchIsolation.LOG_END) {
                        endOffset = logOffsetSnapshot.logEndOffset;
                    } else if (params.isolation() == FetchIsolation.HIGH_WATERMARK) {
                        endOffset = logOffsetSnapshot.highWatermark;
                    } else {
                        throw new FlussRuntimeException("Unknown fetch isolation.");
                    }

                    // Go directly to the check for Case E if the log offsets are the same. If
                    // the log segment has just rolled, then the high watermark offset will remain
                    // the same but be on the old segment, which would incorrectly be seen as an
                    // instance of Case D.
                    if (endOffset.getMessageOffset() != fetchOffset.getMessageOffset()) {
                        if (endOffset.onOlderSegment(fetchOffset)) {
                            // Case D, this can happen when the new fetch log operation is on a
                            // truncated leader.
                            LOG.debug(
                                    "Satisfying delayed fetch log since it is fetching later segments of bucket {}.",
                                    tb);
                            return forceComplete();
                        } else if (fetchOffset.onOlderSegment(endOffset)) {
                            // Case D, this can happen when the fetch operation is falling behind
                            // the current segment or the bucket has just rolled a new segment.
                            LOG.debug(
                                    "Satisfying delayed fetch log since it is fetching older segments of bucket {}.",
                                    tb);
                            return forceComplete();
                        } else if (fetchOffset.getMessageOffset() < endOffset.getMessageOffset()) {
                            // We take the bucket fetch size as upper bound when accumulating the
                            // bytes.
                            int bytesAvailable =
                                    Math.min(
                                            endOffset.positionDiff(fetchOffset),
                                            fetchBucketStatus.fetchData.getMaxBytes());
                            accumulatedSize += bytesAvailable;
                        }
                    }
                }
            } catch (NotLeaderOrFollowerException e) {
                // case A and B.
                LOG.debug(
                        "TabletServer is no longer the leader or follower of table-bucket {}, satisfy delayFetchLog immediately.",
                        tb);
                return forceComplete();
            } catch (UnknownTableOrBucketException e) {
                // case C
                LOG.debug(
                        "TabletServer os mp longer knows of table-bucket {}, satisfy delayFetchLog immediately.",
                        tb);
                return forceComplete();
            } catch (IOException e) {
                LOG.debug(
                        "There is an storage exception append for table-bucket {}, satisfy delayFetchLog immediately.",
                        tb,
                        e);
                return forceComplete();
            }
        }

        // Case F.
        if (accumulatedSize >= params.minFetchBytes()) {
            return forceComplete();
        } else {
            return false;
        }
    }