in turbonfs/src/rpc_task.cpp [838:1027]
static void commit_callback(
struct rpc_context *rpc,
int rpc_status,
void *data,
void *private_data)
{
rpc_task *task = (rpc_task*) private_data;
/*
* Commit is issued as a FUSE_FLUSH.
* TODO: Maybe it should have a type of its own.
*
* Note that flush/write is issued as FUSE_WRITE.
*/
assert(task->magic == RPC_TASK_MAGIC);
assert(task->get_op_type() == FUSE_FLUSH);
// rpc_api->pvt must contain the list of bcs which were committed.
assert(task->rpc_api->pvt != nullptr);
// Commit is never called for a fuse request.
assert(task->get_fuse_req() == nullptr);
auto res = (COMMIT3res*) data;
INJECT_JUKEBOX(res, task);
const fuse_ino_t ino = task->rpc_api->flush_task.get_ino();
struct nfs_inode *inode =
task->get_client()->get_nfs_inode_from_ino(ino);
// List of bcs committed by this commit call.
auto bc_vec_ptr = (std::vector<bytes_chunk> *) task->rpc_api->pvt;
// We should call commit only when inode is doing unstable+commit.
assert(!inode->is_stable_write());
// Inode must be marked "commit in progress".
assert(inode->is_commit_in_progress());
// Commit and flush/write are exclusive.
assert(!inode->get_filecache()->is_flushing_in_progress());
// bytes_commit_pending will be reduced later below.
assert(inode->get_filecache()->get_bytes_to_commit() > 0);
const int status = task->status(rpc_status, NFS_STATUS(res));
UPDATE_INODE_WCC(inode, res->COMMIT3res_u.resok.file_wcc);
// Set "in commit callback".
FC_CB_TRACKER fccbt(inode);
AZLogDebug("[{}] commit_callback, number of bc committed: {}",
ino, bc_vec_ptr->size());
uint64_t commit_bytes = 0;
/*
* Now that the request has completed, we can query libnfs for the
* dispatch time.
*/
task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res));
if (status == 0) {
uint64_t offset = 0;
uint64_t length = 0;
/*
* Go over all the successfully committed bcs and release them from
* file cache (note that successful commit confirms that server has
* persisted the data and client can fee it).
* Also complete any tasks waiting for these membufs to be committed.
*/
for (auto& bc : *bc_vec_ptr) {
struct membuf *mb = bc.get_membuf();
assert(mb->is_inuse());
assert(mb->is_locked());
assert(mb->is_commit_pending());
assert(mb->is_uptodate());
// Dirty membufs must not be committed.
assert(!mb->is_dirty());
// We should not have a zero length bc.
assert(bc.length > 0);
assert(bc.length <= AZNFSC_MAX_CHUNK_SIZE);
assert((bc.offset + bc.length) <= AZNFSC_MAX_FILE_SIZE);
mb->clear_commit_pending();
mb->clear_locked();
mb->clear_inuse();
/**
* Release commited data from file cache, one contiguous range at
* a time.
*/
if (offset == 0 && length == 0) {
offset = bc.offset;
length = bc.length;
} else if (offset + length == bc.offset) {
length += bc.length;
assert(length <= AZNFSC_MAX_FILE_SIZE);
} else {
commit_bytes += length;
const uint64_t released =
inode->get_filecache()->release(offset, length);
AZLogDebug("[{}] commit_callback releasing bc [{}, {}), "
"released {} bytes",
ino, offset, offset+length, released);
offset = bc.offset;
length = bc.length;
}
}
// Release the last bc not released by the above loop.
if (length != 0) {
commit_bytes += length;
const uint64_t released =
inode->get_filecache()->release(offset, length);
AZLogDebug("[{}] commit_callback releasing bc [{}, {}), "
"released {} bytes",
ino, offset, offset+length, released);
}
} else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
task->get_client()->jukebox_retry(task);
return;
} else {
/*
* Commit has failed.
* Go over all the bcs that this commit was targetting, mark the
* membufs back as dirty and clear the commit_pending flag.
* Next write will initiate the flush again with stable write.
*/
for (auto& bc : *bc_vec_ptr) {
struct membuf *mb = bc.get_membuf();
assert(mb->is_inuse());
assert(mb->is_locked());
assert(mb->is_commit_pending());
assert(mb->is_uptodate());
/*
* TODO: What happens if application writes over these
* commit_pending bcs?
*/
assert(!mb->is_dirty());
mb->clear_commit_pending();
mb->set_dirty();
mb->clear_locked();
mb->clear_inuse();
}
/*
* Set the inode to stable write, so that next write will initiate
* the flush again with stable write.
* There should be no flush in progress at this moment, also since
* we always commit *all* the commit pending bcs, there should not
* be any more commit pending bcs, so we can safely just enable
* stable writes for the inode w/o the elaborate
* switch_to_stable_write().
*/
assert(inode->get_filecache()->is_flushing_in_progress() == false);
inode->set_stable_write();
}
#ifdef ENABLE_PARANOID
/*
* We always pick *all* commit-pending bcs for committing, and while commit
* is going on no other flush can run, hence new commit pending bcs cannot
* be added, so when an ongoing commit completes we we must not have any
* commit pending bcs in the cache.
*/
{
std::vector<bytes_chunk> bc_vec =
inode->get_filecache()->get_commit_pending_bcs();
assert(bc_vec.empty());
}
#endif
if (status == 0) {
assert(commit_bytes > 0);
/*
* Update the commit bytes in the inode.
* This will also clear the commit-in-progress flag in the inode as
* the very first thing.
*/
inode->get_fcsm()->on_commit_complete(commit_bytes);
} else {
// TODO: Add fcsm::on_commit_fail() and call it from here.
// Clear the commit in progress flag.
inode->clear_commit_in_progress();
assert(0);
}
delete bc_vec_ptr;
task->rpc_api->pvt = nullptr;
task->free_rpc_task();
}