in cachelib/navy/block_cache/RegionManager.cpp [224:278]
void RegionManager::doFlush(RegionId rid, bool async) {
  // We're wasting the remaining bytes of a region, so track it for stats
  externalFragmentation_.add(getRegion(rid).getFragmentationSize());
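  // Mark the region as pending flush and count it among the in-memory
  // buffers waiting to be flushed.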
  getRegion(rid).setPendingFlush();
  numInMemBufWaitingFlush_.inc();
  Job flushJob = [this, rid, retryAttempts = 0, flushed = false]() mutable {
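    // `retryAttempts` and `flushed` are captured by value and mutated across
    // invocations: whenever the job returns JobExitCode::Reschedule it is run
    // again (re-enqueued by the scheduler in the async case, retried by the
    // loop at the bottom of this function in the sync case).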
    if (!flushed) {
      if (retryAttempts >= inMemBufFlushRetryLimit_) {
        // Flush failures have reached the retry limit; stop retrying the
        // flush and clean up the buffer instead.
        if (cleanupBufferOnFlushFailure(rid)) {
          releaseCleanedupRegion(rid);
          return JobExitCode::Done;
        }
        numInMemBufCleanupRetries_.inc();
        return JobExitCode::Reschedule;
      }
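      // Attempt to write this region's in-memory buffer to the device.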
      auto res = flushBuffer(rid);
      if (res == Region::FlushRes::kSuccess) {
        flushed = true;
      } else {
        // Flush errors caused by device failures are retried, but only up to
        // the flush retry limit.
        if (res == Region::FlushRes::kRetryDeviceFailure) {
          retryAttempts++;
          numInMemBufFlushRetries_.inc();
        }
        return JobExitCode::Reschedule;
      }
    }
    // Once the buffer has been flushed (either by a previous run of this job
    // or by the attempt above), try to detach it; keep rescheduling until
    // the detach succeeds.
    if (flushed) {
      if (detachBuffer(rid)) {
        // Flush completed, track the region
        track(rid);
        return JobExitCode::Done;
      }
    }
    return JobExitCode::Reschedule;
  };
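  // Asynchronous flushes are handed off to the job scheduler; synchronous
  // flushes run the job inline in the loop below.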
  if (async) {
    scheduler_.enqueue(std::move(flushJob), "flush", JobType::Flush);
  } else {
    while (flushJob() == JobExitCode::Reschedule) {
      // The synchronous path is only taken on shutdown, so we intentionally
      // slow it down: sleeping between retries avoids maxing out the CPU on
      // repeated flush or cleanup failures.
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds{100});
    }
  }
}
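
// A minimal usage sketch (illustrative; these call sites are assumptions,
// not taken from this file): steady-state callers inside RegionManager would
// enqueue the flush asynchronously, while a drain/shutdown path flushes
// inline:
//
//   doFlush(rid, true /* async */);   // hand the flush job to the scheduler
//   doFlush(rid, false /* async */);  // run inline, retrying until done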