static void write_iov_callback()

in turbonfs/src/rpc_task.cpp [1260:1492]


static void write_iov_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    assert(rpc != nullptr);

    struct rpc_task *task = (struct rpc_task *) private_data;
    assert(task->magic == RPC_TASK_MAGIC);
    // Only flush tasks use this callback.
    assert(task->get_op_type() == FUSE_WRITE);
    // This must be a BE task.
    assert(task->rpc_api->write_task.is_be());
    // Those flush tasks must have pvt set to a bc_iovec ptr.
    struct bc_iovec *bciov = (struct bc_iovec *) task->rpc_api->pvt;
    assert(bciov);
    assert(bciov->magic == BC_IOVEC_MAGIC);

    struct nfs_client *client = task->get_client();
    assert(client->magic == NFS_CLIENT_MAGIC);

    auto res = (WRITE3res *)data;

    INJECT_JUKEBOX(res, task);

    const char* errstr;
    const int status = task->status(rpc_status, NFS_STATUS(res), &errstr);
    const fuse_ino_t ino = task->rpc_api->write_task.get_ino();
    struct nfs_inode *inode = client->get_nfs_inode_from_ino(ino);
    FC_CB_TRACKER fccbt(inode);

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUSX(rpc_status, res));

    // Success case.
    if (status == 0) {
        if (!res->WRITE3res_u.resok.file_wcc.after.attributes_follow) {
            AZLogDebug("[{}] Postop attributes not received for write, "
                       "invalidating attribute cache", ino);

            /*
             * Since the post-op attributes are not populated for the file,
             * invalidate the cache as the attributes may no longer
             * be valid since write() would have changed the file attributes.
             */
            inode->invalidate_attribute_cache();
        }

        /*
         * WCC implementation.
         * If pre-op attributes indicate that the file changed since we cached,
         * it implies some other client updated the file. In this case the best
         * course of action is to drop our cached data. Note that we drop only
         * non-dirty data, anyways multiple client writing to the same file
         * w/o locking would result in undefined data state.
         */
        UPDATE_INODE_WCC(inode, res->WRITE3res_u.resok.file_wcc);

#ifdef ENABLE_PRESSURE_POINTS
        /*
         * Short write pressure point.
         */
        if (inject_error()) {
            // Set write size to a random percent of the actual size.
            const uint32_t pct = random_number(1, 100);
            const uint32_t adj_size =
                std::max((res->WRITE3res_u.resok.count * pct) / 100, 1U);
            assert(adj_size <= res->WRITE3res_u.resok.count);
            AZLogWarn("[{}] PP: short write {} -> {}",
                      ino, res->WRITE3res_u.resok.count, adj_size);
            res->WRITE3res_u.resok.count = adj_size;
        }
#endif
        // Successful Blob write must not return 0.
        assert(res->WRITE3res_u.resok.count > 0);
        assert(res->WRITE3res_u.resok.count <= bciov->length);
        assert(bciov->length <= bciov->orig_length);
        assert(bciov->offset >= bciov->orig_offset);

        /*
         * Did the write for the entire bciov complete?
         * Note that bciov is a vector of multiple bytes_chunk and for each
         * of them we write the entire membuf.
         */
        const bool is_partial_write =
            (res->WRITE3res_u.resok.count < bciov->length);

        if (is_partial_write) {
            AZLogDebug("[{}] <{}> Partial write: [{}, {}) of [{}, {})",
                       ino, task->issuing_tid,
                       bciov->offset,
                       bciov->offset + res->WRITE3res_u.resok.count,
                       bciov->orig_offset,
                       bciov->orig_offset + bciov->orig_length);

            // Update bciov after the current write.
            bciov->on_io_complete(res->WRITE3res_u.resok.count,
                                  !inode->is_stable_write());

            // Create a new write_task for the remaining bc_iovec.
            struct rpc_task *write_task =
                    client->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_WRITE);
            write_task->init_write_be(ino);

            // Any new task should start fresh as a parent task.
            assert(write_task->rpc_api->parent_task == nullptr);

            // Hand over the remaining bciov to the new write_task.
            assert(write_task->rpc_api->pvt == nullptr);
            write_task->rpc_api->pvt = task->rpc_api->pvt;

            /*
             * If this (child) write_rpc was issued as part of a another
             * (parent) write task, then set the parent_task in the new
             * child write too.
             */
            if (task->rpc_api->parent_task) {
                write_task->rpc_api->parent_task = task->rpc_api->parent_task;
            }

            task->rpc_api->pvt = nullptr;

            // Issue write for the remaining data.
            write_task->issue_write_rpc();

            /*
             * Release this task since it has done it's job.
             * Now next the callback will be called when the above partial
             * write completes.
             */
            task->free_rpc_task();
            return;
        } else {
            // Complete bc_iovec IO completed.
            bciov->on_io_complete(res->WRITE3res_u.resok.count,
                                  !inode->is_stable_write());

            // Complete data writen to blob.
            AZLogDebug("[{}] <{}> Completed write, off: {}, len: {}",
                       ino, task->issuing_tid, bciov->offset, bciov->length);
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        AZLogDebug("[{}] JUKEBOX error write, off: {}, len: {}",
                   ino,
                   bciov->offset,
                   bciov->length);
        task->get_client()->jukebox_retry(task);
        return;
    } else {
        /*
         * Since the api failed and can no longer be retried, set write_error
         * and do not clear dirty flag.
         */
        AZLogError("[{}] <{}> Write [{}, {}) failed with status {}: {}",
                   ino, task->issuing_tid,
                   bciov->offset,
                   bciov->length,
                   status, errstr);

        inode->set_write_error(status);

        /*
         * on_io_fail() will clear flushing from all remaining membufs.
         */
        bciov->on_io_fail(status);
    }

    /*
     * If this write_rpc was issued as part of a parent write task, then
     * decrement the ongoing write count and if it is the last write then
     * complete the parent task. This will call the fuse callback.
     */
    if (task->rpc_api->parent_task) {
        struct rpc_task *parent_task = task->rpc_api->parent_task;

        assert(parent_task->magic == RPC_TASK_MAGIC);
        assert(parent_task->get_op_type() == FUSE_WRITE);
        assert(parent_task->rpc_api->write_task.is_fe());
        assert(parent_task->num_ongoing_backend_writes > 0);

        if (--parent_task->num_ongoing_backend_writes == 0) {
            if (inode->get_write_error() == 0) {
                assert(parent_task->rpc_api->write_task.get_size() > 0);
#if 0
                /*
                 * XXX This assert is not valid as sync_membufs() may have
                 *     divided the dirty membufs into multiple bc_iovec (each
                 *     issues as a single WRITE RPC to the backend), some of
                 *     these dirty membufs would have been added by parent_task,
                 *     while most of it would come from other write tasks.
                 *     Each of these will carry the parent_task and only the
                 *     last one would come here, but this last bc_iovec need
                 *     not correspond to the parent_task.
                 */
                assert(parent_task->rpc_api->write_task.get_size() <=
                       bciov->orig_length);
#endif
                parent_task->reply_write(
                        parent_task->rpc_api->write_task.get_size());
            } else {
                parent_task->reply_error(inode->get_write_error());
            }
        }
    }

    /*
     * If this flush has completed successfully, call flush-commit state
     * machine's on_flush_complete() handler.
     *
     * Note: We MUST ensure that on_flush_complete() doesn't block else it'll
     *       block a libnfs thread which may stall further request processing
     *       which may cause deadlock.
     *
     * TODO: Need to handle failure case and prevent state machine from
     *       stalling.
     */
    if (inode->get_write_error() == 0) {
        inode->get_fcsm()->on_flush_complete(bciov->orig_length);
    } else {
        // TODO: Add fcsm::on_flush_fail() and call it from here.
        assert(0);
    }

    delete bciov;
    task->rpc_api->pvt = nullptr;

    // Release the task.
    task->free_rpc_task();
}