static void read_callback()

in turbonfs/src/rpc_task.cpp [3879:4308]


static void read_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    struct read_context *ctx = (read_context*) private_data;
    rpc_task *task = ctx->task;
    assert(task->magic == RPC_TASK_MAGIC);
    // This must be a BE task.
    assert(task->rpc_api->read_task.is_be());

    /*
     * Parent task corresponds to the fuse request that intiated the read.
     * This will be used to complete the request to fuse.
     */
    rpc_task *parent_task = task->rpc_api->parent_task;

    /*
     * Only BR tasks can issue the read RPC, hence the callback should
     * be called only for them.
     */
    assert(parent_task != nullptr);
    assert(parent_task->magic == RPC_TASK_MAGIC);

    /*
     * num_ongoing_backend_reads is updated only for the parent task and it
     * counts how many child rpc tasks are ongoing for this parent task.
     * num_ongoing_backend_reads will always be 0 for child tasks.
     */
    assert(parent_task->num_ongoing_backend_reads > 0);
    assert(task->num_ongoing_backend_reads == 0);

    struct bytes_chunk *bc = ctx->bc;
    assert(bc->length > 0);

    // We are in the callback, so at least one backend call was issued.
    assert(bc->num_backend_calls_issued > 0);

    /*
     * If we have already finished reading the entire bytes_chunk, why are we
     * here. We must have locked the membuf and marked inuse before we issued
     * the read.
     */
    assert(bc->pvt < bc->length);
    assert(bc->get_membuf()->is_inuse());
    assert(bc->get_membuf()->is_locked());

    const char* errstr;
    auto res = (READ3res*)data;

    INJECT_JUKEBOX(res, task);

    const int status = (task->status(rpc_status, NFS_STATUS(res), &errstr));
    fuse_ino_t ino = task->rpc_api->read_task.get_ino();
    struct nfs_inode *inode = task->get_client()->get_nfs_inode_from_ino(ino);

    /*
     * Applications can only issue reads on an open fd and we ensure filecache
     * is created on file open.
     */
    assert(inode->has_filecache());
    auto filecache_handle = inode->get_filecache();
    /*
     * read_callback() must only be called for read done from fuse for which
     * we must have allocated the cache.
     */
    assert(filecache_handle);
    const uint64_t issued_offset = bc->offset + bc->pvt;
    const uint64_t issued_length = bc->length - bc->pvt;

    /*
     * It is okay to free the context here as we do not access it after this
     * point.
     */
    delete ctx;

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUSX(rpc_status, res));

    if (status == 0) {
        /*
         * Post-op attributes are optional, if server returns then
         * update the inode attribute cache.
         */
        UPDATE_INODE_ATTR(inode, res->READ3res_u.resok.file_attributes);

        /*
         * Reads are counted in the callback as that's when the read
         * responses have just come on the wire.
         */
        INC_GBL_STATS(server_bytes_read, res->READ3res_u.resok.count);
        INC_GBL_STATS(server_read_reqs, 1);

#ifdef ENABLE_PRESSURE_POINTS
        /*
         * Short read pressure point, skip when eof received.
         */
        if (inject_error() && !res->READ3res_u.resok.eof) {
            // Set read size to a random percent of the actual size.
            const uint32_t pct = random_number(1, 100);
            const uint32_t adj_size =
                std::max((res->READ3res_u.resok.count * pct) / 100, 1U);
            assert(adj_size <= res->READ3res_u.resok.count);
            AZLogWarn("[{}] PP: short read {} -> {}",
                      ino, res->READ3res_u.resok.count, adj_size);
            res->READ3res_u.resok.count = adj_size;
        }
#endif

        assert((bc->pvt == 0) || (bc->num_backend_calls_issued > 1));

        // We should never get more data than what we requested.
        assert(res->READ3res_u.resok.count <= issued_length);

        const bool is_partial_read = !res->READ3res_u.resok.eof &&
            (res->READ3res_u.resok.count < issued_length);

        // Update bc->pvt with fresh bytes read in this call.
        bc->pvt += res->READ3res_u.resok.count;
        assert(bc->pvt <= bc->length);

        AZLogDebug("[{}] <{}> read_callback: {}Read completed for [{}, {}), "
                   "Bytes read: {} eof: {}, total bytes read till "
                   "now: {} of {} for [{}, {}) num_backend_calls_issued: {}",
                   ino, task->issuing_tid,
                   is_partial_read ? "Partial " : "",
                   issued_offset,
                   issued_offset + issued_length,
                   res->READ3res_u.resok.count,
                   res->READ3res_u.resok.eof,
                   bc->pvt,
                   bc->length,
                   bc->offset,
                   bc->offset + bc->length,
                   bc->num_backend_calls_issued);

        /*
         * There's a special case that we need to handle.
         * Since we support sparse writes (beyond the server file size) we can
         * have a case where we have a bc which spans the server file size
         * boundary. This will happen when inode->get_server_file_size() is
         * less than inode->get_cached_filesize() and the portion of the file
         * immediately after server file size is not cached. For such a bc we
         * issue the READ call to the server and we expect the server to return
         * partial bc data + eof. Now we should set rest of the bc to 0s as it
         * corresponds to hole in the file.
         */
        const uint64_t csfsize = inode->get_cached_filesize();
        if (res->READ3res_u.resok.eof &&
            (res->READ3res_u.resok.count < issued_length) &&
            ((bc->offset + bc->pvt) < csfsize)) {
            void *const zb = bc->get_buffer() + bc->pvt;
            const int64_t zb_len =
                std::min(csfsize, bc->offset + bc->length) - (bc->offset + bc->pvt);

            AZLogDebug("[{}] <{}> read_callback: bc [{}, {}) spans across "
                       "server file size boundary, filling remaining {} bytes "
                       "@ offset {}, with 0s. cfsize: {}, sfsize: {}, csfsize: {}",
                       ino, task->issuing_tid,
                       issued_offset,
                       issued_offset + issued_length,
                       zb_len, bc->offset + bc->pvt,
                       inode->get_client_file_size(),
                       inode->get_server_file_size(),
                       csfsize);

            assert(zb_len > 0);
            ::memset(zb, 0, zb_len);
            // Added zb_len more bytes to bc.
            bc->pvt += zb_len;
            assert(bc->pvt <= bc->length);
        }

        /*
         * In case of partial read, issue read for the remaining.
         */
        if (is_partial_read) {
            const off_t new_offset = bc->offset + bc->pvt;
            const size_t new_size = bc->length - bc->pvt;

            // Create a new child task to carry out this request.
            struct rpc_task *child_tsk =
                task->get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_READ);

            child_tsk->init_read_be(
                task->rpc_api->read_task.get_ino(),
                new_size,
                new_offset);

            /*
             * Set the parent task of the child to the parent of the
             * current RPC task. This is required if the current task itself
             * is one of the child tasks running part of the fuse read request.
             */
            child_tsk->rpc_api->parent_task = parent_task;

            /*
             * Child task must continue to fill the same bc.
             */
            child_tsk->rpc_api->bc = bc;

            /*
             * TODO: To avoid allocating a new read_context we can reuse the
             *       existing contest but we have to update the task member.
             */
            struct read_context *new_ctx = new read_context(child_tsk, bc);
            bool rpc_retry;
            READ3args new_args;

            new_args.file = inode->get_fh();
            new_args.offset = new_offset;
            new_args.count = new_size;

            // One more backend call issued to fill this bc.
            bc->num_backend_calls_issued++;

            AZLogDebug("[{}] Issuing partial read at offset: {} size: {}"
                       " for [{}, {})",
                       ino,
                       new_offset,
                       new_size,
                       bc->offset,
                       bc->offset + bc->length);

            do {
                /*
                 * We have identified partial read case where the
                 * server has returned fewer bytes than requested.
                 * Fuse cannot accept fewer bytes than requested,
                 * unless it's an eof or error.
                 * Hence we will issue read for the remaining.
                 *
                 * Note: It is okay to issue a read call directly here
                 *       as we are holding all the needed locks and refs.
                 */
                rpc_retry = false;
                child_tsk->get_stats().on_rpc_issue();
                if (rpc_nfs3_read_task(
                        child_tsk->get_rpc_ctx(),
                        read_callback,
                        bc->get_buffer() + bc->pvt,
                        new_size,
                        &new_args,
                        (void *) new_ctx) == NULL) {
                    child_tsk->get_stats().on_rpc_cancel();
                    /*
                     * Most common reason for this is memory allocation failure,
                     * hence wait for some time before retrying. Also block the
                     * current thread as we really want to slow down things.
                     *
                     * TODO: For soft mount should we fail this?
                     */
                    rpc_retry = true;

                    AZLogWarn("rpc_nfs3_read_task failed to issue, retrying "
                              "after 5 secs!");
                    ::sleep(5);
                }
            } while (rpc_retry);

            // Free the current RPC task as it has done its bit.
            task->free_rpc_task();

            /*
             * Return from the callback here. The rest of the callback
             * will be processed once this partial read completes.
             */
            return;
        }

        /*
         * We should never return lesser bytes to the fuse than requested,
         * unless error or eof is encountered after this point.
         */
        assert((bc->length == bc->pvt) || res->READ3res_u.resok.eof);

        if (bc->maps_full_membuf() && (bc->length == bc->pvt)) {
            /*
             * If this bc maps the entire chunkmap bytes_chunk and we have
             * read the entire range represented by this bc, then we can
             * mark it uptodate.
             */
#ifdef ENABLE_PRESSURE_POINTS
            if (inject_error()) {
                AZLogDebug("[{}] PP: Not setting uptodate flag for membuf "
                           "[{}, {})",
                           ino, bc->offset, bc->offset + bc->length);
            } else
#endif
            {
                AZLogDebug("[{}] Setting uptodate flag for membuf [{}, {})",
                           ino, bc->offset, bc->offset + bc->length);

                bc->get_membuf()->set_uptodate();
            }
        } else {
            bool set_uptodate = false;

            /*
             * If we got eof in a partial read, release the non-existent
             * portion of the chunk.
             */
            if (bc->maps_full_membuf() && (bc->length > bc->pvt) &&
                res->READ3res_u.resok.eof) {
                assert(res->READ3res_u.resok.count < issued_length);

                /*
                 * We need to clear the inuse count held by this thread, else
                 * release() will not be able to release. We drop and then
                 * promptly grab the inuse count after the release(), so that
                 * set_uptodate() can be called.
                 */
                bc->get_membuf()->clear_inuse();
                const uint64_t released_bytes =
                    filecache_handle->release(bc->offset + bc->pvt,
                                              bc->length - bc->pvt);
                bc->get_membuf()->set_inuse();

                /*
                 * If we are able to successfully release all the extra bytes
                 * from the bytes_chunk, that means there's no other thread
                 * actively performing IOs to the underlying membuf, so we can
                 * mark it uptodate as future readers will get the trimmed
                 * membuf which *is* uptodate.
                 */
                assert(released_bytes <= (bc->length - bc->pvt));
                if (released_bytes == (bc->length - bc->pvt)) {
                    AZLogDebug("[{}] Setting uptodate flag for membuf [{}, {}) "
                               "after read hit eof, requested [{}, {}), "
                               "got [{}, {})",
                               ino,
                               bc->offset, bc->offset + bc->length,
                               issued_offset,
                               issued_offset + issued_length,
                               issued_offset,
                               issued_offset + res->READ3res_u.resok.count);
                    bc->get_membuf()->set_uptodate();
                    set_uptodate = true;
                }
            }

            if (!set_uptodate) {
                AZLogDebug("[{}] Not setting uptodate flag for membuf "
                           "[{}, {}), maps_full_membuf={}, is_new={}, "
                           "bc->length={}, bc->pvt={}",
                           ino, bc->offset, bc->offset + bc->length,
                           bc->maps_full_membuf(), bc->is_new, bc->length,
                           bc->pvt);
            }
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        task->get_client()->jukebox_retry(task);

        /*
         * Note: The lock on the membuf will be held till the task is retried.
         *       This lock will be released only if the retry passes or fails
         *       with error other than NFS3ERR_JUKEBOX.
         */
        return;
    } else {
        AZLogError("[{}] <{}> Read failed for offset: {} size: {} "
                   "total bytes read till now: {} of {} for [{}, {}) "
                   "num_backend_calls_issued: {} error: {}",
                   ino, task->issuing_tid,
                   issued_offset,
                   issued_length,
                   bc->pvt,
                   bc->length,
                   bc->offset,
                   bc->offset + bc->length,
                   bc->num_backend_calls_issued,
                   errstr);
    }

    /*
    * Release the lock that we held on the membuf since the data is now
    * written to it.
    * The lock is needed only to write the data and not to just read it.
    * Hence it is safe to read this membuf even beyond this point.
    */
    bc->get_membuf()->clear_locked();
    bc->get_membuf()->clear_inuse();

#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
    /*
    * Since we come here only for client reads, we will not cache the data,
    * hence release the chunk.
    * This can safely be done for both success and failure case.
    */
    filecache_handle->release(bc->offset, bc->length);
#endif

    // For failed status we must never mark the buffer uptodate.
    assert(!status || !bc->get_membuf()->is_uptodate());

    // Once failed, read_status remains at failed.
    int expected = 0;
    parent_task->read_status.compare_exchange_weak(expected, status);

    /*
     * Decrement the number of reads issued atomically and if it becomes zero
     * it means this is the last read completing. We send the response if all
     * the reads have completed or the read failed.
     */
    if (--parent_task->num_ongoing_backend_reads == 0) {
        /*
         * Parent task must send the read response to fuse.
         * This will also free parent_task.
         */
        parent_task->send_read_response();

        // Free the child task after sending the response.
        task->free_rpc_task();
    } else {
        AZLogDebug("No response sent, waiting for more reads to complete."
                   " num_ongoing_backend_reads: {}",
                   parent_task->num_ongoing_backend_reads.load());

        /*
         * This task has completed its part of the read, free it here.
         * When all reads complete, the parent task will be completed.
         */
        task->free_rpc_task();
        return;
    }
}