void rpc_task::run_read()

in turbonfs/src/rpc_task.cpp [3382:3776]


void rpc_task::run_read()
{
    const fuse_ino_t ino = rpc_api->read_task.get_ino();
    struct nfs_inode *inode = get_client()->get_nfs_inode_from_ino(ino);
    int64_t cfsize, sfsize;

    // run_read() must be called only for an fe task.
    assert(rpc_api->read_task.is_fe());

    assert(inode->is_regfile());
    /*
     * aznfsc_ll_read() can only be called after aznfsc_ll_open() so filecache
     * and readahead state must have been allocated when we reach here.
     */
    assert(inode->has_filecache());
    assert(inode->has_rastate());

    std::shared_ptr<bytes_chunk_cache>& filecache_handle =
        inode->get_filecache();

    /*
     * run_read() is called once for a fuse read request and must not be
     * called for a child task.
     */
    assert(rpc_api->parent_task == nullptr);

    /*
     * Get server and effective file size estimates.
     * We use these to find holes that we should zero fill.
     */
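    /*
     * Note: cfsize is the client's effective view of the file size, while
     *       sfsize is the server's; they can differ, presumably when cached
     *       writes extend the file beyond what the server has seen (see the
     *       READ rules further below).
     */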
    inode->get_file_sizes(cfsize, sfsize);

    /*
     * get_client_file_size() returns a fairly recent estimate of the file
     * size taking into account cto consistency, attribute cache timeout, etc,
     * so it's a size we can "trust". If READ offset is more than this size
     * then we can avoid sending the call to the server and generate eof
     * locally. This is an optimization that saves an extra READ call to the
     * server. The server will correctly return 0+eof for this READ call so we
     * are functionally correct, barring the small possibility that the file
     * has grown; in that case sending the READ to the server would return
     * the new data, which is permissible with our weak/cto cache consistency
     * guarantees.
     *
     * TODO: Shall we put it behind a config?
     */
    if (cfsize != -1) {
        if (rpc_api->read_task.get_offset() >= cfsize) {
            /*
             * Entire request lies beyond cfsize.
             */
            AZLogDebug("[{}] Read returning 0 bytes (eof) as requested "
                       "offset ({}) >= file size ({})",
                       ino, rpc_api->read_task.get_offset(), cfsize);
            INC_GBL_STATS(zero_reads, 1);
            reply_iov(nullptr, 0);
            return;
        } else if ((rpc_api->read_task.get_offset() +
                    rpc_api->read_task.get_size()) > (uint64_t) cfsize) {
            /*
             * Request extends beyond cfsize, trim it so that we don't have
             * to worry about excluding it later.
             */
            const uint64_t trim_len =
                (rpc_api->read_task.get_offset() +
                 rpc_api->read_task.get_size()) - cfsize;
            const uint64_t trimmed_size =
                rpc_api->read_task.get_size() - trim_len;
            assert(trimmed_size < rpc_api->read_task.get_size());
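            /*
             * Worked example (numbers illustrative): offset 90, size 20,
             * cfsize 100 => trim_len = (90 + 20) - 100 = 10 and
             * trimmed_size = 20 - 10 = 10, i.e. the read is trimmed to end
             * exactly at cfsize.
             */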

            AZLogDebug("[{}] Trimming application read beyond cfsize: {} "
                       "({} -> {})",
                       ino, cfsize, rpc_api->read_task.get_size(), trimmed_size);

            rpc_api->read_task.set_size(trimmed_size);
        }
    }

    /*
     * Get bytes_chunks covering the region caller wants to read.
     * The bytes_chunks returned could be any mix of old (already cached) or
     * new (cache allocated but yet to be read from blob). Note that reads
     * don't need any special protection. The caller just wants to read the
     * current contents of the blob, these can change immediately after or
     * even while we are reading, resulting in any mix of old or new data.
     */
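    /*
     * Illustrative example (chunk boundaries assumed, not dictated by the
     * source): a read of [0, 1M) could come back as three bytes_chunks,
     * say [0, 256K) already uptodate in the cache, [256K, 768K) newly
     * allocated, and [768K, 1M) uptodate; only the middle chunk would then
     * need a backend READ.
     */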
    assert(bc_vec.empty());
    bc_vec = filecache_handle->get(
                       rpc_api->read_task.get_offset(),
                       rpc_api->read_task.get_size());

    /*
     * inode->in_ra_window() (called from inline_prune()) considers anything
     * before max_byte_read as non-useful and can purge that from the cache.
     * Now that we have called get() we will have an inuse count on any
     * membuf(s) used by this read, so they won't be purged even if
     * in_ra_window() suggests that, so we can safely update max_byte_read now.
     * We don't want to wait for read_callback() to update max_byte_read, as
     * that would delay the update till the in-flight reads complete and we
     * would not perform sufficient readahead in the meantime.
     */
    inode->get_rastate()->on_application_read(
            rpc_api->read_task.get_offset(),
            rpc_api->read_task.get_size());

    /*
     * send_read_response() will later convey this read completion to fuse
     * using fuse_reply_iov() which can send max FUSE_REPLY_IOV_MAX_COUNT
     * vector elements.
     */
    const size_t size = std::min((int) bc_vec.size(), FUSE_REPLY_IOV_MAX_COUNT);
    assert(size > 0 && size <= FUSE_REPLY_IOV_MAX_COUNT);
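
    /*
     * bytes_chunks beyond this cap are not conveyed to fuse: the loop below
     * simply drops their inuse counts and bc_vec is trimmed to the cap
     * before the response is sent.
     */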

    // There should not be any reads running for this RPC task initially.
    assert(num_ongoing_backend_reads == 0);

    AZLogDebug("[{}] run_read: offset {}, size: {}, chunks: {}{}, "
               "sfsize: {}, cfsize: {}, csfsize: {}",
               ino,
               rpc_api->read_task.get_offset(),
               rpc_api->read_task.get_size(),
               bc_vec.size(),
               size != bc_vec.size() ? " (capped at 1023)" : "",
               sfsize, cfsize, inode->get_cached_filesize());

    /*
     * Now go through the byte chunk vector to see if the chunks are
     * uptodate. Uptodate chunks already have the data we need, while non
     * uptodate chunks need to be populated with data. These will mostly be
     * READ from the server, with certain exceptions. See below.
     * Chunks that are READ from the server are issued in parallel. Once all
     * chunks are uptodate we can complete the read to the caller.
     *
     * Here are the rules that we follow to decide if a non-uptodate chunk
     * must be READ from the server.
     * - (bc.offset + bc.length) > get_client_file_size()
     *   These MUST NOT occur as we trim the read request before making the
     *   bytes_chunk_cache::get() call. We assert for these.
     * - bc.offset < get_server_file_size()
     *   Part or whole of the bc need to be read from the server, so we MUST
     *   issue READ to the server for such bcs. If part of the bc lies after
     *   the server file size, read_callback() will fill 0s for that part.
     *   This is correct as that part corresponds to hole in our cache.
     * - bc.offset >= get_server_file_size()
     *   Server does not know about these so we MUST NOT ask the server about
     *   these. Given that the bc is non-uptodate this means our cache also
     *   doesn't have it, which means these correspond to holes in our cache
     *   and we MUST return 0s for these bcs.
     *
     * Note that we bump num_ongoing_backend_reads by 1 before issuing
     * the first backend read. This is done to make sure if read_callback()
     * is called before we could issue all reads, we don't mistake it for
     * "all issued reads have completed". It is ok to update this without a lock
     * since this is the only thread at this point which will access this.
     *
     * Note: Membufs which are found uptodate here shouldn't suddenly become
     *       non-uptodate when the other reads complete, otherwise we have a
     *       problem. An uptodate membuf doesn't become non-uptodate, but it
     *       can be written to by some writer thread while we are waiting for
     *       other chunk(s) to be read from the backend, or even while we are
     *       reading them to send the READ response.
     */
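
    /*
     * Worked example of the above rules (sizes illustrative): with
     * sfsize = 100K and cfsize = 300K, a non-uptodate bc at offset 50K
     * must be READ from the server (read_callback() zero-fills whatever
     * part lies past 100K), while a non-uptodate bc at offset 200K is a
     * hole in our cache and is zero-filled locally without contacting
     * the server.
     */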

    [[maybe_unused]] size_t total_length = 0;
    bool found_in_cache = true;

    num_ongoing_backend_reads = 1;

    for (size_t i = 0; i < bc_vec.size(); i++) {
        /*
         * Every bytes_chunk returned by get() must have its inuse count
         * bumped. Also they must have pvt and num_backend_calls_issued set
         * to their initial value of 0.
         */
        assert(bc_vec[i].get_membuf()->is_inuse());
        assert(bc_vec[i].pvt == 0);
        assert(bc_vec[i].num_backend_calls_issued == 0);
        // We trim the read size to not exceed cfsize.
        assert((cfsize == -1) ||
               ((int64_t) (bc_vec[i].offset + bc_vec[i].length) <= cfsize));

        total_length += bc_vec[i].length;

        if (i >= size) {
            AZLogDebug("[{}] Skipping read beyond vector count {}, "
                       "offset: {}, length: {}",
                       ino, size,
                       bc_vec[i].offset,
                       bc_vec[i].length);
            assert(size == FUSE_REPLY_IOV_MAX_COUNT);
            bc_vec[i].get_membuf()->clear_inuse();
            continue;
        }

        if (!bc_vec[i].get_membuf()->is_uptodate()) {
            /*
             * Now we are going to call read_from_server() which will issue
             * an NFS read that will read the data from the NFS server and
             * update the buffer. Grab the membuf lock, this will be unlocked
             * in read_callback() once the data has been read into the buffer
             * and it's marked uptodate.
             *
             * Note: This will block till the lock is obtained.
             */
            bc_vec[i].get_membuf()->set_locked();

            // Check if the buffer got updated by the time we got the lock.
            if (bc_vec[i].get_membuf()->is_uptodate()) {
                /*
                 * Release the lock since we no longer intend to write
                 * to this buffer.
                 */
                bc_vec[i].get_membuf()->clear_locked();
                bc_vec[i].get_membuf()->clear_inuse();

                /*
                 * Set "bytes read" to "bytes requested" since the data is read
                 * from the cache.
                 */
                bc_vec[i].pvt = bc_vec[i].length;

                INC_GBL_STATS(bytes_read_from_cache, bc_vec[i].length);

                AZLogDebug("[{}] Data read from cache. offset: {}, length: {}",
                           ino, bc_vec[i].offset, bc_vec[i].length);

#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
                /*
                 * Since the data is read from the cache, the chances of
                 * reading it again are negligible given the sequential
                 * read pattern.
                 * Free such chunks to reduce memory utilization.
                 */
                filecache_handle->release(bc_vec[i].offset, bc_vec[i].length);
#endif
                continue;
            }

            /*
             * Ok, non-uptodate buffer, see if we should read from server or
             * return 0s. We read from server only if at least one byte from
             * the bc can be served from the server, else it's the case of
             * unmapped chunk within the cached portion (case of sparse cache
             * due to sparse writes beyond file size) which should correspond
             * with holes aka 0s.
             */
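            /*
             * Note: sfsize == -1 means we don't have a server file size
             *       estimate, in which case we conservatively READ from
             *       the server.
             */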
            const bool read_from_server =
                ((sfsize == -1) || (int64_t) bc_vec[i].offset < sfsize);

            if (!read_from_server) {
                AZLogDebug("[{}] Hole in cache. offset: {}, length: {}",
                           ino, bc_vec[i].offset, bc_vec[i].length);

                /*
                 * Set "bytes read" to "bytes requested" since we provide all
                 * the data as 0s.
                 */
                bc_vec[i].pvt = bc_vec[i].length;
                ::memset(bc_vec[i].get_buffer(), 0, bc_vec[i].length);

                bc_vec[i].get_membuf()->set_uptodate();
                bc_vec[i].get_membuf()->clear_locked();
                bc_vec[i].get_membuf()->clear_inuse();

                INC_GBL_STATS(bytes_zeroed_from_cache, bc_vec[i].length);
#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
                filecache_handle->release(bc_vec[i].offset, bc_vec[i].length);
#endif
                continue;
            }

            found_in_cache = false;

            /*
             * TODO: If we have just 1 bytes_chunk to fill, which is the most
             *       common case, avoid creating child task and process
             *       everything in this same task.
             *       Also for contiguous reads use the libnfs vectored read API.
             */

            /*
             * Create a child rpc task to issue the read RPC to the backend.
             */
            struct rpc_task *child_tsk =
                get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_READ);

            child_tsk->init_read_be(
                rpc_api->read_task.get_ino(),
                bc_vec[i].length,
                bc_vec[i].offset);

            // Set the parent task of the child to the current RPC task.
            child_tsk->rpc_api->parent_task = this;

            /*
             * Set "bytes read" to 0 and this will be updated as data is read,
             * likely in partial read calls. So at any time bc.pvt will be the
             * total data read.
             */
            bc_vec[i].pvt = 0;

            // Set the byte chunk that this child task is in charge of updating.
            child_tsk->rpc_api->bc = &bc_vec[i];

            /*
             * Child task should always read a subset of the parent task's
             * range.
             */
            assert(child_tsk->rpc_api->read_task.get_offset() >=
                   rpc_api->read_task.get_offset());
            assert(child_tsk->rpc_api->read_task.get_size() <=
                   rpc_api->read_task.get_size());

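            /*
             * Issuing this child READ adds to num_ongoing_backend_reads
             * (presumably inside read_from_server()), and each completing
             * child drops it; the last one to complete arranges to send
             * the read response to fuse (see the check below).
             */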
            child_tsk->read_from_server(bc_vec[i]);
        } else {
            AZLogDebug("[{}] Data read from cache. offset: {}, length: {}",
                       ino, bc_vec[i].offset, bc_vec[i].length);

            bc_vec[i].get_membuf()->clear_inuse();

            /*
             * Set "bytes read" to "bytes requested" since the data is read
             * from the cache.
             */
            bc_vec[i].pvt = bc_vec[i].length;

            INC_GBL_STATS(bytes_read_from_cache, bc_vec[i].length);

#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
            /*
             * Data read from cache. For the most common sequential read
             * pattern this cached data won't be needed again, release
             * it promptly to ease memory pressure.
             * Note that this is just a suggestion to release the buffer.
             * The buffer may not be released if it's in use by any other
             * user.
             */
            const uint64_t released =
                filecache_handle->release(bc_vec[i].offset, bc_vec[i].length);
            assert(released <= bc_vec[i].length);

            /*
             * If we could not release this buffer, try to release all cached
             * data up to release_till, which is a safe offset returned by the
             * readahead state machine.
             */
            if (released != bc_vec[i].length) {
                const uint64_t release_till =
                    inode->get_rastate()->release_till();
                if (release_till != 0) {
                    filecache_handle->release(0, release_till);
                }
            }
#endif
        }
    }

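    /*
     * Drop the bytes_chunks that were skipped above because of the fuse
     * iov limit; their inuse counts have already been released.
     */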
    if (bc_vec.size() > FUSE_REPLY_IOV_MAX_COUNT) {
        bc_vec.resize(FUSE_REPLY_IOV_MAX_COUNT);
    }

    // get() must return bytes_chunks exactly covering the requested range.
    assert(total_length == rpc_api->read_task.get_size());

    /*
     * Decrement the read ref incremented above.
     * Each completing child task will also update the parent task's
     * num_ongoing_backend_reads, so we check for that.
     */
    assert(num_ongoing_backend_reads >= 1);
    if (--num_ongoing_backend_reads != 0) {
        assert(!found_in_cache);
        /*
         * Not all backend reads have completed yet. When the last backend
         * read completes read_callback() will arrange to send the read
         * response to fuse.
         * This is the more common case as backend READs will take time to
         * complete.
         */
        return;
    }

    /*
     * Either no chunk needed backend read (likely) or all backend reads issued
     * above completed (unlikely).
     */

    if (found_in_cache) {
        AZLogDebug("[{}] Data read from cache, offset: {}, size: {}",
                   ino,
                   rpc_api->read_task.get_offset(),
                   rpc_api->read_task.get_size());
    }

    // Send the response.
    send_read_response();
}
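
The counting around num_ongoing_backend_reads is the load-bearing piece of
this function: the task holds one extra count while it issues child reads so
that a fast read_callback() cannot conclude "all issued reads have completed"
too early. Below is a minimal standalone sketch of that pattern (names and
structure are illustrative, not the turbonfs API), using an atomic counter
and one detached thread per simulated backend read:

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

static std::atomic<int> ongoing{0};

static void send_response()
{
    std::printf("all reads complete, sending response\n");
}

// Simulate issuing one backend READ; the completion lambda plays the
// role of read_callback().
static void issue_read(int idx)
{
    ongoing++;
    std::thread([idx]() {
        std::printf("read %d done\n", idx);
        // The transition to 0 happens exactly once, on the last
        // outstanding count, so exactly one party sends the response.
        if (--ongoing == 0) {
            send_response();
        }
    }).detach();
}

int main()
{
    // Bump by 1 before issuing the first read (num_ongoing_backend_reads
    // = 1 in run_read()), so completions racing with the issuing loop
    // cannot drive the count to 0 prematurely.
    ongoing = 1;

    for (int i = 0; i < 4; i++) {
        issue_read(i);
    }

    // Drop the issuer's count; if all reads already completed, send the
    // response inline (the "unlikely" path in run_read()).
    if (--ongoing == 0) {
        send_response();
    }

    // Give detached threads time to finish before exiting (sketch only).
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}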