static void commit_callback()

in turbonfs/src/rpc_task.cpp [838:1027]


/**
 * Completion callback for a COMMIT RPC.
 *
 * Takes the list of bytes_chunks that the commit covered from
 * task->rpc_api->pvt and, depending on the outcome:
 *  - success: clears the commit_pending/locked/inuse flags on every membuf,
 *    releases the committed data from the inode's file cache one contiguous
 *    range at a time, and reports the total committed byte count through
 *    fcsm::on_commit_complete().
 *  - NFS3ERR_JUKEBOX: hands the task to jukebox_retry() and returns right
 *    away, without deleting the bc list or freeing the task.
 *  - any other failure: marks every membuf dirty again, clears its
 *    commit_pending/locked/inuse flags, switches the inode to stable writes
 *    and clears the inode's commit-in-progress flag.
 * In the success and failure cases the bc list is deleted, pvt is reset to
 * nullptr and the task is freed before returning.
 *
 * @param rpc          RPC context the request completed on; only used to
 *                     fetch the PDU for stats.
 * @param rpc_status   RPC-level completion status, combined with the NFS
 *                     status by task->status().
 * @param data         Reply payload, interpreted as a COMMIT3res.
 * @param private_data The rpc_task that issued the commit.
 */
static void commit_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    rpc_task *task = (rpc_task*) private_data;

    /*
     * Commit is issued as a FUSE_FLUSH.
     * TODO: Maybe it should have a type of its own.
     *
     * Note that flush/write is issued as FUSE_WRITE.
     */
    assert(task->magic == RPC_TASK_MAGIC);
    assert(task->get_op_type() == FUSE_FLUSH);
    // rpc_api->pvt must contain the list of bcs which were committed.
    assert(task->rpc_api->pvt != nullptr);
    // Commit is never called for a fuse request.
    assert(task->get_fuse_req() == nullptr);

    auto res = (COMMIT3res*) data;

    // NOTE(review): presumably an error-injection hook for testing the
    // jukebox path; its definition is not visible here - confirm.
    INJECT_JUKEBOX(res, task);

    const fuse_ino_t ino = task->rpc_api->flush_task.get_ino();
    struct nfs_inode *inode =
        task->get_client()->get_nfs_inode_from_ino(ino);
    // List of bcs committed by this commit call.
    auto bc_vec_ptr = (std::vector<bytes_chunk> *) task->rpc_api->pvt;

    // We should call commit only when inode is doing unstable+commit.
    assert(!inode->is_stable_write());
    // Inode must be marked "commit in progress".
    assert(inode->is_commit_in_progress());
    // Commit and flush/write are exclusive.
    assert(!inode->get_filecache()->is_flushing_in_progress());
    // bytes_commit_pending will be reduced later below.
    assert(inode->get_filecache()->get_bytes_to_commit() > 0);

    const int status = task->status(rpc_status, NFS_STATUS(res));
    /*
     * NOTE(review): this dereferences res and reads the resok arm of the
     * reply before status is examined. Confirm that res can never be null
     * here and that resok.file_wcc is valid to read for failed replies too.
     */
    UPDATE_INODE_WCC(inode, res->COMMIT3res_u.resok.file_wcc);

    // Set "in commit callback".
    // NOTE(review): presumably reset when fccbt goes out of scope - confirm.
    FC_CB_TRACKER fccbt(inode);

    AZLogDebug("[{}] commit_callback, number of bc committed: {}",
               ino, bc_vec_ptr->size());

    // Total bytes covered by the committed bcs; only filled in on success.
    uint64_t commit_bytes = 0;
    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res));

    if (status == 0) {
        /*
         * [offset, offset+length) is the contiguous range accumulated so
         * far. Both start at 0, which marks "no range started yet".
         */
        uint64_t offset = 0;
        uint64_t length = 0;

        /*
         * Go over all the successfully committed bcs and release them from
         * file cache (note that successful commit confirms that server has
         * persisted the data and client can free it).
         * Also complete any tasks waiting for these membufs to be committed.
         */
        for (auto& bc : *bc_vec_ptr) {
            struct membuf *mb = bc.get_membuf();
            assert(mb->is_inuse());
            assert(mb->is_locked());
            assert(mb->is_commit_pending());
            assert(mb->is_uptodate());
            // Dirty membufs must not be committed.
            assert(!mb->is_dirty());

            // We should not have a zero length bc.
            assert(bc.length > 0);
            assert(bc.length <= AZNFSC_MAX_CHUNK_SIZE);
            assert((bc.offset + bc.length) <= AZNFSC_MAX_FILE_SIZE);

            mb->clear_commit_pending();
            mb->clear_locked();
            mb->clear_inuse();

            /**
             * Release committed data from file cache, one contiguous range at
             * a time.
             */
            if (offset == 0 && length == 0) {
                // First bc, start a new range.
                offset = bc.offset;
                length = bc.length;
            } else if (offset + length == bc.offset) {
                // bc starts right where the current range ends, extend it.
                length += bc.length;
                assert(length <= AZNFSC_MAX_FILE_SIZE);
            } else {
                /*
                 * bc is not contiguous with the current range, release the
                 * current range and start a new one at this bc.
                 * Note that commit_bytes counts the range length, not the
                 * value returned by release(), which is only logged.
                 */
                commit_bytes += length;
                const uint64_t released =
                    inode->get_filecache()->release(offset, length);
                AZLogDebug("[{}] commit_callback releasing bc [{}, {}), "
                           "released {} bytes",
                           ino, offset, offset+length, released);
                offset = bc.offset;
                length = bc.length;
            }
        }

        // Release the last bc not released by the above loop.
        if (length != 0) {
            commit_bytes += length;
            const uint64_t released =
                inode->get_filecache()->release(offset, length);
            AZLogDebug("[{}] commit_callback releasing bc [{}, {}), "
                       "released {} bytes",
                       ino, offset, offset+length, released);
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        /*
         * Hand the task over for retry and return without touching the
         * membufs, the bc list or the task.
         * NOTE(review): presumably the retried commit reuses the same task
         * and rpc_api->pvt, which is why neither is released - confirm.
         */
        task->get_client()->jukebox_retry(task);
        return;
    } else {
        /*
         * Commit has failed.
         * Go over all the bcs that this commit was targeting, mark the
         * membufs back as dirty and clear the commit_pending flag.
         * Next write will initiate the flush again with stable write.
         */
        for (auto& bc : *bc_vec_ptr) {
            struct membuf *mb = bc.get_membuf();
            assert(mb->is_inuse());
            assert(mb->is_locked());
            assert(mb->is_commit_pending());
            assert(mb->is_uptodate());
            /*
             * TODO: What happens if application writes over these
             *       commit_pending bcs?
             */
            assert(!mb->is_dirty());

            mb->clear_commit_pending();
            mb->set_dirty();
            mb->clear_locked();
            mb->clear_inuse();
        }

        /*
         * Set the inode to stable write, so that next write will initiate
         * the flush again with stable write.
         * There should be no flush in progress at this moment, also since
         * we always commit *all* the commit pending bcs, there should not
         * be any more commit pending bcs, so we can safely just enable
         * stable writes for the inode w/o the elaborate
         * switch_to_stable_write().
         */
        assert(inode->get_filecache()->is_flushing_in_progress() == false);
        inode->set_stable_write();
    }

#ifdef ENABLE_PARANOID
    /*
     * We always pick *all* commit-pending bcs for committing, and while commit
     * is going on no other flush can run, hence new commit pending bcs cannot
     * be added, so when an ongoing commit completes we must not have any
     * commit pending bcs in the cache.
     */
    {
        std::vector<bytes_chunk> bc_vec =
            inode->get_filecache()->get_commit_pending_bcs();
        assert(bc_vec.empty());
    }
#endif

    if (status == 0) {
        assert(commit_bytes > 0);
        /*
         * Update the commit bytes in the inode.
         * This will also clear the commit-in-progress flag in the inode as
         * the very first thing.
         */
        inode->get_fcsm()->on_commit_complete(commit_bytes);
    } else {
        // TODO: Add fcsm::on_commit_fail() and call it from here.
        // Clear the commit in progress flag.
        inode->clear_commit_in_progress();
        /*
         * Trap commit failures in builds with asserts enabled. When asserts
         * are compiled out, execution falls through to the cleanup below.
         */
        assert(0);
    }

    // The bc list was heap allocated for this commit, release it here.
    delete bc_vec_ptr;

    // Reset pvt so the task does not keep a pointer to the deleted list.
    task->rpc_api->pvt = nullptr;
    task->free_rpc_task();
}