void fcsm::on_flush_complete()

in turbonfs/src/fcsm.cpp [1092:1347]


/**
 * Flush completion handler of the flush-commit state machine (FCSM).
 *
 * Accounts for flush_bytes newly flushed bytes by advancing flushed_seq_num
 * and then, if no flush is in progress anymore, takes the inode flush_lock
 * and decides the next step:
 *  - completes every queued flush target whose flush_seq has been reached,
 *  - triggers a commit if the front commit target has been fully flushed,
 *  - else issues a new flush if the front flush/commit target lies beyond
 *    flushing_seq_num,
 *  - else, if both target queues are empty, marks the state machine as not
 *    running.
 *
 * Must be called only for successful flushes (no inode write error) and only
 * from the flush/write callback, as asserted at the top of the function.
 *
 * @param flush_bytes Number of bytes that this completion flushed. Must be
 *                    greater than 0.
 */
void fcsm::on_flush_complete(uint64_t flush_bytes)
{
    // Must be called only for success.
    assert(inode->get_write_error() == 0);
    assert(flush_bytes > 0);

    // Must be called from flush/write callback.
    assert(fc_cb_running());

    // See below why we cannot assert this.
#if 0
    // Flush callback can only be called if FCSM is running.
    assert(is_running);
#endif

    /*
     * Commit will only be run after current flush completes.
     * Since we are inside flush completion callback, commit cannot be
     * running yet.
     */
    assert(!inode->is_commit_in_progress());

    // A byte can only be committed after it's flushed successfully.
    assert(committing_seq_num <= flushed_seq_num);
    assert(committed_seq_num <= committing_seq_num);
    assert(committing_seq_num <= flushing_seq_num);

    // Update flushed_seq_num to account for the newly flushed bytes.
    flushed_seq_num += flush_bytes;

    // flushed_seq_num can never go more than flushing_seq_num.
    assert(flushed_seq_num <= flushing_seq_num);

    /*
     * NOTE(review): ftgtq.size() and ctgtq.size() are read here before
     * flush_lock is taken, while both queues are only modified below with
     * flush_lock held. Confirm that this unlocked read is acceptable for
     * a debug log.
     */
    AZLogDebug("[{}] [FCSM] on_flush_complete({}), Fd: {}, Fing: {}, "
               "Cd: {}, Cing: {}, Fq: {}, Cq: {}, bytes_flushing: {}",
               inode->get_fuse_ino(),
               flush_bytes,
               flushed_seq_num.load(),
               flushing_seq_num.load(),
               committed_seq_num.load(),
               committing_seq_num.load(),
               ftgtq.size(),
               ctgtq.size(),
               inode->get_filecache()->bytes_flushing.load());

    /*
     * If this is not the last completing flush (of the multiple parallel
     * flushes that sync_membufs() may start), don't do anything.
     * Only the last completing flush checks flush targets, as we cannot
     * start a new flush or commit till the current flush completes fully.
     * This check is done without the flush_lock and is repeated below once
     * the lock is held.
     */
    if (inode->get_filecache()->is_flushing_in_progress()) {
        return;
    }

    inode->flush_lock();

    /*
     * Multiple libnfs (callback) threads can find is_flushing_in_progress()
     * return false. The first one to get the flush_lock, gets to run the
     * queued flush targets which includes completing the waiting tasks and/or
     * trigger pending flush/commit. Other flush callback threads which get
     * the lock after the first one, should simply return. They check for
     * one of the following conditions to avoid duplicating work:
     * 1. The first one didn't find anything to do, so it stopped the FCSM
     *    (!is_running()).
     * 2. The first one triggered a flush target (is_flushing_in_progress()).
     * 3. The first one triggered a commit target (is_commit_in_progress()).
     */
    if (inode->get_filecache()->is_flushing_in_progress() ||
        inode->is_commit_in_progress() ||
        !is_running()) {
        // A stopped FCSM must not have any flush target left behind.
        assert(is_running() || ftgtq.empty());
        inode->flush_unlock();
        return;
    }

    /*
     * Entire flush is done and no new flush can start, so flushed_seq_num must
     * match flushing_seq_num.
     */
    assert(flushed_seq_num == flushing_seq_num);

    /*
     * Go over all queued flush targets to see if any can be completed after
     * the latest flush completed.
     */
    while (!ftgtq.empty()) {
        struct fctgt& tgt = ftgtq.front();

        assert(tgt.fcsm == this);

        /*
         * ftgtq has flush targets in increasing order of flush_seq, so as
         * soon as we find one that's greater than flushed_seq_num, we can
         * safely skip the rest.
         */
        if (tgt.flush_seq > flushed_seq_num) {
            break;
        }

        /*
         * A reached target is one of three kinds:
         * - it carries a frontend write task, which is replied to here,
         * - it carries a 'done' flag, which is set to true here,
         * - it carries neither (non-blocking target), which is only logged.
         */
        if (tgt.task) {
            // Only one of task or done can be present.
            assert(!tgt.done);
            assert(tgt.task->magic == RPC_TASK_MAGIC);
            assert(tgt.task->get_op_type() == FUSE_WRITE);
            assert(tgt.task->rpc_api->write_task.is_fe());
            assert(tgt.task->rpc_api->write_task.get_size() > 0);

            AZLogDebug("[{}] [FCSM] completing blocking flush target: {}, "
                       "flushed_seq_num: {}, write task: [{}, {})",
                       inode->get_fuse_ino(),
                       tgt.flush_seq,
                       flushed_seq_num.load(),
                       tgt.task->rpc_api->write_task.get_offset(),
                       tgt.task->rpc_api->write_task.get_offset() +
                       tgt.task->rpc_api->write_task.get_size());

            // Reply with the full size of the write task.
            tgt.task->reply_write(
                    tgt.task->rpc_api->write_task.get_size());
        } else if (tgt.done) {
            AZLogDebug("[{}] [FCSM] completing blocking flush target: {}, "
                       "flushed_seq_num: {}",
                       inode->get_fuse_ino(),
                       tgt.flush_seq,
                       flushed_seq_num.load());

            // Flag must not have been set already; set it to mark completion.
            assert(*tgt.done == false);
            *tgt.done = true;
        } else {
            AZLogDebug("[{}] [FCSM] completing non-blocking flush target: {}, "
                       "flushed_seq_num: {}",
                       inode->get_fuse_ino(),
                       tgt.flush_seq,
                       flushed_seq_num.load());
        }

        // Flush target accomplished, remove from queue.
        ftgtq.pop();
    }

    /*
     * We just completed a flush. See if we have some commit targets that we
     * should trigger now. A commit target can only be triggered if we have
     * flushed all bytes till the commit target.
     * We check commit target before any other flush targets as committing
     * helps us free memory.
     */
    if (!ctgtq.empty() && (flushed_seq_num >= ctgtq.front().commit_seq)) {
        // Commit targets are not expected for stable writes.
        assert(!inode->is_stable_write());

        /*
         * NOTE(review): 'bytes' is not initialized here, it is presumably
         * always set by get_commit_pending_bcs() - confirm.
         */
        uint64_t bytes;
        std::vector<bytes_chunk> bc_vec =
            inode->get_filecache()->get_commit_pending_bcs(&bytes);
        assert(bc_vec.empty() == (bytes == 0));

        /*
         * Since we have a commit target asking more data to be committed, we
         * must have the corresponding bcs in the file cache.
         */
        assert(!bc_vec.empty());
        assert(bytes > 0);

        /*
         * commit_membufs() must increase committing_seq_num exactly by bytes,
         * as all the bcs in bc_vec should be committed.
         */
        [[maybe_unused]]
        const uint64_t prev_committing_seq_num = committing_seq_num;
        inode->commit_membufs(bc_vec);
        assert(committing_seq_num == (prev_committing_seq_num + bytes));

    } else if ((!ftgtq.empty() && (ftgtq.front().flush_seq > flushing_seq_num)) ||
               (!ctgtq.empty() && (ctgtq.front().commit_seq > flushing_seq_num))) {
       /*
        * Nothing to commit, or what we want to commit has not yet flushed
        * successfully. Do we want to flush more? We check two things:
        * 1. Is there an explicit flush target which has not yet started
        *    flushing?
        * 2. Is there a commit target which implies flushing?
        *
        * If the next flush or commit target has its flush issued, then we
        * just have to wait for that flush to complete and then we will decide
        * the next action, else issue it now.
        */
        /*
         * NOTE(review): 'bytes' is not initialized here, it is presumably
         * always set by the filecache getters called below - confirm.
         */
        uint64_t bytes;
        std::vector<bytes_chunk> bc_vec;
        if (inode->is_stable_write()) {
            // Stable write: pick dirty bcs not already flushing, whole range.
            bc_vec = inode->get_filecache()->get_dirty_nonflushing_bcs_range(
                                                                    0, UINT64_MAX,
                                                                    &bytes);
        } else {
            bc_vec = inode->get_filecache()->get_contiguous_dirty_bcs(&bytes);

            /*
             * TODO: Handle the case where the ftgt wants us to do commit_full.
             */
        }

        /*
         * Since we have a flush target asking more data to be flushed, we must
         * have the corresponding bcs in the file cache.
         */
        assert(!bc_vec.empty());
        /*
         * We should flush all the dirty data in the chunkmap.
         * next_goal is the larger of the front flush target and the front
         * commit target (an empty queue contributes 0). The bytes we are
         * about to flush must be enough to take flushing_seq_num to it.
         */
        [[maybe_unused]]
        const uint64_t next_goal =
            std::max((ftgtq.empty() ? 0 : ftgtq.front().flush_seq),
                     (ctgtq.empty() ? 0 : ctgtq.front().commit_seq));
        assert(bytes >= (next_goal - flushing_seq_num));

        // flushed_seq_num can never be more than flushing_seq_num.
        assert(flushed_seq_num <= flushing_seq_num);

        AZLogDebug("[{}] [FCSM] continuing, flushing_seq_num now: {}, "
                   "flushed_seq_num: {}, bc_vec.size(): {}, FQ: {}, CQ: {}",
                   inode->get_fuse_ino(),
                   flushing_seq_num.load(),
                   flushed_seq_num.load(),
                   bc_vec.size(),
                   ftgtq.size(), ctgtq.size());

        // sync_membufs() will update flushing_seq_num, exactly by bytes.
        [[maybe_unused]]
        const uint64_t prev_flushing_seq_num = flushing_seq_num;
        inode->sync_membufs(bc_vec, false /* is_flush */);
        assert(flushing_seq_num == (prev_flushing_seq_num + bytes));
    } else if (ftgtq.empty() && ctgtq.empty()) {
        /*
         * No flush to issue, if we don't have any to wait for, then we can
         * stop the state machine.
         */
        AZLogDebug("[{}] [FCSM] idling, flushing_seq_num now: {}, "
                   "flushed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushing_seq_num.load(),
                   flushed_seq_num.load());

        /*
         * FCSM should not idle when there's any ongoing flush.
         * Note that this only checks flushing_seq_num >= flushed_seq_num,
         * equality of the two has already been asserted above.
         */
        assert(flushing_seq_num >= flushed_seq_num);

        /*
         * TODO: Modify flush_cache_and_wait() to also use the FCSM for
         *       performing the flush. Then we have any flush or commit
         *       only performed by the state machine.
         */
        assert(!inode->get_filecache()->is_flushing_in_progress());
        assert(!inode->is_commit_in_progress());

        clear_running();
    } else {
        /*
         * Not expected to be reached: with flushed_seq_num equal to
         * flushing_seq_num (asserted above) and all reached flush targets
         * popped, any remaining flush or commit target matches one of the
         * branches above.
         * If asserts are compiled out, this branch only logs and then
         * releases the flush_lock, without calling clear_running().
         */
        AZLogCrit("Should not reach here");
        assert(0);
    }

    inode->flush_unlock();
}