static void readahead_callback()

in turbonfs/src/readahead.cpp [122:422]


static void readahead_callback (
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
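    /*
     * Unpack the readahead context: the rpc_task driving this read and the
     * bytes_chunk describing the cached membuf range being filled.
     */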
    struct ra_context *ctx = (struct ra_context*) private_data;
    struct rpc_task *task = ctx->task;
    struct bytes_chunk *bc = &ctx->bc;
    auto res = (READ3res*) data;
    const char *errstr = nullptr;

    assert(task->magic == RPC_TASK_MAGIC);
    assert(bc->length > 0);
    assert(task->rpc_api->read_task.get_offset() >= (off_t) bc->offset);
    assert(task->rpc_api->read_task.get_size() <= bc->length);

    // Cannot have read more than requested.
    assert(res->READ3res_u.resok.count <= bc->length);

    /*
     * This callback is called only for a backend read call that we must have
     * issued.
     */
    assert(bc->num_backend_calls_issued >= 1);

    /*
     * If we have already finished reading the entire bytes_chunk, why are we
     * here?
     */
    assert(bc->pvt < bc->length);

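    /*
     * Map the RPC transport status and NFS status to a single status value,
     * and resolve the inode (and its file cache) that this readahead
     * belongs to.
     */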
    const int status = task->status(rpc_status, NFS_STATUS(res), &errstr);
    const fuse_ino_t ino = task->rpc_api->read_task.get_ino();
    struct nfs_inode *inode = task->get_client()->get_nfs_inode_from_ino(ino);

    assert(inode->has_filecache());
    const auto read_cache = inode->get_filecache();

    assert(read_cache != nullptr);
    assert(ino == inode->get_fuse_ino());

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res));

    /*
     * Offset and length of the actual read request for which this callback
     * is called. Note that the entire read may not be satisfied; it may be
     * a partial read response.
     */
    const uint64_t issued_offset = bc->offset + bc->pvt;
    const uint64_t issued_length = bc->length - bc->pvt;

    if (status != 0) {
        /*
         * Readahead read failed? Nothing to do, unlock the membuf, release
         * the byte range and pretend as if we never issued this read.
         * We may have successfully read some part of it, as some prior read
         * calls may have completed partially, but we cannot mark the membuf
         * uptodate unless we read it fully, so we have to just drop it.
     * Note that those prior successful reads would have caused the RPC
         * stats to be updated, but that's fine.
         */
        bc->get_membuf()->clear_locked();
        bc->get_membuf()->clear_inuse();

        // Release the buffer since we did not fill it.
        read_cache->release(bc->offset, bc->length);

        AZLogWarn("[{}] <{}> readahead_callback [FAILED] for offset: {} size: {} "
                  "total bytes read till now: {} of {} for [{}, {}) "
                  "num_backend_calls_issued: {}, rpc_status: {}, nfs_status: {}, "
                  "error: {}",
                  ino, task->issuing_tid,
                  issued_offset,
                  issued_length,
                  bc->pvt,
                  bc->length,
                  bc->offset,
                  bc->offset + bc->length,
                  bc->num_backend_calls_issued,
                  rpc_status,
                  (int) NFS_STATUS(res),
                  errstr);

        goto delete_ctx;
    } else {
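        /*
         * The read succeeded (possibly partially). Update the cached inode
         * attributes from the post-op attributes returned by the server and
         * bump the global server read stats.
         */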
        UPDATE_INODE_ATTR(inode, res->READ3res_u.resok.file_attributes);
        INC_GBL_STATS(server_bytes_read, res->READ3res_u.resok.count);
        INC_GBL_STATS(server_read_reqs, 1);

        /*
         * Only the first read call would have bc->pvt == 0; for subsequent
         * calls we will have num_backend_calls_issued > 1.
         */
        assert((bc->pvt == 0) || (bc->num_backend_calls_issued > 1));

        // We should never get more data than what we requested.
        assert(res->READ3res_u.resok.count <= issued_length);

        const bool is_partial_read = !res->READ3res_u.resok.eof &&
            (res->READ3res_u.resok.count < issued_length);

        // Update bc->pvt with fresh bytes read in this call.
        bc->pvt += res->READ3res_u.resok.count;
        assert(bc->pvt <= bc->length);

        INC_GBL_STATS(bytes_read_ahead, res->READ3res_u.resok.count);

        AZLogDebug("[{}] <{}> readahead_callback: {}Read completed for offset: {} "
                   "size: {} Bytes read: {} eof: {}, total bytes read till "
                   "now: {} of {} for [{}, {}) num_backend_calls_issued: {}",
                   ino, task->issuing_tid,
                   is_partial_read ? "Partial " : "",
                   issued_offset,
                   issued_length,
                   res->READ3res_u.resok.count,
                   res->READ3res_u.resok.eof,
                   bc->pvt,
                   bc->length,
                   bc->offset,
                   bc->offset + bc->length,
                   bc->num_backend_calls_issued);

        /*
         * In case of partial read, issue read for the remaining.
         */
        if (is_partial_read) {
            assert(bc->pvt < bc->length);

            const off_t new_offset = bc->offset + bc->pvt;
            const size_t new_size = bc->length - bc->pvt;
            bool rpc_retry = false;

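            // READ3 arguments for the remaining, yet-to-be-read byte range.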
            READ3args new_args;
            new_args.file = inode->get_fh();
            new_args.offset = new_offset;
            new_args.count = new_size;

            // Create a new child task to carry out this request.
            struct rpc_task *partial_read_tsk =
                task->get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_READ);

            partial_read_tsk->init_read_be(
                task->rpc_api->read_task.get_ino(),
                new_size,
                new_offset);

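            /*
             * Re-point the context at the new task, so that the next
             * invocation of this callback completes against it. The current
             * task is freed below, once the partial read has been issued.
             */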
            ctx->task = partial_read_tsk;

            bc->num_backend_calls_issued++;
            assert(bc->num_backend_calls_issued > 1);

            AZLogDebug("[{}] Issuing partial read at offset: {} size: {}"
                       " for [{}, {})",
                       ino,
                       new_offset,
                       new_size,
                       bc->offset,
                       bc->offset + bc->length);

            do {
                rpc_retry = false;
                /*
                 * We have identified a partial read case where the
                 * server has returned fewer bytes than requested.
                 * Hence we issue a read for the remaining bytes.
                 *
                 * Note: It is okay to issue a read call directly here
                 *       as we are holding all the needed locks and refs.
                 */
                partial_read_tsk->get_stats().on_rpc_issue();
                if (rpc_nfs3_read_task(
                        partial_read_tsk->get_rpc_ctx(),
                        readahead_callback,
                        bc->get_buffer() + bc->pvt,
                        new_size,
                        &new_args,
                        (void *) ctx) == NULL) {
                    partial_read_tsk->get_stats().on_rpc_cancel();
                    /*
                     * This call fails due to internal issues like OOM
                     * etc., and not due to an actual error, hence retry.
                     */
                    AZLogWarn("rpc_nfs3_read_task failed to issue, retrying "
                              "after 5 secs!");
                    ::sleep(5);

                    rpc_retry = true;
                }
            } while (rpc_retry);

            // Free the current RPC task as it has done its bit.
            task->free_rpc_task();

            /*
             * Return from the callback here. The rest of the callback
             * will be processed once this partial read completes.
             */
            return;
        }
    }

    /*
     * We come here only after the entire readahead read has completed
     * successfully.
     * The server should never return fewer bytes than requested, unless eof
     * is encountered.
     */
    assert(status == 0);
    assert((bc->length == bc->pvt) || res->READ3res_u.resok.eof);

    if (bc->maps_full_membuf() && (bc->length == bc->pvt)) {
        /*
         * Only if this bytes_chunk maps the entire membuf and the read has
         * completed, we can mark the membuf uptodate.
         */
        AZLogDebug("[{}] Setting uptodate flag for membuf [{}, {})",
                   ino, bc->offset, bc->offset + bc->length);

        bc->get_membuf()->set_uptodate();
    } else {
        bool set_uptodate = false;
        /*
         * If we got eof in a partial read, release the non-existent
         * portion of the chunk.
         */
        if (bc->maps_full_membuf() && (bc->length > bc->pvt) &&
            res->READ3res_u.resok.eof) {
            assert(res->READ3res_u.resok.count < issued_length);

            /*
             * We need to clear the inuse count held by this thread, else
             * release() will not be able to release. We drop the inuse
             * count and promptly grab it back after the release(), so that
             * set_uptodate() can be called.
             */
            bc->get_membuf()->clear_inuse();
            const uint64_t released_bytes =
                read_cache->release(bc->offset + bc->pvt,
                                    bc->length - bc->pvt);
            bc->get_membuf()->set_inuse();

            /*
             * If we are able to successfully release all the extra bytes
             * from the bytes_chunk, that means there's no other thread
             * actively performing IOs to the underlying membuf, so we can
             * mark it uptodate.
             */
            assert(released_bytes <= (bc->length - bc->pvt));
            if (released_bytes == (bc->length - bc->pvt)) {
                AZLogWarn("[{}] Setting uptodate flag for membuf [{}, {}), "
                          "after readahead hit eof, requested [{}, {}), "
                          "got [{}, {})",
                          ino,
                          bc->offset, bc->offset + bc->length,
                          issued_offset,
                          issued_offset + issued_length,
                          issued_offset,
                          issued_offset + res->READ3res_u.resok.count);
                bc->get_membuf()->set_uptodate();
                set_uptodate = true;
            }
        }

        if (!set_uptodate) {
            AZLogDebug("[{}] Not setting uptodate flag for membuf "
                       "[{}, {}), maps_full_membuf={}, is_new={}, "
                       "bc->length={}, bc->pvt={}",
                       ino, bc->offset, bc->offset + bc->length,
                       bc->maps_full_membuf(), bc->is_new, bc->length,
                       bc->pvt);
        }
    }

    /*
     * Release the lock that we held on the membuf since the data is now
     * written and ready to be read.
     */
    bc->get_membuf()->clear_locked();
    bc->get_membuf()->clear_inuse();

delete_ctx:
    /*
     * Success or failure, report readahead completion.
     * This MUST be called after dropping the membuf lock and inuse count.
     */
    inode->get_rastate()->on_readahead_complete(bc->offset, bc->length);

    // Free the readahead RPC task.
    task->free_rpc_task();

    // Free the context.
    delete ctx;

    // Decrement the extra ref taken on inode at the time read was issued.
    inode->decref();
}
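
For reference, below is a minimal sketch (not code from the repository) of how a
readahead READ could be dispatched with readahead_callback as its completion
handler, mirroring the partial-read reissue path above. The function name is
illustrative; the rpc_task is assumed to have been allocated and initialized
via init_read_be() by the caller, the ra_context is assumed to be set up by
the caller, and the caller is assumed to already hold the membuf lock, an
inuse count on the membuf and an extra ref on the inode (all of which the
callback drops on completion).

static void issue_readahead_read_sketch(struct rpc_task *task,
                                        struct nfs_inode *inode,
                                        struct ra_context *ctx)
{
    struct bytes_chunk *bc = &ctx->bc;
    READ3args args;
    bool rpc_retry = false;

    // Read the entire bytes_chunk starting at its cache offset.
    args.file = inode->get_fh();
    args.offset = bc->offset;
    args.count = bc->length;

    // First backend call issued for this bytes_chunk (counter assumed to
    // start at 0).
    bc->num_backend_calls_issued++;

    do {
        rpc_retry = false;
        task->get_stats().on_rpc_issue();
        if (rpc_nfs3_read_task(task->get_rpc_ctx(),
                               readahead_callback,
                               bc->get_buffer(),
                               bc->length,
                               &args,
                               (void *) ctx) == NULL) {
            task->get_stats().on_rpc_cancel();
            /*
             * rpc_nfs3_read_task() fails only due to internal issues like
             * OOM, so retry after a short wait, just as the callback does
             * when reissuing partial reads.
             */
            AZLogWarn("rpc_nfs3_read_task failed to issue, retrying "
                      "after 5 secs!");
            ::sleep(5);
            rpc_retry = true;
        }
    } while (rpc_retry);
}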