void nfs_inode::sync_membufs()

in turbonfs/src/nfs_inode.cpp [654:942]


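/*
 * Flush the given dirty bytes_chunks (membufs) to the backend, packing them
 * into as few backend write RPCs as add_bc() allows. If parent_task is
 * non-null it must be the fuse write task that triggered this inline sync;
 * it is completed here or in write_iov_callback() once all backend writes
 * issued from this call have completed. Caller must hold the flush_lock and
 * must hold an inuse count on every membuf in bc_vec, which sync_membufs()
 * takes ownership of.
 */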
void nfs_inode::sync_membufs(std::vector<bytes_chunk> &bc_vec,
                             bool is_flush,
                             struct rpc_task *parent_task)
{
    // Caller must hold the flush_lock.
    assert(is_flushing);

    if (!is_stable_write()) {
        /*
         * For unstable writes we do not allow a new flush while one is
         * already in progress.
         */
        assert(!get_filecache()->is_flushing_in_progress());
    }

    /*
     * Stable writes don't have a commit step, and for unstable writes we
     * cannot flush while a commit is in progress.
     */
    assert(!is_commit_in_progress());

    INC_GBL_STATS(num_sync_membufs, 1);

    if (bc_vec.empty()) {
        return;
    }

    /*
     * If parent_task is passed, it must refer to the fuse write task that
     * triggered this inline sync.
     */
    if (parent_task) {
        assert(parent_task->magic == RPC_TASK_MAGIC);
        // Must be a frontend write task.
        assert(parent_task->get_op_type() == FUSE_WRITE);
        assert(parent_task->rpc_api->write_task.is_fe());
        // Must not already have num_ongoing_backend_writes set.
        assert(parent_task->num_ongoing_backend_writes == 0);

        /*
         * Set num_ongoing_backend_writes to 1 before issuing the first backend
         * write. Note that bc_vec may result in multiple backend writes being
         * issued. If write_iov_callback() is called for all the writes issued
         * so far, before we have issued the rest, we may mistake that for
         * "all issued writes have completed" and wrongly complete the
         * parent_task.
         * This protective ref is decremented at the end of this function.
         */
        parent_task->num_ongoing_backend_writes = 1;
    }

    /*
     * If the new data being written does not immediately follow the last data
     * written, we need to switch to stable writes.
     */
    if (check_stable_write_required(bc_vec[0].offset)) {
        switch_to_stable_write();
    }

    /*
     * The flush task that will carry out the writes, allocated lazily in the
     * loop below.
     */
    struct rpc_task *write_task = nullptr;

    // Flush dirty membufs to backend.
    for (bytes_chunk& bc : bc_vec) {
        /*
         * We should never write a partial membuf, as that would cause issues
         * since membuf flags (dirty and flushing, in this case) are tracked at
         * membuf granularity. See maps_full_membuf() for how the membuf itself
         * may have been trimmed by a release() call; the bc must refer to
         * whatever part of the membuf is currently valid.
         */
        assert(bc.maps_full_membuf());

        /*
         * Get the underlying membuf for bc.
         * Note that we write the entire membuf (unless it was trimmed), even
         * though bc may be referring to a smaller window.
         * Since get_dirty_bc_range() returns full bytes_chunks from the
         * chunkmap, we should get full (but potentially trimmed) bytes_chunks
         * here.
         */
        struct membuf *mb = bc.get_membuf();

        /*
         * Verify the mb.
         * Caller must hold an inuse count on the membufs.
         * sync_membufs() takes ownership of that inuse count and will drop it.
         * We have two cases:
         * 1. We decide to issue the write IO.
         *    In this case the inuse count will be dropped by
         *    write_iov_callback().
         *    This will be the only inuse count and the buffer will be
         *    release()d after write_iov_callback() (in bc_iovec destructor).
         * 2. We found the membuf as flushing.
         *    In this case we don't issue the write and return, but only after
         *    dropping the inuse count.
         */
        assert(mb != nullptr);
        assert(mb->is_inuse());

        if (is_flush) {
            /*
             * get_dirty_bc_range() must have held an inuse count.
             * We hold an extra inuse count so that we can safely wait for the
             * flush in the "waiting loop" in nfs_inode::flush_cache_and_wait().
             * This is needed because that inuse count is dropped if the membuf
             * is found to be already flushing by another thread, or when
             * write_iov_callback() completes, which can happen before we reach
             * the waiting loop.
             */
            mb->set_inuse();
        }

        /*
         * Lock the membuf. If multiple writer threads want to flush the same
         * membuf, the first one will find it dirty and not flushing; that
         * thread should initiate the Blob write. Others that come in after the
         * 1st thread has started flushing but before the write has completed
         * will find it "dirty and flushing"; they can skip the write and
         * optionally choose to wait for it to complete by waiting for the
         * lock. Those that come in after the write is done and the lock is
         * released will find it neither dirty nor flushing, and can simply
         * skip it.
         *
         * Note that we should allocate the rpc_task for the flush before
         * taking the lock, as the lock may block.
         * TODO: We don't do that currently, fix this!
         */
        if (mb->is_flushing() || !mb->is_dirty()) {
            mb->clear_inuse();

            continue;
        }

        /*
         * We hold the membuf lock here for the following reasons:
         * - Only one thread can flush a membuf. Once it takes the lock
         *   it calls set_flushing() to mark the membuf as flushing and
         *   then no other thread would attempt to flush it.
         * - It also prevents writers from updating the membuf content
         *   while it's being flushed (though this is not mandatory).
         *
         * This is released only when the backend write completes, thus
         * wait_for_ongoing_flush() can simply wait for the membuf lock to
         * get notified when the flush completes.
         *
         * TODO: This can block the fuse thread for longish times affecting
         *       other interactive commands like readdir/stat.
         *
         * Note: Since we are holding flush_lock and flush_lock has a requirement
         *       that it should not be held while waiting for some write/commit
         *       on that inode to complete, we must ensure that the following
         *       set_locked() call won't wait for write to complete.
         *       This is ensured because we only come here for membufs that are
         *       currently not flushing and hence cannot be waiting for a write.
         *
         * Note: bytes_chunk_cache::truncate() can truncate a membuf after we
         *       get the list of dirty membufs and before we could get the lock
         *       here, skip those.
         */
        mb->set_locked();
        if (mb->is_flushing() ||
            !mb->is_dirty() ||
            mb->is_truncated()) {

            if (mb->is_truncated()) {
                AZLogInfo("[{}] sync_membufs: skipping truncated membuf "
                          "[{}, {})", get_fuse_ino(), mb->offset.load(),
                          mb->offset.load()+mb->length.load());
            }

            mb->clear_locked();
            mb->clear_inuse();
            continue;
        }

        INC_GBL_STATS(tot_bytes_sync_membufs, mb->length.load());

        if (write_task == nullptr) {
            write_task =
                get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_WRITE);
            write_task->init_write_be(ino);
            assert(write_task->rpc_api->pvt == nullptr);
            assert(write_task->rpc_api->parent_task == nullptr);

            /*
             * Set the parent_task pointer for this child task, so that we can
             * complete the parent task when all issued writes complete.
             */
            if (parent_task) {
                write_task->rpc_api->parent_task = parent_task;
                parent_task->num_ongoing_backend_writes++;
            }
            write_task->rpc_api->pvt = new bc_iovec(this);

            /*
             * We have at least one flush/write to issue, so mark the fcsm as
             * running, if not already marked.
             */
            get_fcsm()->mark_running();
        }

        /*
         * XXX Add an assert that unstable writes should only have contiguous
         *     bcs.
         */

        /*
         * Add as many bytes_chunks to the write_task as it allows.
         * Once it is packed completely, dispatch the write.
         */
        if (write_task->add_bc(bc)) {
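            /*
             * Assumption (not stated in the original source): for unstable
             * writes putblock_filesize appears to track how far the file has
             * been covered by put-block style flushes, so advance it by the
             * bytes just queued; stable writes don't maintain it and it must
             * stay AZNFSC_BAD_OFFSET (see the asserts below).
             */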
            if (!is_stable_write()) {
                putblock_filesize += bc.length;
            } else {
                assert(putblock_filesize == (off_t) AZNFSC_BAD_OFFSET);
            }
            continue;
        } else {
            /*
             * We could not add this bc to the current write_task, so dispatch
             * the write it has accumulated so far.
             */
            write_task->issue_write_rpc();

            /*
             * Allocate a new flush task to carry out the write for this bc,
             * which we failed to add to the previous write_task.
             */
            write_task =
                get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_WRITE);
            write_task->init_write_be(ino);
            assert(write_task->rpc_api->pvt == nullptr);

            if (parent_task) {
                write_task->rpc_api->parent_task = parent_task;
                parent_task->num_ongoing_backend_writes++;
            }
            write_task->rpc_api->pvt = new bc_iovec(this);

            // Single bc addition should not fail.
            [[maybe_unused]] bool res = write_task->add_bc(bc);
            assert(res == true);

            if (!is_stable_write()) {
                putblock_filesize += bc.length;
            } else {
                assert(putblock_filesize == (off_t) AZNFSC_BAD_OFFSET);
            }
        }
    }

    // Dispatch the leftover bytes (or full write).
    if (write_task) {
        write_task->issue_write_rpc();
    }

    /*
     * Drop the protective num_ongoing_backend_writes ref taken at the start of
     * this function. If it was the only one remaining, all issued backend
     * writes have already completed and we can complete the parent_task here;
     * otherwise (the common case) parent_task will be completed when the last
     * backend write completes, in write_iov_callback().
     */
    if (parent_task) {
        assert(parent_task->magic == RPC_TASK_MAGIC);
        assert(parent_task->get_op_type() == FUSE_WRITE);
        assert(parent_task->rpc_api->write_task.is_fe());
        assert(parent_task->num_ongoing_backend_writes > 0);
        assert(parent_task->rpc_api->write_task.get_ino() == get_fuse_ino());

        if (--parent_task->num_ongoing_backend_writes == 0) {
            if (get_write_error() == 0) {
                assert(parent_task->rpc_api->write_task.get_size() > 0);
                parent_task->reply_write(
                        parent_task->rpc_api->write_task.get_size());
            } else {
                parent_task->reply_error(get_write_error());
            }
        }

        /*
         * Note: parent_task could be freed by the above reply callback.
         *       Don't access parent_task after this, either here or the
         *       caller.
         */
    }
}
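
/*
 * Illustrative sketch only (not part of the original source): one way a
 * flush path might drive sync_membufs(). The accessor names flush_lock()/
 * flush_unlock() and the no-argument get_dirty_bc_range() call are
 * assumptions made for this example; the real flush path is
 * nfs_inode::flush_cache_and_wait(), which also waits for the flushed
 * membufs afterwards.
 */
#if 0
static void example_flush_all_dirty(struct nfs_inode *inode)
{
    /*
     * sync_membufs() asserts is_flushing, so the caller must hold the
     * inode's flush_lock (assumed accessor name).
     */
    inode->flush_lock();

    /*
     * Full (possibly trimmed) dirty bytes_chunks from the chunkmap, each
     * carrying an inuse count that sync_membufs() takes ownership of.
     */
    std::vector<bytes_chunk> bc_vec =
        inode->get_filecache()->get_dirty_bc_range();

    /*
     * is_flush=true makes sync_membufs() hold an extra inuse count per
     * membuf so that the caller can safely wait for these flushes to
     * complete. No parent_task since this is not an inline sync triggered
     * by a fuse write.
     */
    inode->sync_membufs(bc_vec, true /* is_flush */, nullptr);

    inode->flush_unlock();
}
#endif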