void fcsm::on_commit_complete()

in turbonfs/src/fcsm.cpp [878:1082]


void fcsm::on_commit_complete(uint64_t commit_bytes)
{
    // Commit must be called only for unstable writes.
    assert(!inode->is_stable_write());

    // Must be called only for success.
    assert(inode->get_write_error() == 0);
    assert(commit_bytes > 0);

    // Must be called from flush/write callback.
    assert(fc_cb_running());

    // Commit callback can only be called if FCSM is running.
    assert(is_running());

    /*
     * Commit callback can be called only when commit is in progress, clear
     * it now. Must do it before grabbing the flush_lock, note that
     * wait_for_ongoing_commit() is waiting for commit-in-progress to be
     * cleared, with flush_lock held.
     */
    assert(inode->is_commit_in_progress());
    inode->clear_commit_in_progress();

    // If commit is running, flush cannot be running.
    assert(!inode->get_filecache()->is_flushing_in_progress());

    // commit_pending_bytes must be 0 here.
    assert(inode->get_filecache()->get_bytes_to_commit() == 0);

    // a byte can only be committed after it's flushed successfully.
    assert(committing_seq_num <= flushed_seq_num);
    assert(committing_seq_num <= flushing_seq_num);

    // Update committed_seq_num to account for the commit_bytes.
    committed_seq_num += commit_bytes;

    /*
     * When a commit completes it commits everything that has been flushed
     * till now also whatever has been scheduled for commit.
     */
    assert(flushed_seq_num == committed_seq_num);
    assert(committed_seq_num == committing_seq_num);

    AZLogDebug("[{}] [FCSM] on_commit_complete({}), Fd: {}, Fing: {}, "
               "Cd: {}, Cing: {}, Fq: {}, Cq: {}, bytes_flushing: {}",
               inode->get_fuse_ino(),
               commit_bytes,
               flushed_seq_num.load(),
               flushing_seq_num.load(),
               committed_seq_num.load(),
               committing_seq_num.load(),
               ftgtq.size(),
               ctgtq.size(),
               inode->get_filecache()->bytes_flushing.load());

    inode->flush_lock();

    /*
     * This can only come here with stable write true when
     * switch_to_stable_write() was waiting for ongoing commits to complete
     * and it went ahead and set inode stable write after we cleared the
     * commit_in_progress above.
     */
    assert(!inode->is_stable_write() || ctgtq.empty());

    /*
     * Go over all queued commit targets to see if any can be completed after
     * the latest commit completed.
     */
    while (!ctgtq.empty()) {
        struct fctgt& tgt = ctgtq.front();

        assert(tgt.fcsm == this);

        /*
         * ftgtq has commit targets in increasing order of committed_seq_num, so
         * as soon as we find one that's greater than committed_seq_num, we can
         * safely skip the rest.
         */
        if (tgt.commit_seq > committed_seq_num) {
            break;
        }

        if (tgt.task) {
            // Only one of task or done can be present.
            assert(!tgt.done);
            assert(tgt.task->magic == RPC_TASK_MAGIC);
            assert(tgt.task->get_op_type() == FUSE_WRITE);
            assert(tgt.task->rpc_api->write_task.is_fe());
            assert(tgt.task->rpc_api->write_task.get_size() > 0);

            AZLogDebug("[{}] [FCSM] completing blocking commit target: {}, "
                       "committed_seq_num: {}, write task: [{}, {})",
                       inode->get_fuse_ino(),
                       tgt.commit_seq,
                       committed_seq_num.load(),
                       tgt.task->rpc_api->write_task.get_offset(),
                       tgt.task->rpc_api->write_task.get_offset() +
                       tgt.task->rpc_api->write_task.get_size());

            tgt.task->reply_write(
                    tgt.task->rpc_api->write_task.get_size());
        } else if (tgt.done) {
            AZLogDebug("[{}] [FCSM] completing blocking commit target: {}, "
                       "committed_seq_num: {}",
                       inode->get_fuse_ino(),
                       tgt.commit_seq,
                       committed_seq_num.load());

            assert(*tgt.done == false);
            *tgt.done = true;
        } else {
            AZLogDebug("[{}] [FCSM] completing non-blocking commit target: {}, "
                       "committed_seq_num: {}",
                       inode->get_fuse_ino(),
                       tgt.commit_seq,
                       committed_seq_num.load());
        }

        // Commit target accomplished, remove from queue.
        ctgtq.pop();
    }

    /*
     * See if we have more commit targets and issue flush for the same.
     */
    if (!ftgtq.empty() || !ctgtq.empty()) {
        /*
         * If we have any commit target here it must have commit_seq greater
         * than committed_seq_num, else it would have been completed by the
         * above loop.
         * If we have any flush target it must have flush_seq greater than
         * flushed_seq_num. This is because commit would have started after
         * the flush and we would have completed all eligible flush targets.
         */
        assert(ftgtq.empty() || ftgtq.front().flush_seq > flushed_seq_num);
        assert(ctgtq.empty() || ctgtq.front().commit_seq > committed_seq_num);

        uint64_t bytes;
        std::vector<bytes_chunk> bc_vec;

        /*
         * This means we are here after switch_to_stable_write() switched to
         * stable, we need to handle that.
         */
        if (inode->is_stable_write()) {
            assert(ctgtq.empty());
            bc_vec =
                inode->get_filecache()->get_dirty_nonflushing_bcs_range(
                                                        0, UINT64_MAX, &bytes);
            // Here bc_vec can be empty, sync_membufs() can handle that.
        } else {
            bc_vec =
                inode->get_filecache()->get_contiguous_dirty_bcs(&bytes);
            /*
             * Since we have a flush target asking more data to be flushed, we
             * must have the corresponding bcs in the file cache.
             *
             * Note: We cannot have this assert for the stable write case, as
             *       sync_membufs() that called switch_to_stable_write(), might
             *       have consumed all these bcs and marked them flushing. When
             *       we come here we won't find any dirty-and-not-flushing bcs.
             */
            assert(!bc_vec.empty());
            assert(bytes > 0);
            /*
             * TODO: Handle the case where the ftgt wants us to do commit_full.
             */
        }

        // flushed_seq_num can never be more than flushing_seq_num.
        assert(flushed_seq_num <= flushing_seq_num);

        AZLogDebug("[{}] [FCSM] continuing, flushing_seq_num now: {}, "
                   "flushed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushing_seq_num.load(),
                   flushed_seq_num.load());

        // sync_membufs() will update flushing_seq_num() and mark fcsm running.
        [[maybe_unused]]
        const uint64_t prev_flushing_seq_num = flushing_seq_num;
        inode->sync_membufs(bc_vec, false /* is_flush */);
        assert(flushing_seq_num == (prev_flushing_seq_num + bytes));
    } else {
        AZLogDebug("[{}] [FCSM] idling, flushed_seq_num now: {}, "
                   "committed_seq_num: {}",
                   inode->get_fuse_ino(),
                   flushed_seq_num.load(),
                   committed_seq_num.load());

        // FCSM should not idle when there's any ongoing flush or commit.
        assert(flushing_seq_num == flushed_seq_num);
        assert(committing_seq_num == committed_seq_num);
        assert(flushed_seq_num == committed_seq_num);

        assert(!inode->get_filecache()->is_flushing_in_progress());
        assert(!inode->is_commit_in_progress());

        clear_running();
    }

    inode->flush_unlock();
}