private void handleFetchLogResponseOfSuccessBucket()

in fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThread.java [287:360]


    private void handleFetchLogResponseOfSuccessBucket(
            TableBucket tableBucket,
            BucketFetchStatus currentFetchStatus,
            FetchLogResultForBucket replicaData) {
        try {
            long nextFetchOffset = -1L;
            if (replicaData.fetchFromRemote()) {
                nextFetchOffset = processFetchResultFromRemoteStorage(tableBucket, replicaData);
            } else {
                LogAppendInfo logAppendInfo =
                        processFetchResultFromLocalStorage(
                                tableBucket, currentFetchStatus.fetchOffset(), replicaData);
                if (logAppendInfo.validBytes() > 0) {
                    nextFetchOffset = logAppendInfo.lastOffset() + 1;
                }
            }

            if (nextFetchOffset != -1L && fairBucketStatusMap.contains(tableBucket)) {
                BucketFetchStatus newFetchStatus =
                        new BucketFetchStatus(
                                currentFetchStatus.tableId(),
                                currentFetchStatus.tablePath(),
                                nextFetchOffset,
                                null);
                fairBucketStatusMap.updateAndMoveToEnd(tableBucket, newFetchStatus);
            }
        } catch (Exception e) {
            if (e instanceof CorruptRecordException || e instanceof InvalidRecordException) {
                // we log the error and continue to ensure if there is a corrupt record in a table
                // bucket, it does not bring the fetcher thread down and cause other table bucket to
                // also lag.
                LOG.error(
                        "Found invalid record during fetch for bucket {} at offset {}",
                        tableBucket,
                        currentFetchStatus.fetchOffset(),
                        e);
            } else if (e instanceof StorageException) {
                LOG.error(
                        "Error while processing data for bucket {} at offset {}",
                        tableBucket,
                        currentFetchStatus.fetchOffset(),
                        e);
                removeBucket(tableBucket);
            } else if (e instanceof DuplicateSequenceException
                    || e instanceof OutOfOrderSequenceException
                    || e instanceof InvalidOffsetException) {
                // TODO this part of logic need to be removed after we introduce leader epoch cache.
                // Trace by https://github.com/alibaba/fluss/issues/673
                LOG.error(
                        "Founding recoverable error while processing data for bucket {} at offset {}, try to "
                                + "truncate to LeaderEndOffsetSnapshot",
                        tableBucket,
                        currentFetchStatus.fetchOffset(),
                        e);
                try {
                    truncateToLeaderEndOffsetSnapshot(tableBucket, currentFetchStatus.tablePath());
                } catch (Exception ex) {
                    LOG.error(
                            "Error while truncating bucket {} at offset {}",
                            tableBucket,
                            currentFetchStatus.fetchOffset(),
                            ex);
                    removeBucket(tableBucket);
                }
            } else {
                LOG.error(
                        "Unexpected error occurred while processing data for bucket {} at offset {}",
                        tableBucket,
                        currentFetchStatus.fetchOffset(),
                        e);
                removeBucket(tableBucket);
            }
        }
    }