void fcsm::ensure_commit()

in turbonfs/src/fcsm.cpp [413:653]


void fcsm::ensure_commit(uint64_t write_off,
                         uint64_t write_len,
                         struct rpc_task *task,
                         std::atomic<bool> *done,
                         bool commit_full)
{
    assert(inode->is_flushing);
    assert(!inode->is_stable_write());

    /*
     * Caller passes commit_full when it wants all dirty data to be flushed
     * and committed (otherwise ensure_commit() can choose how much to
     * flush/commit based on configured limits). In that case it must also
     * pass a pointer to an atomic bool 'done' which we set to true once the
     * flush+commit is done. At most one of task and done can be passed.
     */
    assert(!commit_full || (task == nullptr));
    assert(!commit_full || (done != nullptr));
    assert((!!task + !!done) < 2);
    assert(!done || (*done == false));

    /*
     * If any of the flush/commit targets are waiting completion, state machine
     * must be running.
     */
    assert(is_running() || (ctgtq.empty() && ftgtq.empty()));
    assert(is_running() || (flushed_seq_num == flushing_seq_num));
    assert(is_running() || (committed_seq_num == committing_seq_num));

    AZLogDebug("[{}] [FCSM] ensure_commit<{}>"" write req [{}, {}], task: {}, "
               "done: {}",
               inode->get_fuse_ino(),
               task ? "blocking" : "non-blocking",
               commit_full ? " FULL" : "",
               write_off, write_off + write_len,
               fmt::ptr(task),
               fmt::ptr(done));

    // committed_seq_num can never be more than committing_seq_num.
    assert(committed_seq_num <= committing_seq_num);

    // We can only commit bytes which have already been flushed.
    assert(committing_seq_num <= flushed_seq_num);

    if (task) {
        // task provided must be a frontend write task.
        assert(task->magic == RPC_TASK_MAGIC);
        assert(task->get_op_type() == FUSE_WRITE);
        assert(task->rpc_api->write_task.is_fe());
        assert(task->rpc_api->write_task.get_size() > 0);
    }

    /*
     * Find how many bytes we would like to commit.
     * If there are commit-pending bytes we commit all of those, else we set
     * a commit target large enough to flush+commit all dirty data except one
     * full-sized dirty extent.
     */
    uint64_t commit_bytes =
        inode->get_filecache()->get_bytes_to_commit();

    /*
     * The only known caller to pass commit_full as true is
     * flush_cache_and_wait(). It first drains all commit_pending data by
     * calling wait_for_ongoing_flush() before calling us, so we should not
     * have any commit_pending data here. We now need to flush *all* dirty
     * bytes, commit them, and let the caller know once flush+commit is done.
     */
    if (commit_full) {
        assert(done && !*done);
        assert(commit_bytes == 0);
        commit_bytes = inode->get_filecache()->get_bytes_to_flush();
    } else if (commit_bytes == 0) {
        /*
         * TODO: Make sure this doesn't result in small-blocks being written.
         */
        const int64_t bytes =
            (inode->get_filecache()->get_bytes_to_flush() -
             inode->get_filecache()->max_dirty_extent_bytes());

        commit_bytes = std::max(bytes, (int64_t) 0);
    }
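
    /*
     * Illustrative example (hypothetical numbers): with no commit-pending
     * data, 100MB of dirty data and max_dirty_extent_bytes() returning 60MB,
     * the non-commit_full path above targets committing 40MB, leaving one
     * full-sized dirty extent in the cache; with commit_full the full 100MB
     * is targeted.
     */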

    /*
     * No new bytes to commit. Complete the task if it was a blocking call,
     * or mark 'done' for a commit_full caller.
     */
    if (commit_bytes == 0) {
        AZLogDebug("COMMIT BYTES ZERO");
        if (task) {
            assert(!done);
            task->reply_write(task->rpc_api->write_task.get_size());
        } else if (done) {
            assert(*done == false);
            *done = true;
        }
        return;
    }

    /*
     * What will be the committed_seq_num value after commit_bytes are
     * committed? commit_pending_bytes can shrink while we compute this, as
     * another thread could be running commit completion in parallel, so we
     * may compute a target_committed_seq_num lower than the last queued
     * commit_seq. Take the max so the target never moves backwards.
     */
    const uint64_t last_commit_seq =
                !ctgtq.empty() ? ctgtq.front().commit_seq : 0;
    const uint64_t target_committed_seq_num =
             std::max((committed_seq_num + commit_bytes), last_commit_seq);
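
    /*
     * Illustrative example (hypothetical numbers): if committed_seq_num is
     * 100 and commit_bytes is 50, but a target with commit_seq 180 is already
     * queued (queued when more bytes were commit-pending), we keep 180 as the
     * target rather than regressing to 150.
     */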

    /*
     * If the state machine is already running, we just need to add an
     * appropriate commit target and return. When the ongoing operation
     * completes, this commit will be dispatched.
     * Make sure to convey commit_full correctly via the target.
     */
    if (is_running()) {
#ifndef NDEBUG
        /*
         * Make sure commit targets are always added in an increasing commit_seq.
         */
        if (!ctgtq.empty()) {
            assert(ctgtq.front().commit_seq <= target_committed_seq_num);
            assert(ctgtq.front().flush_seq == 0);
        }
#endif
        ctgtq.emplace(this,
                      0 /* target flush_seq */,
                      target_committed_seq_num /* target commit_seq */,
                      task,
                      done,
                      commit_full);
        return;
    }

    /*
     * FCSM not running.
     * flushed_seq_num tells us how much data is already flushed. If it's less
     * than target_committed_seq_num, we need to schedule a flush to catch up
     * with the commit target.
     */
    if (flushed_seq_num < target_committed_seq_num) {
        AZLogDebug("[{}] [FCSM] not running, schedule a new flush to catch up, "
                   "flushed_seq_num: {}, target_committed_seq_num: {}, "
                   "stable: {}",
                   inode->get_fuse_ino(),
                   flushed_seq_num.load(),
                   target_committed_seq_num,
                   inode->is_stable_write());

        /*
         * ensure_flush()->sync_membufs() below may convert this inode to stable
         * writes. In that case we should let the caller know of completion
         * once all dirty data is flushed, else we want to let the caller know
         * once all data is flushed and committed.
         *
         * commit_full amounts to flush_full_unstable.
         */
        ensure_flush(task ? task->rpc_api->write_task.get_offset() : 0,
                     task ? task->rpc_api->write_task.get_size() : 0,
                     nullptr,
                     nullptr,
                     commit_full);

        /*
         * ensure_flush() flushes *all* dirty data, so it must have scheduled
         * flushing till target_committed_seq_num.
         */
        assert(flushing_seq_num >= target_committed_seq_num);

        if (!inode->is_stable_write()) {
            /*
             * Enqueue a commit target to be triggered once the flush completes.
             */
            ctgtq.emplace(this,
                          0 /* target flush_seq */,
                          target_committed_seq_num /* target commit_seq */,
                          task,
                          done);
        } else {
            /*
             * Caller wanted to wait till the commit completes, but the inode
             * has now been converted to stable writes, so there won't be any
             * commits. Complete the task right away, or, for the commit_full
             * case, hand 'done' to ensure_flush() so it is set once the full
             * flush completes.
             */
            if (task) {
                assert(!commit_full);
                assert(!done);
                task->reply_write(task->rpc_api->write_task.get_size());
            } else if (done) {
                assert(commit_full);
                assert(*done == false);

                ensure_flush(0, 0, nullptr, done);
            }
        }

        return;
    } else {
        /*
         * No new data to flush for the current commit goal, just add a commit
         * target and we are done.
         * Since FCSM is not running and we discovered that we have one or more
         * bytes to be committed, get_commit_pending_bcs() MUST return those.
         */
        AZLogDebug("[{}] [FCSM] not running, schedule a new commit, "
                   "flushed_seq_num: {}, "
                   "target_committed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushed_seq_num.load(),
                   target_committed_seq_num);

        uint64_t bytes;
        std::vector<bytes_chunk> bc_vec =
            inode->get_filecache()->get_commit_pending_bcs(&bytes);
        assert(!bc_vec.empty());
        assert(bytes > 0);

        // With FCSM not running, these should be the same.
        assert(committing_seq_num == committed_seq_num);
        [[maybe_unused]]
        const uint64_t prev_committing_seq_num = committing_seq_num;
        inode->commit_membufs(bc_vec);
        assert(is_running());

        assert(committing_seq_num == (prev_committing_seq_num + bytes));
        assert(committing_seq_num > committed_seq_num);

        /*
         * Enqueue a commit target for the caller to be notified when all data
         * till target_committed_seq_num is flushed+committed. In case
         * commit_full is true, the above commit_membufs() call may not be
         * sufficient to commit all that data, but FCSM will ensure that all
         * the requested data is flushed and committed.
         */
        ctgtq.emplace(this,
                      0 /* target flush_seq */,
                      target_committed_seq_num /* target commit_seq */,
                      task,
                      done);
    }
}
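
Usage sketch (not from the source): a commit_full caller such as
flush_cache_and_wait() is expected to drive ensure_commit() roughly as below,
per the contract asserted at the top of the function. The 'fcsm' object name,
the prior draining via wait_for_ongoing_flush(), and the busy-wait on 'done'
are illustrative assumptions, not the actual implementation.

    // Hypothetical caller-side sketch of the commit_full + 'done' contract.
    // Assumes <atomic>, <thread> and <chrono> are available.
    std::atomic<bool> done{false};

    // Assumption: commit-pending data has already been drained (e.g. via
    // wait_for_ongoing_flush()) and the inode is marked as flushing, as the
    // asserts at the top of ensure_commit() require.
    fcsm.ensure_commit(0 /* write_off */,
                       0 /* write_len */,
                       nullptr /* task */,
                       &done,
                       true /* commit_full */);

    // Hypothetical wait: FCSM sets 'done' once all dirty data has been
    // flushed (and committed, for unstable writes).
    while (!done) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }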