in turbonfs/src/nfs_inode.cpp [1138:1342]
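/*
 * Wait for any flush already in progress on this inode to complete and, for
 * unstable writes, also commit any pending data and wait for the commit to
 * complete.
 *
 * MUST be called with flush_lock held; returns with flush_lock still held
 * and the flush callback drained. Returns 0 on success, else the inode's
 * write error.
 */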
int nfs_inode::wait_for_ongoing_flush()
{
// Caller must call us with flush_lock held.
assert(is_flushing);
/*
* MUST be called only for regular files.
* Leave the assert to catch if fuse ever calls flush() on non-reg files.
*/
if (!is_regfile()) {
assert(0);
return 0;
}
/*
* If flush() is called w/o open(), there won't be any cache, skip.
*/
if (!has_filecache()) {
return 0;
}
    /*
     * Stable writes do not need commit, so there must be no commit in
     * progress and no pending commit data.
     */
if (is_stable_write()) {
assert(!is_commit_in_progress());
assert(get_filecache()->bytes_commit_pending == 0);
}
if (get_filecache()->is_flushing_in_progress()) {
assert(!is_commit_in_progress());
} else if (!is_commit_in_progress() &&
!get_fcsm()->fc_cb_running() &&
(get_filecache()->bytes_commit_pending == 0)) {
        /*
         * Flushing is not in progress and no new flush can be started since
         * we hold the flush_lock, and the flush callback has drained.
         * No commit is in progress and there's no pending commit data, so
         * there's nothing to wait for.
         */
AZLogDebug("[{}] No flush or commit in progress, returning", ino);
return 0;
}
    /*
     * We don't want to hold flush_lock while we wait for the ongoing flush
     * to complete, since that can deadlock: on_flush_complete() also takes
     * the flush_lock. Instead we grab the current flushing bcs atomically
     * under flush_lock and then release the flush_lock while waiting. We
     * repeat this until there are no flushing bcs left. Technically we
     * should not have back-to-back flushes started, but to be safe we retry
     * a few times.
     * In debug builds we can induce sleeps in the write/flush callback, so
     * we need to wait long enough.
     */
int retry, err = 0;
const int max_retry = 200;
for (retry = 0; retry < max_retry; retry++) {
assert(is_flushing);
/*
* Get the flushing bytes_chunk from the filecache handle.
* This will grab an exclusive lock on the file cache and return the
* list of flushing bytes_chunks at that point. Note that we can have
* new dirty bytes_chunks created but we don't want to wait for those.
*/
std::vector<bytes_chunk> bc_vec =
filecache_handle->get_flushing_bc_range();
        /*
         * Nothing to flush and callback drained, job done!
         * Note that we unlock the membuf before calling on_flush_complete(),
         * so there's a window where the callback is still running after all
         * membufs have completed. We need to drain the callbacks too, else
         * we can deadlock with on_flush_complete() trying to acquire the
         * flush_lock which we would be holding while we wait. Once the
         * current callback completes, no other callback can run as we hold
         * the flush_lock, which blocks any new flushes.
         */
if (bc_vec.empty() && !get_fcsm()->fc_cb_running()) {
assert(err == 0);
break;
}
flush_unlock();
AZLogDebug("[{}] wait_for_ongoing_flush(), attempt #{}, {} membufs, "
"fc_cb_count: {}",
ino, retry+1, bc_vec.size(), get_fcsm()->fc_cb_count());
/*
* Give 10ms to the callback to drain completely.
*/
if (bc_vec.empty() && get_fcsm()->fc_cb_running()) {
::usleep(10 * 1000);
}
/*
* Our caller expects us to return only after the flush completes.
* Wait for all the membufs to flush and get result back.
*/
for (bytes_chunk &bc : bc_vec) {
struct membuf *mb = bc.get_membuf();
assert(mb != nullptr);
assert(mb->is_inuse());
            /*
             * sync_membufs() would have taken the membuf lock for the
             * duration of the backend write that flushes the membuf, so once
             * we get the lock we know that the flush write has completed.
             */
mb->set_locked();
            /*
             * If still dirty after we get the lock, it can mean one of two
             * things:
             * - The write failed.
             * - Some other thread got the lock before us and made the membuf
             *   dirty again.
             */
if (mb->is_dirty() && get_write_error()) {
AZLogError("[{}] Flush [{}, {}) failed with error: {}",
ino,
bc.offset, bc.offset + bc.length,
get_write_error());
}
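            /*
             * Note: even on error we continue the loop, dropping our lock
             *       and inuse counts on the remaining membufs. The write
             *       error is picked up after re-acquiring flush_lock below.
             */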
mb->clear_locked();
mb->clear_inuse();
            /*
             * Release the bytes_chunk back to the filecache.
             * These bytes_chunks are not needed anymore as the flush is done.
             *
             * Note: We come here for bytes_chunks which were returned as
             *       flushing above. These writes may or may not have been
             *       issued by us (if not issued by us it was because some
             *       other thread, mostly the writer, issued the write, so we
             *       found it flushing and hence didn't issue). In any case,
             *       since we hold an inuse count, release() called from
             *       write_callback() would not have released it, so we need
             *       to release it now.
             */
filecache_handle->release(bc.offset, bc.length);
}
// Re-grab flush_lock, now that the wait is over.
flush_lock();
err = get_write_error();
if (err != 0) {
AZLogDebug("[{}] wait_for_ongoing_flush() failed with error: {}",
ino, err);
break;
}
}
if (retry == max_retry) {
err = EINPROGRESS;
AZLogError("[{}] wait_for_ongoing_flush(), failed after {} retries!",
ino, retry);
assert(0);
} else if (err == 0) {
AZLogDebug("[{}] wait_for_ongoing_flush(), succeeded after {} "
"retry(s)!", ino, retry);
}
/*
* We should leave with flush_lock held and flush callback drained.
*/
assert(is_flushing);
assert(!get_fcsm()->fc_cb_running());
    /*
     * Unstable write case: if there's data pending commit and no commit is
     * in progress, issue the commit ourselves, then wait for it to complete.
     */
if (get_filecache()->get_bytes_to_commit() > 0) {
assert(!is_stable_write());
if (!is_commit_in_progress()) {
uint64_t bytes = 0;
std::vector<bytes_chunk> bc_vec =
get_filecache()->get_commit_pending_bcs(&bytes);
assert(bc_vec.empty() == (bytes == 0));
assert(bytes > 0);
/*
* Issue the commit RPC to commit the pending data.
*/
commit_membufs(bc_vec);
}
}
if (is_commit_in_progress()) {
wait_for_ongoing_commit();
}
assert(!is_commit_in_progress());
assert(!get_filecache()->is_flushing_in_progress());
return get_write_error();
}
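
/*
 * Illustrative caller pattern for wait_for_ongoing_flush() (a sketch only,
 * not an actual call site from this file): callers are expected to bracket
 * the wait with the flush_lock, e.g.
 *
 *   inode->flush_lock();
 *   const int err = inode->wait_for_ongoing_flush();
 *   inode->flush_unlock();
 *
 * where 'inode' is a hypothetical nfs_inode pointer. A 0 return means the
 * ongoing flush (and commit, for unstable writes) completed successfully,
 * else the inode's write error is returned.
 */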