in turbonfs/src/rpc_task.cpp [1260:1492]
/*
 * libnfs completion callback for a backend (BE) WRITE RPC issued to flush
 * one bc_iovec worth of cached data.
 *
 * @rpc          libnfs rpc context on which the WRITE completed.
 * @rpc_status   libnfs RPC status.
 * @data         WRITE3res returned by the server.
 * @private_data The BE FUSE_WRITE rpc_task that issued the WRITE; its
 *               rpc_api->pvt points to the bc_iovec being written.
 *
 * Outcomes:
 * - Success, entire bc_iovec written: bc_iovec::on_io_complete() is called
 *   with the written byte count.
 * - Success, short write: bc_iovec::on_io_complete() records the written
 *   bytes and the bc_iovec is handed over to a freshly allocated BE write
 *   task (inheriting parent_task) which issues the remainder. This task is
 *   freed and the callback returns; the rest of the bookkeeping happens
 *   when that new WRITE completes.
 * - NFS3ERR_JUKEBOX: this task is passed to jukebox_retry() and the
 *   callback returns.
 * - Any other error: the inode's write error is set and
 *   bc_iovec::on_io_fail() is called.
 *
 * In the non-returning cases, the parent (FE) write task, if any, is
 * replied to once its last outstanding backend write finishes. Then the
 * flush-commit state machine is notified, and both the bc_iovec and this
 * task are released.
 */
static void write_iov_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    assert(rpc != nullptr);

    struct rpc_task *task = (struct rpc_task *) private_data;
    assert(task->magic == RPC_TASK_MAGIC);
    // Only flush tasks use this callback.
    assert(task->get_op_type() == FUSE_WRITE);
    // This must be a BE task.
    assert(task->rpc_api->write_task.is_be());

    // Those flush tasks must have pvt set to a bc_iovec ptr.
    struct bc_iovec *bciov = (struct bc_iovec *) task->rpc_api->pvt;
    assert(bciov);
    assert(bciov->magic == BC_IOVEC_MAGIC);

    struct nfs_client *client = task->get_client();
    assert(client->magic == NFS_CLIENT_MAGIC);

    auto res = (WRITE3res *)data;

    INJECT_JUKEBOX(res, task);

    const char* errstr;
    const int status = task->status(rpc_status, NFS_STATUS(res), &errstr);
    const fuse_ino_t ino = task->rpc_api->write_task.get_ino();
    struct nfs_inode *inode = client->get_nfs_inode_from_ino(ino);

    FC_CB_TRACKER fccbt(inode);

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUSX(rpc_status, res));

    // Success case.
    if (status == 0) {
        if (!res->WRITE3res_u.resok.file_wcc.after.attributes_follow) {
            AZLogDebug("[{}] Postop attributes not received for write, "
                       "invalidating attribute cache", ino);

            /*
             * Since the post-op attributes are not populated for the file,
             * invalidate the cache as the attributes may no longer
             * be valid since write() would have changed the file attributes.
             */
            inode->invalidate_attribute_cache();
        }

        /*
         * WCC implementation.
         * If pre-op attributes indicate that the file changed since we cached,
         * it implies some other client updated the file. In this case the best
         * course of action is to drop our cached data. Note that we drop only
         * non-dirty data; anyway, multiple clients writing to the same file
         * w/o locking would result in undefined data state.
         */
        UPDATE_INODE_WCC(inode, res->WRITE3res_u.resok.file_wcc);

#ifdef ENABLE_PRESSURE_POINTS
        /*
         * Short write pressure point.
         */
        if (inject_error()) {
            // Set write size to a random percent of the actual size.
            const uint32_t pct = random_number(1, 100);
            /*
             * Compute count * pct in 64 bits so that it cannot wrap around
             * for large write sizes.
             */
            const uint32_t adj_size = std::max<uint32_t>(
                static_cast<uint32_t>(
                    (static_cast<uint64_t>(res->WRITE3res_u.resok.count) * pct) / 100),
                1U);
            assert(adj_size <= res->WRITE3res_u.resok.count);
            AZLogWarn("[{}] PP: short write {} -> {}",
                      ino, res->WRITE3res_u.resok.count, adj_size);
            res->WRITE3res_u.resok.count = adj_size;
        }
#endif

        // Successful Blob write must not return 0.
        assert(res->WRITE3res_u.resok.count > 0);
        assert(res->WRITE3res_u.resok.count <= bciov->length);
        assert(bciov->length <= bciov->orig_length);
        assert(bciov->offset >= bciov->orig_offset);

        /*
         * Did the write for the entire bciov complete?
         * Note that bciov is a vector of multiple bytes_chunk and for each
         * of them we write the entire membuf.
         */
        const bool is_partial_write =
            (res->WRITE3res_u.resok.count < bciov->length);

        if (is_partial_write) {
            AZLogDebug("[{}] <{}> Partial write: [{}, {}) of [{}, {})",
                       ino, task->issuing_tid,
                       bciov->offset,
                       bciov->offset + res->WRITE3res_u.resok.count,
                       bciov->orig_offset,
                       bciov->orig_offset + bciov->orig_length);

            // Update bciov after the current write.
            bciov->on_io_complete(res->WRITE3res_u.resok.count,
                                  !inode->is_stable_write());

            // Create a new write_task for the remaining bc_iovec.
            struct rpc_task *write_task =
                client->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_WRITE);
            write_task->init_write_be(ino);

            // Any new task should start fresh as a parent task.
            assert(write_task->rpc_api->parent_task == nullptr);

            // Hand over the remaining bciov to the new write_task.
            assert(write_task->rpc_api->pvt == nullptr);
            write_task->rpc_api->pvt = task->rpc_api->pvt;

            /*
             * If this (child) write_rpc was issued as part of another
             * (parent) write task, then set the parent_task in the new
             * child write too.
             */
            if (task->rpc_api->parent_task) {
                write_task->rpc_api->parent_task = task->rpc_api->parent_task;
            }

            task->rpc_api->pvt = nullptr;

            // Issue write for the remaining data.
            write_task->issue_write_rpc();

            /*
             * Release this task since it has done its job.
             * Next, this callback will be called again when the above
             * write for the remaining data completes.
             */
            task->free_rpc_task();
            return;
        } else {
            /*
             * The complete bc_iovec IO has completed.
             * Log before on_io_complete(), as it updates bciov's
             * offset/length; this way the logged range is the one this
             * WRITE actually covered (same order as the partial-write path).
             */
            AZLogDebug("[{}] <{}> Completed write, off: {}, len: {}",
                       ino, task->issuing_tid, bciov->offset, bciov->length);

            // Complete data written to blob.
            bciov->on_io_complete(res->WRITE3res_u.resok.count,
                                  !inode->is_stable_write());
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        AZLogDebug("[{}] JUKEBOX error write, off: {}, len: {}",
                   ino,
                   bciov->offset,
                   bciov->length);
        task->get_client()->jukebox_retry(task);
        return;
    } else {
        /*
         * Since the api failed and can no longer be retried, set write_error
         * and do not clear dirty flag.
         * The logged range is [start, end), so print offset + length as the
         * end offset.
         */
        AZLogError("[{}] <{}> Write [{}, {}) failed with status {}: {}",
                   ino, task->issuing_tid,
                   bciov->offset,
                   bciov->offset + bciov->length,
                   status, errstr);

        inode->set_write_error(status);

        /*
         * on_io_fail() will clear flushing from all remaining membufs.
         */
        bciov->on_io_fail(status);
    }

    /*
     * If this write_rpc was issued as part of a parent write task, then
     * decrement the ongoing write count and if it is the last write then
     * complete the parent task. This will call the fuse callback.
     */
    if (task->rpc_api->parent_task) {
        struct rpc_task *parent_task = task->rpc_api->parent_task;

        assert(parent_task->magic == RPC_TASK_MAGIC);
        assert(parent_task->get_op_type() == FUSE_WRITE);
        assert(parent_task->rpc_api->write_task.is_fe());
        assert(parent_task->num_ongoing_backend_writes > 0);

        if (--parent_task->num_ongoing_backend_writes == 0) {
            if (inode->get_write_error() == 0) {
                assert(parent_task->rpc_api->write_task.get_size() > 0);
#if 0
                /*
                 * XXX This assert is not valid as sync_membufs() may have
                 * divided the dirty membufs into multiple bc_iovec (each
                 * issued as a single WRITE RPC to the backend), some of
                 * these dirty membufs would have been added by parent_task,
                 * while most of it would come from other write tasks.
                 * Each of these will carry the parent_task and only the
                 * last one would come here, but this last bc_iovec need
                 * not correspond to the parent_task.
                 */
                assert(parent_task->rpc_api->write_task.get_size() <=
                       bciov->orig_length);
#endif
                parent_task->reply_write(
                    parent_task->rpc_api->write_task.get_size());
            } else {
                parent_task->reply_error(inode->get_write_error());
            }
        }
    }

    /*
     * If this flush has completed successfully, call flush-commit state
     * machine's on_flush_complete() handler.
     *
     * Note: We MUST ensure that on_flush_complete() doesn't block else it'll
     * block a libnfs thread which may stall further request processing
     * which may cause deadlock.
     *
     * TODO: Need to handle failure case and prevent state machine from
     *       stalling.
     */
    if (inode->get_write_error() == 0) {
        inode->get_fcsm()->on_flush_complete(bciov->orig_length);
    } else {
        // TODO: Add fcsm::on_flush_fail() and call it from here.
        assert(0);
    }

    delete bciov;
    task->rpc_api->pvt = nullptr;

    // Release the task.
    task->free_rpc_task();
}