// ra_state::issue_readaheads()
//
// Extracted from turbonfs/src/readahead.cpp (original lines 498-701).


/*
 * Issue as many readahead READ RPCs for this inode as the current ra_state
 * allows. For every readahead window returned by get_next_ra() it grabs the
 * corresponding bytes_chunk(s) from the file cache and dispatches one async
 * NFS READ per chunk, with readahead_callback() completing each one.
 *
 * Returns the number of readahead READs successfully dispatched to the
 * backend (0 if readahead is disabled or not currently possible).
 *
 * Per-chunk resource protocol (must stay balanced on every exit path):
 *  - inuse ref   : taken by read_cache->get(), dropped here on skip/failure,
 *                  else dropped by readahead_callback().
 *  - membuf lock : taken via try_lock() here, released here on skip/failure,
 *                  else released by readahead_callback().
 *  - inode ref   : incref() before dispatch, decref() here on dispatch
 *                  failure, else in readahead_callback().
 */
int ra_state::issue_readaheads()
{
    int64_t ra_offset;

    /*
     * issue_readaheads() MUST only be called for open files which will have
     * the file cache initialized.
     */
    assert(inode->has_filecache());

    auto& read_cache = inode->get_filecache();
    int ra_issued = 0;

    /*
     * If userspace data cache is disabled, don't do readaheads.
     */
    if (!aznfsc_cfg.cache.data.user.enable) {
        return 0;
    }

    /*
     * Issue all readaheads allowed by this ra_state.
     * A non-positive return from get_next_ra() means "no readahead now";
     * presumably the value encodes the reason, since it is logged as
     * "reason" in the no-readahead log below — TODO confirm in get_next_ra().
     */
    while ((ra_offset = get_next_ra()) > 0) {
        AZLogDebug("[{}] Issuing readahead at off: {} len: {}: ongoing: {} "
                   "sfsize: {} cfsize: {} csfsize: {} ({})",
                   inode->get_fuse_ino(), ra_offset, def_ra_size,
                   ra_ongoing.load(),
                   inode->get_server_file_size(),
                   inode->get_client_file_size(),
                   inode->get_cached_filesize(),
                   get_ra_bytes());

        /*
         * Get bytes_chunk representing the byte range we want to readahead
         * and issue READ RPCs for all. The cache may split the range into
         * multiple chunks, hence the vector.
         */
        std::vector<bytes_chunk> bcv = read_cache->get(ra_offset, def_ra_size);

        for (bytes_chunk& bc : bcv) {

            // Every bytes_chunk must lie within the readahead.
            assert(bc.offset >= (uint64_t) ra_offset);
            assert((bc.offset + bc.length) <= (ra_offset + def_ra_size));

            // get() must grab the inuse count.
            assert(bc.get_membuf()->is_inuse());

            /*
             * Before we issue READ to populate the bytes_chunk, take the
             * membuf lock. We use try_lock() and skip readahead if we don't
             * get the lock. It's ok to skip readahead rather than holding the
             * caller. Mostly if there is a single reader we will get the lock.
             * This lock will be released in the readahead_callback() after the
             * buffer is populated.
             * Note that if the membuf is already locked it means some other
             * context is already performing IO to it. We should not release
             * the buffer.
             */
            if (!bc.get_membuf()->try_lock()) {
                AZLogWarnNR("[{}] Skipping readahead at off: {} len: {}. "
                            "Could not get membuf lock!",
                            inode->get_fuse_ino(), bc.offset, bc.length);

                /*
                 * Account this range as "done" with the ra_state so the
                 * readahead window bookkeeping stays consistent, then drop
                 * the inuse ref taken by get(). Note: no cache release here,
                 * the lock holder owns the buffer's IO.
                 */
                on_readahead_complete(bc.offset, bc.length);
                bc.get_membuf()->clear_inuse();
                continue;
            }

            /*
             * If the buffer is already uptodate, skip readahead.
             * Someone else already populated it; undo the lock and inuse
             * ref we just took (again, no cache release — data is valid).
             */
            if (bc.get_membuf()->is_uptodate()) {
                AZLogDebug("[{}] Skipping readahead at off: {} len: {}. "
                           "Membuf already uptodate!",
                           inode->get_fuse_ino(), bc.offset, bc.length);

                on_readahead_complete(bc.offset, bc.length);
                bc.get_membuf()->clear_locked();
                bc.get_membuf()->clear_inuse();
                continue;
            }

            /*
             * Ok, now issue READ RPCs to read this byte range.
             */
            struct rpc_task *tsk =
                client->get_rpc_task_helper()->alloc_rpc_task(FUSE_READ);

            /*
             * fuse_req is needed to send the fuse response, since we don't
             * need to send response for readahead reads, it can be null.
             * fuse_file_info is not used too.
             */
            tsk->init_read_be(inode->get_fuse_ino(),  /* ino */
                              bc.length,              /* size */
                              bc.offset);             /* offset */

            // No reads should be issued to backend at this point.
            assert(bc.num_backend_calls_issued == 0);
            bc.num_backend_calls_issued++;

            // pvt must be untouched; presumably readahead_callback() uses it.
            assert(bc.pvt == 0);

            /*
             * bc holds a ref on the membuf so we can safely access membuf
             * only till we have bc in the scope. In readahead_callback() we
             * need to access bc, hence we transfer ownership to the ra_context
             * object allocated below.
             *
             * Note: bc.num_backend_calls_issued was bumped above, BEFORE the
             * copy into ctx, so the copy carries the count of 1.
             */
            struct ra_context *ctx = new ra_context(tsk, bc);
            assert(ctx->bc.num_backend_calls_issued == 1);

            READ3args args;
            ::memset(&args, 0, sizeof(args));
            args.file = inode->get_fh();
            args.offset = bc.offset;
            args.count = bc.length;

            /*
             * Grab a ref on this inode so that it is not freed when the
             * readahead reads are going on. Since the fuse layer does not
             * know of this readahead operation, it is possible that the fuse
             * may release this inode soon after the application read returns.
             * We do not want to be in that state and hence grab an extra ref
             * on this inode.
             * This should be decremented in readahead_callback()
             */
            inode->incref();

            AZLogDebug("[{}] Issuing readahead read to backend at "
                       "off: {} len: {}",
                       inode->get_fuse_ino(),
                       args.offset,
                       args.count);

            /*
             * tsk->get_rpc_ctx() call below will round robin readahead
             * requests across all available connections.
             *
             * TODO: See if issuing a batch of reads over one connection
             *       before moving to the other connection helps.
             */
            // Count the RPC as issued up-front; cancelled below if dispatch
            // fails, so the issue/cancel stats stay balanced.
            tsk->get_stats().on_rpc_issue();
            if (rpc_nfs3_read_task(
                        tsk->get_rpc_ctx(),
                        readahead_callback,
                        bc.get_buffer(),
                        bc.length,
                        &args,
                        ctx) == NULL) {
                tsk->get_stats().on_rpc_cancel();
                /*
                 * This call failed due to internal issues like OOM etc
                 * and not due to an actual RPC/NFS error, anyways pretend
                 * as if we never issued this.
                 * Undo everything acquired for this chunk, in order:
                 * ra accounting, membuf lock, inuse ref, cache buffer,
                 * rpc_task, ra_context and finally the extra inode ref.
                 */
                AZLogWarn("[{}] Skipping readahead at off: {} len: {}. "
                          "rpc_nfs3_read_task() failed!",
                          inode->get_fuse_ino(), args.offset, args.count);

                on_readahead_complete(bc.offset, bc.length);

                bc.get_membuf()->clear_locked();
                bc.get_membuf()->clear_inuse();

                // Release the buffer since we did not fill it.
                read_cache->release(bc.offset, bc.length);

                tsk->free_rpc_task();
                delete ctx;

                // Decrement the extra ref that was taken.
                inode->decref();

                continue;
            }

            ra_issued++;

            AZLogDebug("[{}] rpc_nfs3_read_task() successfully dispatched "
                       "#{} readahead at off: {} len: {}. ",
                       inode->get_fuse_ino(),
                       ra_issued,
                       args.offset,
                       args.count);
        }
    }

    if (ra_issued == 0) {
        // Process-wide counter of calls that issued no readahead at all.
        static std::atomic<uint64_t> num_no_readahead;

        // Log once every 1000 failed calls.
        // ra_offset here is the final (non-positive) get_next_ra() return,
        // logged as the reason readahead stopped.
        if ((++num_no_readahead % 1000) == 0) {
            AZLogDebug("[{}] num_no_readahead={}, reason={}",
                       inode->get_fuse_ino(),
                       num_no_readahead.load(), ra_offset);
        }
    } else {
        INC_GBL_STATS(num_readhead, 1);
    }

    return ra_issued;
}