int nfs_inode::wait_for_ongoing_flush()

in turbonfs/src/nfs_inode.cpp [1138:1342]


/*
 * Wait for flushes (and, for unstable writes, commits) that are in progress
 * on this inode to complete.
 *
 * Caller MUST hold flush_lock. While waiting on individual membufs the lock is
 * temporarily dropped, because on_flush_complete() also takes flush_lock and
 * holding it here could deadlock. The lock is re-acquired before every return
 * path after the wait loop, so the caller always gets control back with
 * flush_lock held.
 *
 * Once all flushing membufs have completed and the flush callback has drained,
 * any data still pending commit is committed (a commit is issued here if none
 * is already in progress) and waited upon.
 *
 * Returns:
 *   0            - nothing to wait for, or everything completed successfully.
 *   EINPROGRESS  - the ongoing flush did not drain within max_retry attempts.
 *   otherwise    - the inode's write error as reported by get_write_error().
 */
int nfs_inode::wait_for_ongoing_flush()
{
    // Caller must call us with flush_lock held.
    assert(is_flushing);

    /*
     * MUST be called only for regular files.
     * Leave the assert to catch if fuse ever calls flush() on non-reg files.
     */
    if (!is_regfile()) {
        assert(0);
        return 0;
    }

    /*
     * If flush() is called w/o open(), there won't be any cache, skip.
     */
    if (!has_filecache()) {
        return 0;
    }

    /*
     * Stable writes do not need commit, so no commit inprogress and no pending
     * commit data.
     */
    if (is_stable_write()) {
        assert(!is_commit_in_progress());
        assert(get_filecache()->bytes_commit_pending == 0);
    }

    if (get_filecache()->is_flushing_in_progress()) {
        // A commit is not expected to be running while a flush is in progress.
        assert(!is_commit_in_progress());
    } else if (!is_commit_in_progress() &&
               !get_fcsm()->fc_cb_running() &&
               (get_filecache()->bytes_commit_pending == 0)) {
        /*
         * Flushing not in progress and no new flushing can be started as we hold
         * the flush_lock(), and callback drained.
         * No commit inprogress and no pending commit data, return.
         */
        AZLogDebug("[{}] No flush or commit in progress, returning", ino);
        return 0;
    }

    /*
     * We don't want to hold flush_lock while we wait for the ongoing flush to
     * complete, as this can cause deadlock as on_flush_complete() also takes
     * flush_lock. We get the current flushing bcs atomically under flush_lock,
     * and then release the flush_lock while waiting. We repeat the same till
     * there are no flushing bcs. Technically we should not have back to back
     * flushes started, but to be safe we retry few times.
     * In debug builds we can induce sleep in the write/flush callback, so we
     * need to wait enough.
     */
    int retry, err = 0;
    const int max_retry = 200;
    for (retry = 0; retry < max_retry; retry++) {
        assert(is_flushing);

        /*
         * Get the flushing bytes_chunk from the filecache handle.
         * This will grab an exclusive lock on the file cache and return the
         * list of flushing bytes_chunks at that point. Note that we can have
         * new dirty bytes_chunks created but we don't want to wait for those.
         */
        std::vector<bytes_chunk> bc_vec =
            filecache_handle->get_flushing_bc_range();

        /*
         * Nothing to flush and callback drained, job done!
         * Note that we unlock the membuf before calling on_flush_complete(),
         * so there's a window where the callback is still running while all
         * membufs have completed. We need to drain the callbacks too, else
         * we can deadlock with on_flush_complete() trying to acquire the
         * flush_lock which we would be holding and waiting. Once the current
         * callback completes, there cannot be any other callback that can run
         * as we have the flush_lock which will block any new flushes.
         */
        if (bc_vec.empty() && !get_fcsm()->fc_cb_running()) {
            assert(err == 0);
            break;
        }

        flush_unlock();

        AZLogDebug("[{}] wait_for_ongoing_flush(), attempt #{}, {} membufs, "
                   "fc_cb_count: {}",
                   ino, retry+1, bc_vec.size(), get_fcsm()->fc_cb_count());

        /*
         * Give 10ms to the callback to drain completely.
         */
        if (bc_vec.empty() && get_fcsm()->fc_cb_running()) {
            ::usleep(10 * 1000);
        }

        /*
         * Our caller expects us to return only after the flush completes.
         * Wait for all the membufs to flush and get result back.
         */
        for (bytes_chunk &bc : bc_vec) {
            struct membuf *mb = bc.get_membuf();

            assert(mb != nullptr);
            assert(mb->is_inuse());

            /*
             * sync_membufs() would have taken the membuf lock for the duration
             * of the backend write that flushes the membuf, so once we get the
             * lock we know that the flush write has completed.
             */
            mb->set_locked();

            /*
             * If still dirty after we get the lock, it may mean two things:
             * - Write failed.
             * - Some other thread got the lock before us and it made the
             *   membuf dirty again.
             */
            if (mb->is_dirty() && get_write_error()) {
                AZLogError("[{}] Flush [{}, {}) failed with error: {}",
                        ino,
                        bc.offset, bc.offset + bc.length,
                        get_write_error());
            }

            mb->clear_locked();
            mb->clear_inuse();

            /*
             * Release the bytes_chunk back to the filecache.
             * These bytes_chunks are not needed anymore as the flush is done.
             *
             * Note: These bytes_chunks are the ones get_flushing_bc_range()
             *       reported as flushing when we sampled them above. The writes
             *       may or may not have been issued by us (if not issued by us
             *       it was because some other thread, mostly the writer, issued
             *       the write). In any case since we have an inuse count,
             *       release() called from write_callback() would not have
             *       released it, so we need to release it now.
             */
            filecache_handle->release(bc.offset, bc.length);
        }

        // Re-grab flush_lock, now that the wait is over.
        flush_lock();

        err = get_write_error();

        if (err != 0) {
            AZLogDebug("[{}] wait_for_ongoing_flush() failed with error: {}",
                    ino, err);
            break;
        }
    }

    if (retry == max_retry) {
        AZLogError("[{}] wait_for_ongoing_flush(), failed after {} retries!",
                   ino, retry);
        assert(0);
        /*
         * The flush did not drain. Report that to the caller instead of
         * falling through to the commit path (which expects all flushes to
         * have completed) and then returning get_write_error(), which would
         * silently drop EINPROGRESS. flush_lock was re-acquired at the end of
         * the last iteration, so we return with it held as callers expect.
         */
        return EINPROGRESS;
    }

    if (err == 0) {
        AZLogDebug("[{}] wait_for_ongoing_flush(), succeeded after {} "
                   "retry(s)!", ino, retry);
    }

    /*
     * We should leave with flush_lock held and flush callback drained.
     */
    assert(is_flushing);
    assert(!get_fcsm()->fc_cb_running());

    /*
     * Unstable write case, we need to wait for the commit to complete.
     */
    if (get_filecache()->get_bytes_to_commit() > 0) {
        assert(!is_stable_write());

        if (!is_commit_in_progress()) {
            uint64_t bytes = 0;
            std::vector<bytes_chunk> bc_vec =
                get_filecache()->get_commit_pending_bcs(&bytes);
            assert(bc_vec.empty() == (bytes == 0));
            assert(bytes > 0);

            /*
             * Issue the commit RPC to commit the pending data.
             * The asserts above compile out in release builds; don't hand an
             * empty list to commit_membufs() in that case, there is nothing
             * to commit.
             */
            if (!bc_vec.empty()) {
                commit_membufs(bc_vec);
            }
        }
    }

    if (is_commit_in_progress()) {
        wait_for_ongoing_commit();
    }

    assert(!is_commit_in_progress());
    assert(!get_filecache()->is_flushing_in_progress());

    return get_write_error();
}