in fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThread.java [287:360]
/**
 * Applies the fetch-log result of a bucket whose fetch succeeded.
 *
 * <p>The result is handed to {@code processFetchResultFromRemoteStorage} when {@code
 * replicaData.fetchFromRemote()} is true, and to {@code processFetchResultFromLocalStorage}
 * otherwise. On the local path a next fetch offset is only derived ({@code lastOffset + 1}) when
 * the append reports more than zero valid bytes. If a next fetch offset other than -1 results
 * and the bucket is still contained in {@code fairBucketStatusMap}, its status is replaced by
 * one carrying the new offset and the entry is moved to the end of the map.
 *
 * <p>Exceptions raised while processing are caught here and logged. Depending on the exception
 * type the bucket is left untouched, truncated through {@code
 * truncateToLeaderEndOffsetSnapshot}, or passed to {@code removeBucket}.
 *
 * @param tableBucket the bucket the response belongs to
 * @param currentFetchStatus the fetch status the request was issued with
 * @param replicaData the fetch result returned for this bucket
 */
private void handleFetchLogResponseOfSuccessBucket(
        TableBucket tableBucket,
        BucketFetchStatus currentFetchStatus,
        FetchLogResultForBucket replicaData) {
    try {
        final long nextOffset;
        if (replicaData.fetchFromRemote()) {
            nextOffset = processFetchResultFromRemoteStorage(tableBucket, replicaData);
        } else {
            LogAppendInfo appended =
                    processFetchResultFromLocalStorage(
                            tableBucket, currentFetchStatus.fetchOffset(), replicaData);
            // Without valid bytes there is no new offset to move to.
            nextOffset = appended.validBytes() > 0 ? appended.lastOffset() + 1 : -1L;
        }

        // -1 means "nothing to advance"; a bucket that is no longer in the map is skipped too.
        if (nextOffset == -1L || !fairBucketStatusMap.contains(tableBucket)) {
            return;
        }
        fairBucketStatusMap.updateAndMoveToEnd(
                tableBucket,
                new BucketFetchStatus(
                        currentFetchStatus.tableId(),
                        currentFetchStatus.tablePath(),
                        nextOffset,
                        null));
    } catch (Exception e) {
        // The checks below are order-sensitive: the first matching category wins.
        if (e instanceof CorruptRecordException || e instanceof InvalidRecordException) {
            // Only log and carry on: a corrupt record in one table bucket must not take the
            // fetcher thread down and make the other table buckets lag as well.
            LOG.error(
                    "Found invalid record during fetch for bucket {} at offset {}",
                    tableBucket,
                    currentFetchStatus.fetchOffset(),
                    e);
            return;
        }

        if (e instanceof StorageException) {
            LOG.error(
                    "Error while processing data for bucket {} at offset {}",
                    tableBucket,
                    currentFetchStatus.fetchOffset(),
                    e);
            removeBucket(tableBucket);
            return;
        }

        boolean recoverable =
                e instanceof DuplicateSequenceException
                        || e instanceof OutOfOrderSequenceException
                        || e instanceof InvalidOffsetException;
        if (!recoverable) {
            LOG.error(
                    "Unexpected error occurred while processing data for bucket {} at offset {}",
                    tableBucket,
                    currentFetchStatus.fetchOffset(),
                    e);
            removeBucket(tableBucket);
            return;
        }

        // TODO this recovery path needs to be removed once the leader epoch cache is
        // introduced. Tracked by https://github.com/alibaba/fluss/issues/673
        LOG.error(
                "Founding recoverable error while processing data for bucket {} at offset {}, try to "
                        + "truncate to LeaderEndOffsetSnapshot",
                tableBucket,
                currentFetchStatus.fetchOffset(),
                e);
        try {
            truncateToLeaderEndOffsetSnapshot(tableBucket, currentFetchStatus.tablePath());
        } catch (Exception truncateFailure) {
            // Truncation itself failed, so the bucket is removed as in the non-recoverable case.
            LOG.error(
                    "Error while truncating bucket {} at offset {}",
                    tableBucket,
                    currentFetchStatus.fetchOffset(),
                    truncateFailure);
            removeBucket(tableBucket);
        }
    }
}