void fcsm::ensure_flush()

in turbonfs/src/fcsm.cpp [658:871]
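
ensure_flush() guarantees that all data dirty at the time of the call gets
flushed to the backend. If the FCSM is already running it only enqueues a
flush target; otherwise it kicks off the flush itself via sync_membufs(). The
optional blocking write task or the done flag is notified once
flushed_seq_num reaches the target.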


void fcsm::ensure_flush(uint64_t write_off,
                        uint64_t write_len,
                        struct rpc_task *task,
                        std::atomic<bool> *done,
                        bool flush_full_unstable)
{
    assert(inode->is_flushing);
    /*
     * At most one of task and done can be passed.
     */
    assert((!!task + !!done) < 2);
    assert(!done || (*done == false));

    /*
     * If any flush/commit targets are awaiting completion, the state machine
     * must be running.
     */
    assert(is_running() || (ctgtq.empty() && ftgtq.empty()));
    assert(is_running() || (flushed_seq_num == flushing_seq_num));
    assert(is_running() || (committed_seq_num == committing_seq_num));
#if 0
    assert(!flush_full_unstable || !is_running());
#endif

    AZLogDebug("[{}] [FCSM] ensure_flush<{}> write req [{}, {}], task: {}, "
               "done: {}",
               inode->get_fuse_ino(),
               task ? "blocking" : "non-blocking",
               write_off, write_off + write_len,
               fmt::ptr(task),
               fmt::ptr(done));

    // flushed_seq_num can never be more than flushing_seq_num.
    assert(flushed_seq_num <= flushing_seq_num);

    if (task) {
        // task provided must be a frontend write task.
        assert(task->magic == RPC_TASK_MAGIC);
        assert(task->get_op_type() == FUSE_WRITE);
        assert(task->rpc_api->write_task.is_fe());
        assert(task->rpc_api->write_task.get_size() > 0);
        // write_len and write_off must match that of the task.
        assert(task->rpc_api->write_task.get_size() == write_len);
        assert(task->rpc_api->write_task.get_offset() == (off_t) write_off);
    }

    /*
     * What will flushed_seq_num be once *all* currently dirty bytes are
     * flushed? That becomes our target flushed_seq_num.
     * Since bytes_chunk_cache::{bytes_dirty,bytes_flushing} are not updated
     * under flush_lock, races are possible where a later value of
     * target_flushed_seq_num is less than what we have already queued in the
     * latest flush target. In that case, just wait for the larger value.
     */
    const uint64_t bytes_to_flush =
        inode->get_filecache()->get_bytes_to_flush();
    const uint64_t last_flush_seq =
                !ftgtq.empty() ? ftgtq.front().flush_seq : 0;
    const uint64_t target_flushed_seq_num =
             std::max((flushing_seq_num + bytes_to_flush), last_flush_seq);

    /*
     * If the state machine is already running, we just need to add an
     * appropriate flush target and return. When the ongoing operation
     * completes, this flush will be dispatched.
     */
    if (is_running()) {
#ifndef NDEBUG
        /*
         * Make sure flush targets are always added in increasing flush_seq
         * order.
         */
        if (!ftgtq.empty()) {
            assert(ftgtq.front().flush_seq <= target_flushed_seq_num);
            assert(ftgtq.front().commit_seq == 0);
        }
#endif
#ifdef ENABLE_PARANOID
        /*
         * Since we are adding a flush target, make sure we have at least that
         * much dirty data in the chunkmap.
         */
        {
            uint64_t bytes;
            std::vector<bytes_chunk> bc_vec =
                inode->get_filecache()->get_dirty_nonflushing_bcs_range(
                        0, UINT64_MAX, &bytes);
            assert(bc_vec.empty() == (bytes == 0));
            assert(bytes >= bytes_to_flush);

            for (auto& bc : bc_vec) {
                bc.get_membuf()->clear_inuse();
            }
        }
#endif

        /*
         * If this doesn't advance the flush target and the caller doesn't
         * need to be notified, don't add a duplicate target. The already
         * queued target will ensure the requested flush is done.
         */
        if (!task && !done &&
            (target_flushed_seq_num == last_flush_seq)) {
            return;
        }

        ftgtq.emplace(this,
                      target_flushed_seq_num /* target flush_seq */,
                      0 /* commit_seq */,
                      task,
                      done,
                      flush_full_unstable);
        return;
    }

    /*
     * FCSM not running.
     */
    assert(flushed_seq_num == flushing_seq_num);
    assert(target_flushed_seq_num >= flushing_seq_num);

    // No new data to flush.
    if (target_flushed_seq_num == flushed_seq_num) {
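        // Complete the caller inline; there is nothing to wait for.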
        if (task) {
            assert(!done);
            task->reply_write(task->rpc_api->write_task.get_size());
        } else if (done) {
            assert(*done == false);
            *done = true;
        }
        return;
    }

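    /*
     * Collect the dirty membufs that this flush will cover. How we pick them
     * depends on whether the inode is doing stable or unstable (commit based)
     * writes.
     */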
    uint64_t bytes;
    std::vector<bytes_chunk> bc_vec;

    if (inode->is_stable_write()) {
        bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(
                                                    0, UINT64_MAX, &bytes);
        /*
         * Dirty flushable data can grow after the get_bytes_to_flush() call
         * above as more dirty data can be added, but no dirty data can start
         * flushing since we hold the flush_lock.
         */
        assert(bytes >= bytes_to_flush);
    } else {
        bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes);
        /*
         * If the caller wants us to flush *all* dirty data and not all of it
         * is contiguous, we need to switch to stable writes and flush *all*
         * dirty data.
         */
        if (flush_full_unstable && bytes_to_flush > bytes) {
            /*
             * Release inuse count held by get_contiguous_dirty_bcs().
             */
            for (bytes_chunk& bc : bc_vec) {
                struct membuf *mb = bc.get_membuf();
                mb->clear_inuse();
            }

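            /*
             * Switching to stable writes is only safe when no commit or
             * flush is currently in progress.
             */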
            assert(!inode->is_commit_in_progress());
            assert(!inode->get_filecache()->is_flushing_in_progress());

            bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(
                    0, UINT64_MAX, &bytes);
            assert(bytes >= bytes_to_flush);

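            // All further flushes for this inode will be sent as stable
            // writes, so no commit will be needed for them.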
            inode->set_stable_write();
        }
    }
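    // We must have found something to flush, else we would have returned above.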
    assert(bc_vec.empty() == (bytes == 0));
    assert(bytes > 0);

    /*
     * Kickstart the state machine.
     * Note that we pass nullptr as the 3rd arg to sync_membufs(), so it will
     * not complete any frontend task itself. Instead, the caller's task/done
     * is enqueued as a flush target below and is notified once
     * flushed_seq_num reaches target_flushed_seq_num. sync_membufs() returns
     * after issuing the backend writes, which complete asynchronously.
     */
    AZLogDebug("[{}] [FCSM] kicking, flushing_seq_num now: {} "
               "flushed_seq_num: {}",
               inode->get_fuse_ino(),
               flushing_seq_num.load(),
               flushed_seq_num.load());

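    /*
     * Snapshot flushing_seq_num so we can assert below that sync_membufs()
     * advances it by exactly the number of bytes handed to it.
     */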
    [[maybe_unused]]
    const uint64_t flushing_seq_num_before = flushing_seq_num;
    assert(flushed_seq_num <= flushing_seq_num);

    /*
     * sync_membufs() will update flushing_seq_num and mark fcsm running.
     * Task is not passed to sync_membufs, but enqueued to ftgtq.
     */
    inode->sync_membufs(bc_vec, false /* is_flush */, nullptr);

    assert(is_running());
    assert(flushing_seq_num == (flushing_seq_num_before + bytes));
    assert(flushed_seq_num <= flushing_seq_num);

    /*
     * Enqueue a flush target so the caller is notified once all data up to
     * target_flushed_seq_num is flushed.
     */
    ftgtq.emplace(this,
                  target_flushed_seq_num /* target flush_seq */,
                  0 /* commit_seq */,
                  task,
                  done);
}
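
A minimal caller-side sketch of the non-blocking "done" flavour follows. The
flush_lock()/flush_unlock()/get_fcsm() accessors and the busy-wait loop are
assumptions made for illustration; only the ensure_flush() signature above is
taken from this file.

#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper: flush everything currently dirty for 'inode' and wait.
void flush_all_and_wait(struct nfs_inode *inode)
{
    std::atomic<bool> done{false};

    // ensure_flush() asserts inode->is_flushing, so the flush lock must be
    // held across the call (flush_lock()/flush_unlock() are assumed names).
    inode->flush_lock();
    inode->get_fcsm()->ensure_flush(0 /* write_off */,
                                    0 /* write_len */,
                                    nullptr /* task */,
                                    &done,
                                    true /* flush_full_unstable */);
    inode->flush_unlock();

    // 'done' is set either inline (nothing left to flush) or from the flush
    // target completion path once flushed_seq_num reaches the target.
    while (!done) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}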