static void readdir_callback()

in turbonfs/src/rpc_task.cpp [4500:4904]


/**
 * Completion callback for a READDIR RPC issued on behalf of a FUSE_READDIR
 * request.
 *
 * @param rpc          RPC context on which the request completed; used to
 *                     fetch the PDU for stats accounting.
 * @param rpc_status   Transport level status of the RPC.
 * @param data         READDIR3res response for the call.
 * @param private_data The rpc_task that issued this READDIR.
 *
 * Outcomes handled here:
 * - Success: refresh the directory's attributes from the postop attributes,
 *   (re)add every returned entry to the readdirectory_cache, collect the
 *   entries that fit in the size requested by the caller, record eof in the
 *   cache and send the response if we saw a new entry or eof.
 * - NFS3ERR_JUKEBOX: hand the task over to jukebox_retry().
 * - NFS3ERR_BAD_COOKIE: invalidate the directory cache and restart the
 *   enumeration from cookie 0 with a child task, remembering (through
 *   target_offset) which cookies were already conveyed to fuse.
 * - Any other failure: fail the request with reply_error().
 *
 * A successful re-enumeration response that has neither reached target_offset
 * nor hit eof also falls through to issue the next READDIR via a child task.
 */
static void readdir_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    rpc_task *const task = (rpc_task*) private_data;
    assert(task->magic == RPC_TASK_MAGIC);

    assert(task->get_op_type() == FUSE_READDIR);
    READDIR3res *const res = (READDIR3res*) data;

    INJECT_JUKEBOX(res, task);
    INJECT_BAD_COOKIE(res, task);

    const fuse_ino_t dir_ino = task->rpc_api->readdir_task.get_ino();
    struct nfs_inode *const dir_inode =
        task->get_client()->get_nfs_inode_from_ino(dir_ino);
    // Get handle to the readdirectory cache.
    std::shared_ptr<readdirectory_cache>& dircache_handle =
        dir_inode->get_dircache();

    // How many max bytes worth of entries data does the caller want?
    ssize_t rem_size = task->rpc_api->readdir_task.get_size();
    std::vector<std::shared_ptr<const directory_entry>> readdirentries;
    const int status = task->status(rpc_status, NFS_STATUS(res));
    bool eof = false;

    /*
     * readdir can be called on a directory after open()ing it, so we must have
     * created dircache.
     */
    assert(dir_inode->has_dircache());

    // For readdir we don't use parent_task to track the fuse request.
    assert(task->rpc_api->parent_task == nullptr);

    /*
     * Is this READDIR request querying a cookie after a gap, i.e., one or
     * more cookies not queried. In case of gap we cannot tell if directory
     * shrank.
     */
    const bool cookie_gap =
        ((uint64_t) task->rpc_api->readdir_task.get_offset() >
         dircache_handle->get_seq_last_cookie());

    /*
     * Last valid offset seen for this directory enumeration. We keep on
     * updating this as we read entries from the returned list, so at any
     * point it contains the last cookie seen from the server and in case of
     * re-enumeration the next READDIR RPC should ask entries starting from
     * last_valid_offset+1.
     * Only used when re-enumerating.
     */
    off_t last_valid_offset = task->rpc_api->readdir_task.get_offset();

    /*
     * This tracks if we have got a "new entry" that we would like to send to
     * fuse. For the regular case (no re-enumeration) this is not very
     * interesting as all entries received are new entries, but for the
     * re-enumeration case this will be set only when we get an entry with
     * cookie greater than or equal to the target_offset (set when we received
     * the NFS3ERR_BAD_COOKIE error).
     * Note that we will need to send response to fuse when either got_new_entry
     * is set or we got eof.
     */
    bool got_new_entry = false;

    // A non-zero target_offset marks this task as part of a re-enumeration.
    const bool is_reenumerating =
        (task->rpc_api->readdir_task.get_target_offset() != 0);

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUSX(rpc_status, res));

    /*
     * "get_seq_last_cookie() == 0" is a common case when we purge the dircache
     * if it grows beyond the configured limit. In that case all subsequent
     * calls will see a cookie_gap and flood the logs.
     */
    if (cookie_gap && (dircache_handle->get_seq_last_cookie() != 0)) {
        AZLogWarn("[{}] readdir_callback: GAP in cookie requested ({} -> {})",
                  dir_ino, dircache_handle->get_seq_last_cookie(),
                  task->rpc_api->readdir_task.get_offset());
    }

    if (status == 0) {
        /*
         * If we send a cookieverf other than 0, then if server returns a
         * success response it means that cookie+cookieverf passed by us was
         * valid. In that case it MUST return the same cookieverf in the
         * response, else it should fail with NFS3ERR_BAD_COOKIE error.
         */
        if (task->rpc_api->readdir_task.get_cookieverf() != 0) {
            if (cv2i(res->READDIR3res_u.resok.cookieverf) !=
                task->rpc_api->readdir_task.get_cookieverf()) {
                AZLogWarn("[{}][SERVER BUG] readdir_callback: Successful "
                          "READDIR response carried different cookieverf than "
                          "request (0x{:x} -> 0x{:x})",
                          dir_ino,
                          task->rpc_api->readdir_task.get_cookieverf(),
                          cv2i(res->READDIR3res_u.resok.cookieverf));
                assert(0);
            }
        }

        /*
         * Update attributes of parent directory returned in postop
         * attributes. If directory mtime has changed since the last time it'll
         * invalidate the cache.
         */
        UPDATE_INODE_ATTR(dir_inode, res->READDIR3res_u.resok.dir_attributes);

        const struct entry3 *entry = res->READDIR3res_u.resok.reply.entries;
        eof = res->READDIR3res_u.resok.reply.eof;
        int64_t eof_cookie = -1;
        int num_dirents = 0;

        // Process all dirents received.
        while (entry) {
#ifdef ENABLE_PRESSURE_POINTS
            /*
             * Short readdir pressure point, skip when eof received, return
             * at least one entry.
             */
            if (inject_error() && !eof && num_dirents) {
                AZLogWarn("[{}] PP: short readdir after {} entries",
                          dir_ino, num_dirents);
                break;
            }
#endif

            /*
             * Blob NFS server should not send a cookie less than what we asked
             * for.
             */
            assert(entry->cookie > (uint64_t) last_valid_offset);
            last_valid_offset = entry->cookie;

            /*
             * Keep updating eof_cookie, when we exit the loop we will have
             * eof_cookie set correctly.
             */
            if (eof) {
                eof_cookie = entry->cookie;
            }

            /*
             * See if we have the directory_entry corresponding to this
             * cookie already present in the readdirectory_cache.
             * If so, we need to first remove the existing entry and add
             * this new entry.
             *
             * Then create the new directory entry which will be added to
             * readdirectory_cache, and also conveyed to fuse as part of
             * readdir response. The directory_entry added to the
             * readdirectory_cache will be freed when the directory cache
             * is purged (when fuse forgets the directory or under memory
             * pressure).
             *
             * TODO: Try to steal entry->name to avoid the strdup().
             */
            std::shared_ptr<struct directory_entry> dir_entry =
                dircache_handle->lookup(entry->cookie);

            if (dir_entry) {
                assert(dir_entry->cookie == entry->cookie);
                if (dir_entry->nfs_inode) {
                    /*
                     * Drop the extra ref held by lookup().
                     * Original ref held by readdirectory_cache::add()
                     * must also be present, remove() will drop that.
                     */
                    assert(dir_entry->nfs_inode->dircachecnt >= 2);
                    dir_entry->nfs_inode->dircachecnt--;
                }

                /*
                 * Reset the dir_entry shared_ptr so that the subsequent
                 * remove() call can release the original shared_ptr ref
                 * on the directory_entry and free it.
                 */
                dir_entry.reset();
                dircache_handle->remove(entry->cookie);
            }

            dir_entry = std::make_shared<struct directory_entry>(
                                            strdup(entry->name),
                                            entry->cookie,
                                            entry->fileid);

            // Add to readdirectory_cache for future use.
            [[maybe_unused]]
            const bool added = dircache_handle->add(
                                    dir_entry,
                                    &res->READDIR3res_u.resok.cookieverf);

#ifdef ENABLE_PARANOID
            if (added) {
                /*
                 * Entries added by readdir do not contribute to dnlc cache.
                 *
                 * Note: Technically there could be some other thread processing
                 *       readdirplus response for the same directory and it may
                 *       race with this thread, remove the above entry added by
                 *       readdir and add an entry by readdirplus. This is so
                 *       rare that we assert anyway.
                 */
                assert(dircache_handle->dnlc_lookup(dir_entry->name) == nullptr);
                assert(dir_inode->dnlc_lookup(dir_entry->name) == nullptr);
            }
#endif

            /*
             * If this is a re-enumeration callback, the target_offset would
             * be set to one more than the last cookie received before we got
             * the badcookie error, otherwise target_offset will be 0.
             * If we see something new here, this can mean one of the two:
             * - This is a regular (non re-enumeration) call.
             * - This is a re-enumeration call and we have seen a cookie >=
             *   target_offset, i.e., a cookie beyond the last one seen before
             *   the badcookie error.
             * In either case, we need to return this new entry (and subsequent
             * ones) to fuse.
             */
            got_new_entry = (((off_t) entry->cookie >=
                        task->rpc_api->readdir_task.get_target_offset()));

            // Only for re-enumeration case we can have got_new_entry as false.
            assert(got_new_entry || is_reenumerating);

            /*
             * If we found an entry that has not been sent before, we need to
             * add it to the directory_entry vector but ONLY upto the byte
             * limit requested by fuse readdir call.
             * Once rem_size goes negative no further entries are added.
             */
            if (got_new_entry && rem_size >= 0) {
                rem_size -= dir_entry->get_fuse_buf_size(false /* readdirplus */);
                if (rem_size >= 0) {
                    /*
                     * readdir_callback() MUST NOT return directory_entry with
                     * nfs_inode set.
                     */
                    assert(dir_entry->nfs_inode == nullptr);
                    readdirentries.push_back(dir_entry);
                } else {
                    /*
                     * We are unable to add this entry to the fuse response
                     * buffer, so we won't notify fuse of this entry.
                     */
                    AZLogDebug("{}/{}: Couldn't fit in fuse response buffer",
                               dir_ino, dir_entry->name);
                }
            } else {
                AZLogDebug("{}/{}: Couldn't fit in fuse response buffer "
                           "or re-enumerating after NFS3ERR_BAD_COOKIE and did "
                           "not hit the target, cookie: {}, target_offset: {}, "
                           "rem_size: {}",
                           dir_ino, dir_entry->name,
                           dir_entry->cookie,
                           task->rpc_api->readdir_task.get_target_offset(),
                           rem_size);
            }

            entry = entry->nextentry;
            ++num_dirents;
        }

        AZLogDebug("[{}] readdir_callback {}: Num of entries returned by server is {}, "
                   "returned to fuse: {}, eof: {}, eof_cookie: {}",
                   dir_ino, is_reenumerating ? "(R)" : "",
                   num_dirents, readdirentries.size(), eof, eof_cookie);

        assert(readdirentries.size() <= (size_t) num_dirents);

        if (eof) {
            assert((eof_cookie != -1) || (readdirentries.size() == 0));
            /*
             * If we pass the last cookie or beyond it, then server won't
             * return any directory entries, but it'll set eof to true.
             * In such case, we must already have set eof and eof_cookie,
             * unless the cookie queried by this READDIR request was not
             * immediately following the last cookie received from the server
             * in prev READDIR/READDIRPLUS response.
             */
            if (eof_cookie != -1) {
                assert(num_dirents > 0);
                dircache_handle->set_eof(eof_cookie);
            } else if (!cookie_gap) {
                assert(num_dirents == 0);
                if (dircache_handle->get_eof() != true) {
                    /*
                     * Server returned 0 entries and set eof to true, but the
                     * previous READDIR call that we made, for that server
                     * didn't return eof, this means the directory shrank in the
                     * server. Note that we can claim "directory shrank" only
                     * if this READDIR call queried next cookie after the last
                     * one received (and it returned no entries with eof=true,
                     * while the last one didn't return eof). If there's a gap
                     * between the last cookie received from the server and this
                     * one queried then we cannot say that.
                     * To be safe, invalidate the cache.
                     */
                    AZLogWarn("[{}] readdir_callback {}: Directory shrank in "
                            "the server! cookie asked: {} target_offset: {}. "
                            "Purging cache!",
                            dir_ino, is_reenumerating ? "(R)" : "",
                            task->rpc_api->readdir_task.get_offset(),
                            task->rpc_api->readdir_task.get_target_offset());
                    dir_inode->invalidate_cache();
                } else {
                    assert((int64_t) dircache_handle->get_eof_cookie() != -1);
                }
            }
        }

        // Only send to fuse if we have seen new entries or EOF.
        if (got_new_entry || eof) {
            task->send_readdir_or_readdirplus_response(readdirentries);
            return;
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        task->get_client()->jukebox_retry(task);
        return;
    } else if (NFS_STATUS(res) == NFS3ERR_BAD_COOKIE) {
        AZLogWarn("[{}] readdir_callback {}: got NFS3ERR_BAD_COOKIE for "
                  "offset: {}, clearing dircache and starting re-enumeration",
                  dir_ino,
                  is_reenumerating ? "(R)" : "",
                  task->rpc_api->readdir_task.get_offset());

        dir_inode->invalidate_cache();

        /*
         * We have received a bad cookie error, we have to restart enumeration
         * until either the server returns a valid response or we reach eof.
         * If we keep getting bad cookie we will keep on reenumerating forever.
         */
        last_valid_offset = 0;

        /*
         * We need to maintain the monotonicity of the target_offset
         * because it represents the offsets already sent to fuse as part of
         * this enumeration. This protects us from sending duplicate entries
         * to fuse if we receive bad_cookie before we reach the target during
         * reenumeration.
         * If this is the first bad_cookie error for this enumeration, then
         * target_offset must be set to "get_offset() + 1", else if it's a
         * re-enumeration and we again got a badcookie then the target_offset
         * must not be set less than the original target_offset.
         */
        task->rpc_api->readdir_task.set_target_offset(
                std::max(task->rpc_api->readdir_task.get_offset() + 1,
                         task->rpc_api->readdir_task.get_target_offset()));
    } else {
        task->reply_error(status);
        return;
    }

    /*
     * We have not seen a new entry and the call has not failed, hence this is a
     * reenumeration call and we have not reached the target_offset yet. We have to
     * start another readdir call for the next batch.
     * The assert has last_valid_offset==0 clause for cases where the callback
     * was called for a regular readdir (not re-enumerating) but it failed with
     * badcookie and hence we are here enumerating.
     */
    assert(!got_new_entry);
    assert(is_reenumerating || last_valid_offset == 0);
    assert(last_valid_offset <
            task->rpc_api->readdir_task.get_target_offset());
    assert(!eof);

    /*
     * Create a new child task to carry out this request.
     * Query cookies starting from last_valid_offset+1.
     * If re-enumeration, set the target_offset appropriately.
     */
    struct rpc_task *child_tsk =
        task->get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_READDIR);

    child_tsk->init_readdir(
        task->rpc_api->req,
        task->rpc_api->readdir_task.get_ino(),
        task->rpc_api->readdir_task.get_size(),
        last_valid_offset,
        task->rpc_api->readdir_task.get_target_offset(),
        task->rpc_api->readdir_task.get_fuse_file());

    assert(child_tsk->rpc_api->parent_task == nullptr);

    /*
     * The format string carries four placeholders, so the "(R)" marker must be
     * passed explicitly (as the other log calls in this function do), else the
     * remaining arguments shift by one and the last placeholder is left
     * without an argument.
     */
    AZLogDebug("[{}] readdir_callback{}: Re-enumerating from {} with "
               "target_offset {}",
               dir_ino, is_reenumerating ? "(R)" : "", last_valid_offset,
               task->rpc_api->readdir_task.get_target_offset());

    /*
     * This will orchestrate a new readdir call and we will handle the response
     * in the callback. We already ensure we do not send duplicate entries to fuse.
     */
    child_tsk->fetch_readdir_entries_from_server();

    // Free the current task here, the child task will ensure a response is sent.
    task->free_rpc_task();
}