int nfs_inode::copy_to_cache()

in turbonfs/src/nfs_inode.cpp [947:1131]


/*
 * Copy application write data from a fuse_bufvec into this file's cache.
 *
 * The source bytes start at bufv->buf[bufv->idx].mem + bufv->off and there
 * are (bufv->buf[bufv->idx].size - bufv->off) of them. They are copied, in
 * order, into the bytes_chunk(s) that filecache_handle->getx() returns for
 * the file range [offset, offset+length). Every membuf that is written is
 * marked uptodate and dirty, and on_cached_write() is called for the range
 * written.
 *
 * bufv          - source buffer vector, only count == 1 is supported.
 * offset        - file offset of the first byte to write, must be less than
 *                 AZNFSC_MAX_FILE_SIZE.
 * extent_left,
 * extent_right  - passed through unchanged to getx(), this function does not
 *                 read or write them itself.
 *
 * Returns 0 if all the data was copied, or EAGAIN if some membuf could not
 * be written (it was neither fully owned nor uptodate, or was commit-pending,
 * and was still not uptodate after one wait). Note that on EAGAIN the chunks
 * that preceded the failing one have already been copied and marked dirty,
 * only the failing chunk and the ones after it are skipped.
 */
int nfs_inode::copy_to_cache(const struct fuse_bufvec* bufv,
                             off_t offset,
                             uint64_t *extent_left,
                             uint64_t *extent_right)
{
    /*
     * XXX We currently only handle bufv with count=1.
     *     Ref aznfsc_ll_write_buf().
     */
    assert(bufv->count == 1);

    /*
     * copy_to_cache() must be called only for a regular file and it must have
     * filecache initialized.
     */
    assert(is_regfile());
    assert(has_filecache());
    assert(offset < (off_t) AZNFSC_MAX_FILE_SIZE);

    assert(bufv->idx < bufv->count);
    /*
     * Bytes to copy: whatever is left in the current buffer after bufv->off.
     * The subtraction is unsigned, so the (int) cast in the assert below is
     * what catches bufv->off being greater than the buffer size.
     */
    const size_t length = bufv->buf[bufv->idx].size - bufv->off;
    assert((int) length >= 0);
    assert((offset + length) <= AZNFSC_MAX_FILE_SIZE);
    /*
     * TODO: Investigate using splice for zero copy.
     */
    const char *buf = (char *) bufv->buf[bufv->idx].mem + bufv->off;
    int err = 0;
    /*
     * Only ever set to true under ENABLE_PARANOID, where it forces the
     * EAGAIN path after the wait below.
     */
    bool inject_eagain = false;

    /*
     * Get bytes_chunk(s) covering the range [offset, offset+length).
     * We need to copy application data to those.
     */
    std::vector<bytes_chunk> bc_vec =
        filecache_handle->getx(offset, length, extent_left, extent_right);

    /*
     * Bytes not yet accounted for. Every bytes_chunk, whether copied or
     * skipped, reduces this by bc.length and it must reach 0 at the end.
     */
    size_t remaining = length;

    for (auto& bc : bc_vec) {
        struct membuf *mb = bc.get_membuf();
#ifdef ENABLE_PARANOID
        /*
         * Tracks whether we have already gone through the wait-and-retry
         * branch for this bytes_chunk.
         */
        bool found_not_uptodate = false;

        if (!err && inject_error()) {
            err = EAGAIN;
            AZLogWarn("[{}] PP: copy_to_cache(): injecting EAGAIN for membuf "
                      "[{}, {}) (bc [{}, {})), length={}, remaining={}",
                      ino, mb->offset.load(), mb->offset.load()+mb->length.load(),
                      bc.offset, bc.offset+bc.length,
                      length, remaining);
        }
#endif

        /*
         * If we have already failed with EAGAIN, just drain the bc_vec
         * clearing the inuse count for all the bytes_chunk.
         * Nothing more is copied once err is set, so buf is not advanced
         * here, only remaining is updated to keep the final assert valid.
         *
         * TODO: If we have copied at least one byte, do not fail but instead
         *       let the caller know that we copied less.
         */
        if (err == EAGAIN) {
            mb->clear_inuse();
            assert(remaining >= bc.length);
            remaining -= bc.length;
            continue;
        }

        /*
         * Lock the membuf while we copy application data into it.
         */
        mb->set_locked();

        /*
         * If we own the full membuf we can safely copy to it, also if the
         * membuf is uptodate we can safely copy to it. In both cases the
         * membuf remains uptodate after the copy.
         * A membuf which is commit-pending is never copied to here, it takes
         * the else branch even if it is uptodate.
         *
         * TODO: We need to handle the case where application writes on
         *       the file range in commit_pending state.
         */
try_copy:
        if ((bc.maps_full_membuf() || mb->is_uptodate()) &&
            !mb->is_commit_pending()) {

            assert(bc.length <= remaining);
            ::memcpy(bc.get_buffer(), buf, bc.length);
            mb->set_uptodate();
            mb->set_dirty();

            // Update file size in inode's cached attr.
            on_cached_write(bc.offset, bc.length);
        } else {
#ifdef ENABLE_PARANOID
            /*
             * Once we find the membuf uptodate, after waiting, and run
             * try_copy again, we must not find the membuf not-uptodate
             * again.
             *
             * NOTE(review): The retry check after the wait looks only at
             *               is_uptodate(), so an uptodate membuf that is
             *               still commit-pending would come back to this
             *               branch and trip this assert. See the
             *               commit_pending TODO above.
             */
            assert(!found_not_uptodate);
            found_not_uptodate = true;
#endif

            /*
             * bc refers to part of the membuf and membuf is not uptodate.
             * This can happen if our bytes_chunk_cache::get() call raced with
             * some other thread and they requested a bigger bytes_chunk than
             * us. The original bytes_chunk was allocated per their request
             * and our request was smaller one that fitted completely within
             * their request and hence we were given the same membuf,
             * albeit a smaller bytes_chunk. Now both the threads would next
             * try to lock the membuf to perform their corresponding IO, this
             * time we won the race and hence when we look at the membuf it's
             * a partial one and not uptodate. Since membuf is not uptodate
             * we will need to do a read-modify-write operation to correctly
             * update part of the membuf. Since we know that some other thread
             * is waiting to perform IO on the entire membuf, we simply let
             * that thread proceed with its IO. Once it's done the membuf will
             * be uptodate and then we can perform the simple copy.
             * We wait for a random interval (random_number(1, 50) msecs)
             * after releasing the lock to let the other thread get the lock.
             * Once it gets the lock it'll only release
             * it after it performs the IO. So, after we reacquire the lock
             * if the membuf is not uptodate it implies that the other thread
             * wasn't able to mark the membuf uptodate. In this case we need
             * to get fresh bytes_chunk vector and re-do the copy, which is
             * what returning EAGAIN asks of the caller.
             */
            const uint64_t rand_ms = random_number(1, 50);
            AZLogWarn("[{}] Waiting for membuf [{}, {}) (bc [{}, {})) to "
                      "become uptodate, dropping lock for {} msecs", ino,
                      mb->offset.load(), mb->offset.load()+mb->length.load(),
                      bc.offset, bc.offset+bc.length, rand_ms);

            /*
             * Drop the lock for the duration of the sleep, the inuse count
             * is still held across it.
             */
            mb->clear_locked();
            ::usleep(rand_ms * 1000);
            mb->set_locked();

#ifdef ENABLE_PARANOID
            inject_eagain = inject_error();
#endif

            if (mb->is_uptodate() && !inject_eagain) {
                AZLogWarn("[{}] Membuf [{}, {}) (bc [{}, {})) is now uptodate, "
                          "retrying copy", ino,
                          mb->offset.load(), mb->offset.load()+mb->length.load(),
                          bc.offset, bc.offset+bc.length);
                goto try_copy;
            } else {
                AZLogWarn("[{}] {}Membuf [{}, {}) (bc [{}, {})) not marked "
                          "uptodate by other thread, returning EAGAIN",
                          ino, inject_eagain ? "PP: " : "",
                          mb->offset.load(), mb->offset.load()+mb->length.load(),
                          bc.offset, bc.offset+bc.length);
                assert(err == 0);
                err = EAGAIN;

                /*
                 * Release the membuf before returning, so that when the caller
                 * calls us again we get a new "full" membuf not this partial
                 * membuf again, else we will be stuck in a loop.
                 * We need to drop the inuse count for release() to work, then
                 * re-acquire it for subsequent code to work.
                 */
                mb->clear_inuse();
                filecache_handle->release(mb->offset, mb->length);
                mb->set_inuse();
            }
        }

        /*
         * Done with the copy, release the membuf lock and clear inuse.
         * The membuf is marked dirty so it's safe against cache prune/release.
         * When we decide to flush this dirty membuf that time it'll be duly
         * locked.
         * This is also reached on the EAGAIN path above, in which case the
         * membuf was not written or marked dirty by us.
         */
        mb->clear_locked();
        mb->clear_inuse();

        buf += bc.length;
        assert(remaining >= bc.length);
        remaining -= bc.length;
    }

    /*
     * All bytes_chunks returned by getx() must together add up to exactly
     * length bytes, whether we copied to them or skipped them.
     */
    assert(remaining == 0);
    return err;
}