static void readdirplus_callback()

in turbonfs/src/rpc_task.cpp [4914:5399]


/**
 * RPC completion callback for a READDIRPLUS request.
 *
 * On success every returned entry is installed in the directory's
 * readdirectory_cache (replacing any existing entry with the same cookie) and
 * the entries that are "new" for the caller and fit in the requested byte
 * budget are sent to fuse.
 * On NFS3ERR_JUKEBOX the request is handed over to jukebox_retry().
 * On NFS3ERR_BAD_COOKIE the directory cache is invalidated and the directory
 * is re-enumerated from cookie 0 by a new child task, with target_offset
 * recording how far this enumeration has already progressed so that entries
 * are not sent to fuse twice.
 * Any other failure is returned to fuse through reply_error().
 *
 * @param rpc          RPC context, used only to query the completed PDU for
 *                     stats.
 * @param rpc_status   RPC level status of the call.
 * @param data         READDIRPLUS3res response.
 * @param private_data The rpc_task tracking this FUSE_READDIRPLUS request.
 */
static void readdirplus_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    rpc_task *const task = (rpc_task*) private_data;
    assert(task->magic == RPC_TASK_MAGIC);

    assert(task->get_op_type() == FUSE_READDIRPLUS);
    READDIRPLUS3res *const res = (READDIRPLUS3res*) data;

    INJECT_JUKEBOX(res, task);
    INJECT_BAD_COOKIE(res, task);

    const fuse_ino_t dir_ino = task->rpc_api->readdir_task.get_ino();
    struct nfs_inode *const dir_inode =
        task->get_client()->get_nfs_inode_from_ino(dir_ino);
    // Get handle to the readdirectory cache.
    std::shared_ptr<readdirectory_cache>& dircache_handle =
        dir_inode->get_dircache();

    // How many max bytes worth of entries data does the caller want?
    ssize_t rem_size = task->rpc_api->readdir_task.get_size();
    std::vector<std::shared_ptr<const directory_entry>> readdirentries;
    const int status = task->status(rpc_status, NFS_STATUS(res));
    bool eof = false;

    /*
     * readdir can be called on a directory after open()ing it, so we must have
     * created dircache.
     */
    assert(dir_inode->has_dircache());

    // For readdir we don't use parent_task to track the fuse request.
    assert(task->rpc_api->parent_task == nullptr);

    /*
     * Is this READDIRPLUS request querying a cookie after a gap, i.e., one or
     * more cookies not queried. In case of gap we cannot tell if directory
     * shrank.
     */
    const bool cookie_gap =
        ((uint64_t) task->rpc_api->readdir_task.get_offset() >
         dircache_handle->get_seq_last_cookie());

    /*
     * Last valid offset seen for this directory enumeration. We keep on
     * updating this as we read entries from the returned list, so at any
     * point it contains the last cookie seen from the server and in case of
     * re-enumeration the next READDIRPLUS RPC should ask entries starting
     * from last_valid_offset+1.
     * Only used when re-enumerating.
     */
    off_t last_valid_offset = task->rpc_api->readdir_task.get_offset();

    /*
     * This tracks if we have got a "new entry" that we would like to send to
     * fuse. For the regular case (no re-enumeration) this is not very
     * interesting as all entries received are new entries, but for
     * re-enumeration case this will be set only when we get an entry with
     * cookie not less than the target_offset (set when we received the
     * NFS3ERR_BAD_COOKIE error).
     * Note that we will need to send response to fuse when either got_new_entry
     * is set or we got eof.
     */
    bool got_new_entry = false;

    // A non-zero target_offset marks this task as a re-enumeration.
    const bool is_reenumerating =
        (task->rpc_api->readdir_task.get_target_offset() != 0);

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUSX(rpc_status, res));

    /*
     * "get_seq_last_cookie() == 0" is a common case when we purge the dircache
     * if it grows beyond the configured limit. In that case all subsequent
     * calls will see a cookie_gap and flood the logs.
     */
    if (cookie_gap && (dircache_handle->get_seq_last_cookie() != 0)) {
        AZLogWarn("[{}] readdirplus_callback: GAP in cookie requested ({} -> {})",
                  dir_ino, dircache_handle->get_seq_last_cookie(),
                  task->rpc_api->readdir_task.get_offset());
    }

    if (status == 0) {
        /*
         * If we send a cookieverf other than 0, then if server returns a
         * success response it means that cookie+cookieverf passed by us was
         * valid. In that case it MUST return the same cookieverf in the
         * response, else it should fail with NFS3ERR_BAD_COOKIE error.
         */
        if (task->rpc_api->readdir_task.get_cookieverf() != 0) {
            if (cv2i(res->READDIRPLUS3res_u.resok.cookieverf) !=
                task->rpc_api->readdir_task.get_cookieverf()) {
                AZLogWarn("[{}][SERVER BUG] readdirplus_callback: Successful "
                          "READDIRPLUS response carried different cookieverf "
                          "than request (0x{:x} -> 0x{:x})",
                          dir_ino,
                          task->rpc_api->readdir_task.get_cookieverf(),
                          cv2i(res->READDIRPLUS3res_u.resok.cookieverf));
                assert(0);
            }
        }

        /*
         * Update attributes of parent directory returned in postop
         * attributes. If directory mtime has changed since the last time it'll
         * invalidate the cache.
         */
        UPDATE_INODE_ATTR(dir_inode, res->READDIRPLUS3res_u.resok.dir_attributes);

        const struct entryplus3 *entry =
            res->READDIRPLUS3res_u.resok.reply.entries;
        eof = res->READDIRPLUS3res_u.resok.reply.eof;
        int64_t eof_cookie = -1;
        int num_dirents = 0;

        // Process all dirents received.
        while (entry) {
#ifdef ENABLE_PRESSURE_POINTS
            /*
             * Short readdir pressure point, skip when eof received, return
             * at least one entry.
             */
            if (inject_error() && !eof && num_dirents) {
                AZLogWarn("[{}] PP: short readdirplus after {} entries",
                          dir_ino, num_dirents);
                break;
            }
#endif
            /*
             * Blob NFS server should not send a cookie less than what we asked
             * for.
             */
            assert(entry->cookie > (uint64_t) last_valid_offset);
            last_valid_offset = entry->cookie;

            const struct fattr3 *fattr = nullptr;
            /*
             * Keep updating eof_cookie, when we exit the loop we will have
             * eof_cookie set correctly.
             */
            if (eof) {
                eof_cookie = entry->cookie;
            }

            if (entry->name_attributes.attributes_follow) {
                fattr = &(entry->name_attributes.post_op_attr_u.attributes);

                // Blob NFS will never send these two different.
                assert(fattr->fileid == entry->fileid);
            }

            /*
             * Get the nfs inode for the entry.
             * Note that we first check if this inode exists (i.e., we have
             * conveyed it to fuse in the past and fuse has not FORGOTten it)
             * and if so use that, else create a new nfs_inode.
             * This will grab a lookupcnt ref on this inode. We will transfer
             * this same ref to fuse if we are able to successfully convey
             * this directory_entry to fuse.
             * We also increment forget_expected as fuse will call forget()
             * for these inodes.
             *
             * Note:  Caller must call decref() and decrement forget_expected
             *        for inodes corresponding to directory_entrys that are not
             *        returned to fuse.
             */
            struct nfs_inode *const nfs_inode =
                task->get_client()->get_nfs_inode(
                    &entry->name_handle.post_op_fh3_u.handle, fattr);
            nfs_inode->forget_expected++;

            if (!fattr) {
                /*
                 * If readdirplus entry doesn't carry attributes, then we
                 * just save the inode number and filetype as DT_UNKNOWN.
                 *
                 * Blob NFS though must always send attributes in a readdirplus
                 * response.
                 */
                assert(0);
                nfs_inode->get_attr_nolock().st_ino = entry->fileid;
                nfs_inode->get_attr_nolock().st_mode = 0;
            }

            /*
             * See if we have the directory_entry corresponding to this
             * cookie already present in the readdirectory_cache.
             * If so, we need to first remove the existing entry and add
             * this new entry. The existing entry may be created by readdir
             * in which case it won't have attributes stored or it could be
             * a readdirplus created entry in which case it will have inode
             * and attributes stored. If this is the last dircachecnt ref
             * on this inode remove() will also try to delete the inode.
             *
             * Then create the new directory entry which will be added to
             * readdirectory_cache, and also conveyed to fuse as part of
             * readdirplus response. The directory_entry added to the
             * readdirectory_cache will be freed when the directory cache
             * is purged (when fuse FORGETs the directory or under memory
             * pressure).
             *
             * TODO: Try to steal entry->name to avoid the strdup().
             */
            std::shared_ptr<struct directory_entry> dir_entry =
                dircache_handle->lookup(entry->cookie);

            if (dir_entry) {
                assert(dir_entry->cookie == entry->cookie);
                if (dir_entry->nfs_inode) {
                    /*
                     * Drop the extra ref held by lookup().
                     * Original ref held by readdirectory_cache::add()
                     * must also be present, remove() will drop that.
                     */
                    assert(dir_entry->nfs_inode->dircachecnt >= 2);
                    dir_entry->nfs_inode->dircachecnt--;
                }

                /*
                 * Reset the dir_entry shared_ptr so that the subsequent
                 * remove() call can release the original shared_ptr ref
                 * on the directory_entry, and also delete the inode if the
                 * lookupcnt ref is also 0.
                 */
                dir_entry.reset();
                dircache_handle->remove(entry->cookie);
            }

            /*
             * This dir_entry shared_ptr will hold one dircachecnt ref on
             * the inode. This will be transferred to the directory_entry
             * installed by the following add() call.
             */
            dir_entry = std::make_shared<struct directory_entry>(
                                                   strdup(entry->name),
                                                   entry->cookie,
                                                   nfs_inode->get_attr(),
                                                   nfs_inode);

            /*
             * dir_entry must have one ref on the inode.
             * This ref will protect the inode while this directory_entry is
             * present in the readdirectory_cache (added below).
             */
            assert(nfs_inode->dircachecnt >= 1);

            // Add to readdirectory_cache for future use.
            [[maybe_unused]]
            const bool added = dircache_handle->add(
                                    dir_entry,
                                    &res->READDIRPLUS3res_u.resok.cookieverf);

#ifdef ENABLE_PARANOID
            if (added) {
                /*
                 * Now we should be able to perform dnlc lookup for
                 * dir_entry->name and it must yield nfs_inode. Try both from
                 * the dircache_handle and the inode.
                 *
                 * Note: This assert can fail under very rare circumstances.
                 *       See note in readdir_callback().
                 */
                struct nfs_inode *tmpi =
                    dircache_handle->dnlc_lookup(dir_entry->name);
                assert(tmpi == nfs_inode);
                tmpi->decref();

                tmpi = dir_inode->dnlc_lookup(dir_entry->name);
                assert(tmpi == nfs_inode);
                tmpi->decref();
            }
#endif

            /*
             * If this is a re-enumeration callback, the target_offset would
             * be set to one more than the last cookie received before we got
             * the badcookie error, otherwise target_offset will be 0.
             * If we see something new here, this can mean one of the two:
             * - This is a regular (non re-enumeration) call.
             * - This is a re-enumeration call and we have seen a cookie >=
             *   target_offset, i.e., a cookie beyond the last one seen before
             *   the badcookie error.
             * In either case, we need to return this new entry (and subsequent
             * ones) to fuse.
             */
            got_new_entry = (((off_t) entry->cookie >=
                        task->rpc_api->readdir_task.get_target_offset()));

            // Only for re-enumeration case we can have got_new_entry as false.
            assert(got_new_entry || is_reenumerating);

            /*
             * If we found an entry that has not been sent before, we need to
             * add it to the directory_entry vector but ONLY up to the byte
             * limit requested by fuse readdirplus call.
             */
            if (got_new_entry && rem_size >= 0) {
                rem_size -= dir_entry->get_fuse_buf_size(true /* readdirplus */);
                if (rem_size >= 0) {
                    /*
                     * Any directory_entry added must have the inode's lookupcnt
                     * ref and forget_expected bumped.
                     */
                    assert(dir_entry->nfs_inode);
                    assert(dir_entry->nfs_inode->forget_expected > 0);
                    assert(dir_entry->nfs_inode->lookupcnt > 0);

                    readdirentries.push_back(dir_entry);
                } else {
                    /*
                     * We are unable to add this entry to the fuse response
                     * buffer, so we won't notify fuse of this entry.
                     * Drop the ref held by get_nfs_inode().
                     */
                    AZLogDebug("[{}] {}/{}: Dropping ref since couldn't fit in "
                               "fuse response buffer",
                               nfs_inode->get_fuse_ino(),
                               dir_ino, dir_entry->name);
                    assert(nfs_inode->forget_expected > 0);
                    nfs_inode->forget_expected--;
                    dir_entry.reset();
                    nfs_inode->decref();
                    assert(nfs_inode->lookupcnt >=
                            (uint64_t) nfs_inode->forget_expected);
                }
            } else {
                /*
                 * Either the fuse response buffer is already exhausted or
                 * this entry lies before target_offset, so it is not conveyed
                 * to fuse. Drop the ref held by get_nfs_inode().
                 */
                AZLogDebug("[{}] {}/{}: Dropping ref since couldn't fit in "
                           "fuse response buffer or re-enumerating after "
                           "NFS3ERR_BAD_COOKIE and did not hit the target, "
                           "cookie: {}, target_offset: {}, rem_size: {}",
                           nfs_inode->get_fuse_ino(),
                           dir_ino, dir_entry->name,
                           dir_entry->cookie,
                           task->rpc_api->readdir_task.get_target_offset(),
                           rem_size);
                assert(nfs_inode->forget_expected > 0);
                nfs_inode->forget_expected--;
                dir_entry.reset();
                nfs_inode->decref();
                assert(nfs_inode->lookupcnt >=
                        (uint64_t) nfs_inode->forget_expected);
            }

            entry = entry->nextentry;
            ++num_dirents;
        }

        AZLogDebug("[{}] readdirplus_callback {}: Num of entries returned by server "
                   "is {}, returned to fuse: {}, eof: {}, eof_cookie: {}",
                   dir_ino, is_reenumerating ? "(R)" : "",
                   num_dirents, readdirentries.size(), eof, eof_cookie);

        assert(readdirentries.size() <= (size_t) num_dirents);

        if (eof) {
            assert((eof_cookie != -1) || (readdirentries.size() == 0));
            /*
             * If we pass the last cookie or beyond it, then server won't
             * return any directory entries, but it'll set eof to true.
             * In such case, we must already have set eof and eof_cookie,
             * unless the cookie queried by this READDIRPLUS request was
             * not immediately following the last cookie received from the
             * server in prev READDIR/READDIRPLUS response.
             */
            if (eof_cookie != -1) {
                assert(num_dirents > 0);
                dircache_handle->set_eof(eof_cookie);
            } else if (!cookie_gap) {
                if (dircache_handle->get_eof() != true) {
                    /*
                     * Server returned 0 entries and set eof to true, but the
                     * previous READDIR call that we made, for that server
                     * didn't return eof, this means the directory shrank in the
                     * server. Note that we can claim "directory shrank" only
                     * if this READDIRPLUS call queried next cookie after the
                     * last one received (and it returned no entries with
                     * eof=true, while the last one didn't return eof).
                     * If there's a gap between the last cookie received from
                     * the server and this one queried then we cannot say that.
                     * To be safe, invalidate the cache.
                     */
                    AZLogWarn("[{}] readdirplus_callback {}: Directory shrank in "
                            "the server! cookie asked: {} target_offset: {}. "
                            "Purging cache!",
                            dir_ino, is_reenumerating ? "(R)" : "",
                            task->rpc_api->readdir_task.get_offset(),
                            task->rpc_api->readdir_task.get_target_offset());
                    dir_inode->invalidate_cache();
                } else {
                    assert((int64_t) dircache_handle->get_eof_cookie() != -1);
                }
            }
        }

        // Only send to fuse if we have seen new entries.
        if (got_new_entry || eof) {
            task->send_readdir_or_readdirplus_response(readdirentries);
            return;
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        task->get_client()->jukebox_retry(task);
        return;
    } else if (NFS_STATUS(res) == NFS3ERR_BAD_COOKIE) {
        AZLogWarn("[{}] readdirplus_callback {}: got NFS3ERR_BAD_COOKIE for "
                  "offset: {}, clearing dircache and starting re-enumeration",
                  dir_ino,
                  is_reenumerating ? "(R)" : "",
                  task->rpc_api->readdir_task.get_offset());

        dir_inode->invalidate_cache();

        /*
         * We have received a bad cookie error, we have to restart enumeration
         * until either the server returns a valid response or we reach eof.
         * If we keep getting bad cookie we will keep on reenumerating forever.
         */
        last_valid_offset = 0;

        /*
         * We need to maintain the monotonicity of the target_offset
         * because it represents the offsets already sent to fuse as part of
         * this enumeration. This protects us from sending duplicate entries
         * to fuse if we receive bad_cookie before we reach the target during
         * reenumeration.
         * If this is the first bad_cookie error for this enumeration, then
         * target_offset must be set to "get_offset() + 1", else if it's a
         * re-enumeration and we again got a badcookie then the target_offset
         * must not be set less than the original target_offset.
         */
        task->rpc_api->readdir_task.set_target_offset(
                std::max(task->rpc_api->readdir_task.get_offset() + 1,
                         task->rpc_api->readdir_task.get_target_offset()));
    } else {
        task->reply_error(status);
        return;
    }

    /*
     * We have not seen a new entry and the call has not failed, hence this is a
     * reenumeration call and we have not reached the target_offset yet. We have to
     * start another readdirplus call for the next batch.
     * The assert has last_valid_offset==0 clause for cases where the callback
     * was called for a regular readdir (not re-enumerating) but it failed with
     * badcookie and hence we are here enumerating.
     */
    assert(!got_new_entry);
    assert(is_reenumerating || last_valid_offset == 0);
    assert(last_valid_offset <
            task->rpc_api->readdir_task.get_target_offset());
    assert(!eof);

    /*
     * Create a new child task to carry out this request.
     * Query cookies starting from last_valid_offset+1.
     * If re-enumeration, set the target_offset appropriately.
     */
    struct rpc_task *child_tsk =
        task->get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_READDIRPLUS);
    child_tsk->init_readdirplus(
        task->rpc_api->req,
        task->rpc_api->readdir_task.get_ino(),
        task->rpc_api->readdir_task.get_size(),
        last_valid_offset,
        task->rpc_api->readdir_task.get_target_offset(),
        task->rpc_api->readdir_task.get_fuse_file());

    assert(child_tsk->rpc_api->parent_task == nullptr);

    /*
     * The format string has four placeholders, the second one being the
     * "(R)" re-enumeration marker used by the other logs in this function.
     */
    AZLogDebug("[{}] readdirplus_callback{}: Re-enumerating from {} with "
               "target_offset {}",
               dir_ino, is_reenumerating ? "(R)" : "",
               last_valid_offset,
               task->rpc_api->readdir_task.get_target_offset());

    /*
     * This will orchestrate a new readdir call and we will handle the response
     * in the callback. We already ensure we do not send duplicate entries to fuse.
     */
    child_tsk->fetch_readdirplus_entries_from_server();

    // Free the current task here, the child task will ensure a response is sent.
    task->free_rpc_task();
}