in turbonfs/src/nfs_inode.cpp [654:942]
void nfs_inode::sync_membufs(std::vector<bytes_chunk> &bc_vec,
bool is_flush,
struct rpc_task *parent_task)
{
// Caller must hold the flush_lock.
assert(is_flushing);
if (!is_stable_write()) {
/*
* We do not allow a new flush while there's an ongoing one, in case
* of unstable writes.
*/
assert(!get_filecache()->is_flushing_in_progress());
}
/*
* Stable won't have commit and for unstable we cannot flush while
* commit is going on.
*/
assert(!is_commit_in_progress());
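// Account this sync_membufs() call in the global stats.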
INC_GBL_STATS(num_sync_membufs, 1);
if (bc_vec.empty()) {
return;
}
/*
* If parent_task is passed, it must refer to the fuse write task that
* triggered the inline sync.
*/
if (parent_task) {
assert(parent_task->magic == RPC_TASK_MAGIC);
// Must be a frontend write task.
assert(parent_task->get_op_type() == FUSE_WRITE);
assert(parent_task->rpc_api->write_task.is_fe());
// Must not already have num_ongoing_backend_writes set.
assert(parent_task->num_ongoing_backend_writes == 0);
/*
* Set num_ongoing_backend_writes to 1 before issuing the first backend
* write. Note that bc_vec may result in multiple backend writes being
* issued. Without this protective count, if write_iov_callback() were
* called for all the writes issued so far before we finished issuing
* the rest, we could mistake that for "all issued writes have completed"
* and wrongly complete the parent_task.
* This protective count is dropped at the end of this function.
*/
parent_task->num_ongoing_backend_writes = 1;
}
/*
* If the new data being written does not start right after the last data
* written, we need to switch to stable writes.
*/
if (check_stable_write_required(bc_vec[0].offset)) {
switch_to_stable_write();
}
/*
* The rpc_task that will carry out the write. It's allocated lazily,
* when we find the first membuf that actually needs to be flushed.
*/
struct rpc_task *write_task = nullptr;
// Flush dirty membufs to backend.
for (bytes_chunk& bc : bc_vec) {
/*
* We should never write a partial membuf, as that will cause issues:
* membuf flags (dirty and flushing, in this case) are tracked at membuf
* granularity. Check maps_full_membuf() to see how the membuf itself
* may have been trimmed by a release() call, but the bc must refer to
* whatever membuf part is currently valid.
*/
assert(bc.maps_full_membuf());
/*
* Get the underlying membuf for bc.
* Note that we normally write the entire membuf, even though bc may be
* referring to a smaller window. The exception is a membuf that was
* trimmed by a release() call: since get_dirty_bc_range() returns full
* bytes_chunks from the chunkmap, we get full (but potentially trimmed)
* bytes_chunks here.
*/
struct membuf *mb = bc.get_membuf();
/*
* Verify the mb.
* Caller must hold an inuse count on the membufs.
* sync_membufs() takes ownership of that inuse count and will drop it.
* We have two cases:
* 1. We decide to issue the write IO.
* In this case the inuse count will be dropped by
* write_iov_callback().
* This will be the only inuse count and the buffer will be
* release()d after write_iov_callback() (in bc_iovec destructor).
* 2. We found the membuf as flushing.
* In this case we don't issue the write and move on to the next
* membuf, but only after dropping the inuse count.
*/
assert(mb != nullptr);
assert(mb->is_inuse());
if (is_flush) {
/*
* get_dirty_bc_range() must have held an inuse count.
* We hold an extra inuse count so that we can safely wait for the
* flush in the "waiting loop" in nfs_inode::flush_cache_and_wait().
* This is needed because we drop the inuse count if the membuf is
* already being flushed by another thread, or it may be dropped when
* write_iov_callback() completes, which can happen before we reach the
* waiting loop.
*/
mb->set_inuse();
}
/*
* Lock the membuf. If multiple writer threads want to flush the same
* membuf, the first one will find it dirty and not flushing, and that
* thread should initiate the Blob write. Threads that come in after the
* 1st thread has started flushing but before the write completes will
* find it "dirty and flushing"; they can skip the write and optionally
* choose to wait for it to complete by waiting for the lock. Threads
* that come in after the write is done and the lock is released will
* find it neither dirty nor flushing, so they can simply skip it.
*
* Note that the rpc_task for the flush should be allocated before
* taking the lock, as it may block.
* TODO: We don't do this currently, fix it!
*/
if (mb->is_flushing() || !mb->is_dirty()) {
mb->clear_inuse();
continue;
}
/*
* We hold the membuf lock here for the following reasons:
* - Only one thread can flush a membuf. Once it takes the lock
* it calls set_flushing() to mark the membuf as flushing and
* then no other thread would attempt to flush it.
* - It also prevents writers from updating the membuf content
* while it's being flushed (though this is not mandatory).
*
* This is released only when the backend write completes, thus
* wait_for_ongoing_flush() can simply wait for the membuf lock to
* get notified when the flush completes.
*
* TODO: This can block the fuse thread for longish times affecting
* other interactive commands like readdir/stat.
*
* Note: Since we are holding flush_lock, and flush_lock has a requirement
* that it should not be held while waiting for some write/commit
* on that inode to complete, we must ensure that the following
* set_locked() call won't wait for a write to complete.
* This is ensured because we only come here for membufs that are
* currently not flushing and hence cannot be waiting for a write.
*
* Note: bytes_chunk_cache::truncate() can truncate a membuf after we
* get the list of dirty membufs and before we can get the lock
* here; skip those.
*/
mb->set_locked();
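/*
* Re-check after acquiring the lock: another thread may have flushed
* this membuf (or truncate() may have truncated it) while we were
* waiting for the lock.
*/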
if (mb->is_flushing() ||
!mb->is_dirty() ||
mb->is_truncated()) {
if (mb->is_truncated()) {
AZLogInfo("[{}] sync_membufs: skipping truncated membuf "
"[{}, {})", get_fuse_ino(), mb->offset.load(),
mb->offset.load()+mb->length.load());
}
mb->clear_locked();
mb->clear_inuse();
continue;
}
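// This membuf will be flushed; account its bytes in the global stats.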
INC_GBL_STATS(tot_bytes_sync_membufs, mb->length.load());
if (write_task == nullptr) {
write_task =
get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_WRITE);
write_task->init_write_be(ino);
assert(write_task->rpc_api->pvt == nullptr);
assert(write_task->rpc_api->parent_task == nullptr);
/*
* Set the parent_task pointer for this child task, so that we can
* complete the parent task when all issued writes complete.
*/
if (parent_task) {
write_task->rpc_api->parent_task = parent_task;
parent_task->num_ongoing_backend_writes++;
}
write_task->rpc_api->pvt = new bc_iovec(this);
/*
* We have at least one flush/write to issue; mark fcsm as running,
* if not already marked.
*/
get_fcsm()->mark_running();
}
/*
* XXX Add an assert that unstable writes should only have contiguous
* bcs.
*/
/*
* Add as many bytes_chunks to the write_task as it allows.
* Once it's packed completely, dispatch the write.
*/
if (write_task->add_bc(bc)) {
if (!is_stable_write()) {
putblock_filesize += bc.length;
} else {
assert(putblock_filesize == (off_t) AZNFSC_BAD_OFFSET);
}
continue;
} else {
/*
* This write_task will orchestrate this write.
*/
write_task->issue_write_rpc();
/*
* Create the new flush task to carry out the write for the next bc,
* which we failed to add to the existing write_task.
*/
write_task =
get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_WRITE);
write_task->init_write_be(ino);
assert(write_task->rpc_api->pvt == nullptr);
if (parent_task) {
write_task->rpc_api->parent_task = parent_task;
parent_task->num_ongoing_backend_writes++;
}
write_task->rpc_api->pvt = new bc_iovec(this);
// Single bc addition should not fail.
[[maybe_unused]] bool res = write_task->add_bc(bc);
assert(res == true);
if (!is_stable_write()) {
putblock_filesize += bc.length;
} else {
assert(putblock_filesize == (off_t) AZNFSC_BAD_OFFSET);
}
}
}
// Dispatch the leftover bytes (or full write).
if (write_task) {
write_task->issue_write_rpc();
}
/*
* Drop the protective num_ongoing_backend_writes count taken at the start
* of this function. If it's the only one remaining, all backend writes
* have completed and we can complete the parent_task here; otherwise
* (the common case) parent_task will be completed by write_iov_callback()
* when the last backend write completes.
*/
if (parent_task) {
assert(parent_task->magic == RPC_TASK_MAGIC);
assert(parent_task->get_op_type() == FUSE_WRITE);
assert(parent_task->rpc_api->write_task.is_fe());
assert(parent_task->num_ongoing_backend_writes > 0);
assert(parent_task->rpc_api->write_task.get_ino() == get_fuse_ino());
if (--parent_task->num_ongoing_backend_writes == 0) {
if (get_write_error() == 0) {
assert(parent_task->rpc_api->write_task.get_size() > 0);
parent_task->reply_write(
parent_task->rpc_api->write_task.get_size());
} else {
parent_task->reply_error(get_write_error());
}
}
/*
* Note: parent_task could be freed by the above reply callback.
* Don't access parent_task after this point, either here or in the
* caller.
*/
}
}
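/*
* Illustrative sketch (not part of the turbonfs sources, kept under
* "#if 0" so it's never compiled): a minimal model of the protective
* num_ongoing_backend_writes pattern used by sync_membufs() above.
* All names (demo_parent, demo_on_write_done, demo_issue_writes) are
* hypothetical. The idea: start the counter at 1 before issuing, bump
* it once per issued write, have each completion decrement it, and
* drop the protective count at the end, so the parent is completed
* exactly once, either by the last completion callback or here.
*/
#if 0
#include <atomic>

struct demo_parent {
    std::atomic<int> num_ongoing_backend_writes{0};

    // Stand-in for parent_task->reply_write()/reply_error().
    void complete()
    {
        // Reply to the fuse request exactly once.
    }
};

// Stand-in for write_iov_callback(): the last decrement completes the
// parent.
static void demo_on_write_done(demo_parent *p)
{
    if (--p->num_ongoing_backend_writes == 0) {
        p->complete();
    }
}

static void demo_issue_writes(demo_parent *p, int nwrites)
{
    /*
     * Protective count, so that completions of writes issued so far
     * cannot drive the counter to 0 while we are still issuing.
     */
    p->num_ongoing_backend_writes = 1;

    for (int i = 0; i < nwrites; i++) {
        p->num_ongoing_backend_writes++;
        // Issue the backend write here, with demo_on_write_done() as
        // its completion callback.
    }

    /*
     * Drop the protective count. If all issued writes have already
     * completed, complete the parent here, else the last completion
     * callback will do it.
     */
    if (--p->num_ongoing_backend_writes == 0) {
        p->complete();
    }
}
#endif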