void rpc_task::send_readdir_or_readdirplus_response()

in turbonfs/src/rpc_task.cpp [5617:5911]


/**
 * Send the fuse response for a READDIR or READDIRPLUS request.
 *
 * Packs as many entries from readdirentries as fit in a buffer of the size
 * requested by fuse (rpc_api->readdir_task.get_size()) and sends it using
 * fuse_reply_buf().
 *
 * Reference handling, as relied upon by the code below:
 * - The caller holds one lookupcnt ref and one forget_expected count on the
 *   inode of every entry that has a valid nfs_inode (mandatory for
 *   READDIRPLUS, optional for READDIR).
 * - For READDIRPLUS, entries successfully returned to fuse keep those, except
 *   "." and ".." for which they are dropped.
 * - For READDIR, they are dropped for every entry.
 * - They are dropped for every entry that is not returned to fuse (buffer
 *   full, fuse_reply_buf() failure or buffer allocation failure).
 *
 * @param readdirentries Directory entries to return. Every entry must have a
 *                       cookie greater than the offset requested by fuse.
 */
void rpc_task::send_readdir_or_readdirplus_response(
    const std::vector<std::shared_ptr<const directory_entry>>& readdirentries)
{
    const bool readdirplus = (get_op_type() == FUSE_READDIRPLUS);
    /*
     * Max size the fuse buf is allowed to take.
     * We will allocate this much and then fill as many directory entries can
     * fit in this. Since the caller would have also considered this same size
     * while filling entries in readdirentries, we will usually be able to
     * consume all the entries in readdirentries.
     */
    const size_t size = rpc_api->readdir_task.get_size();
    const fuse_ino_t parent_ino = rpc_api->readdir_task.get_ino();
    [[maybe_unused]] const struct nfs_inode *dir_inode =
        get_client()->get_nfs_inode_from_ino(parent_ino);

    // Fuse always requests 4096 bytes.
    assert(size >= 4096);

    /*
     * Drop one lookupcnt ref and one forget_expected count held on the inode
     * of the given directory entry. The entry MUST have a valid inode.
     */
    const auto drop_lookup_ref =
        [](const std::shared_ptr<const directory_entry>& entry) {
            assert(entry->nfs_inode);
            assert(entry->nfs_inode->forget_expected > 0);
            entry->nfs_inode->forget_expected--;
            entry->nfs_inode->decref();
            assert(entry->nfs_inode->lookupcnt >=
                    (uint64_t) entry->nfs_inode->forget_expected);
        };

    // Allocate fuse response buffer.
    char *buf1 = (char *) ::malloc(size);
    if (!buf1) {
        /*
         * No entry is going to be returned to fuse, so drop the lookupcnt
         * ref and forget_expected held on every entry with a valid inode,
         * exactly like the cleanup at the end of this function does for
         * entries that are not sent. Not doing this would leak them.
         */
        for (const auto& entry : readdirentries) {
            if (!entry->nfs_inode) {
                assert(!readdirplus);
                continue;
            }
            drop_lookup_ref(entry);
        }

        reply_error(ENOMEM);
        return;
    }

    char *current_buf = buf1;
    size_t rem = size;
    size_t num_entries_added = 0;

    AZLogDebug("[{}] send_readdir_or_readdirplus_response: Number of directory"
               " entries to send {}, size: {}",
               parent_ino, readdirentries.size(), size);

    /*
     * If we return at least one entry, make sure cookie_verifier is valid as
     * the caller might send a READDIR{PLUS} request to the server for querying
     * subsequent entries.
     */
    assert(readdirentries.empty() ||
           *(uint64_t *)dir_inode->get_dircache()->get_cookieverf() != 0);

    for (auto& it : readdirentries) {
        /*
         * Caller should make sure that it adds only directory entries after
         * what was requested in the READDIR{PLUS} call to readdirentries.
         */
        assert((uint64_t) it->cookie >
                (uint64_t) rpc_api->readdir_task.get_offset());
        size_t entsize;

        if (readdirplus) {
            /*
             * For readdirplus, caller MUST have set the inode and bumped
             * lookupcnt ref and forget_expected.
             */
            assert(it->nfs_inode);
            assert(it->nfs_inode->lookupcnt > 0);
            assert(it->nfs_inode->forget_expected > 0);

            struct fuse_entry_param fuseentry;

#ifdef ENABLE_PARANOID
            /*
             * it->attributes are copied from nfs_inode->attr at the time when
             * the directory_entry was created, after that inode's ctime can
             * only go forward. If there are cached writes happening on the file
             * the attribute size in the inode can increase while ctime remains
             * unchanged. See on_cached_write() for how this can happen.
             *
             * TODO: Remove directory_entry->attributes if we don't need them.
             */
            {
                std::shared_lock<std::shared_mutex> lock(it->nfs_inode->ilock_1);
                assert(it->attributes.st_ino == it->nfs_inode->get_attr_nolock().st_ino);

                const bool attr_same = (::memcmp(&it->attributes,
                                                 &it->nfs_inode->get_attr_nolock(),
                                                 sizeof(struct stat)) == 0);
                if (!attr_same) {
                    const bool inode_attr_ctime_newer =
                        (compare_timespec(it->attributes.st_ctim,
                                          it->nfs_inode->get_attr_nolock().st_ctim) < 0);
                    const bool inode_attr_ctime_same =
                        (compare_timespec(it->attributes.st_ctim,
                                          it->nfs_inode->get_attr_nolock().st_ctim) == 0);
                    const bool inode_attr_size_bigger =
                        (it->nfs_inode->get_attr_nolock().st_size > it->attributes.st_size);

                    assert(inode_attr_ctime_newer ||
                           (inode_attr_ctime_same && inode_attr_size_bigger));
                }

            }
#endif

            // We don't need the memset as we are setting all members.
            fuseentry.attr = it->nfs_inode->get_attr();
            fuseentry.ino = it->nfs_inode->get_fuse_ino();
            fuseentry.generation = it->nfs_inode->get_generation();
            fuseentry.attr_timeout = it->nfs_inode->get_actimeo();
            fuseentry.entry_timeout = it->nfs_inode->get_actimeo();

            AZLogDebug("[{}] <{}> Returning ino {} to fuse (filename: {}, "
                       "lookupcnt: {}, dircachecnt: {}, forget_expected: {})",
                       parent_ino,
                       rpc_task::fuse_opcode_to_string(rpc_api->optype),
                       fuseentry.ino,
                       it->name,
                       it->nfs_inode->lookupcnt.load(),
                       it->nfs_inode->dircachecnt.load(),
                       it->nfs_inode->forget_expected.load());

            /*
             * Insert the entry into the buffer.
             * If the buffer space is less, fuse_add_direntry_plus will not
             * add entry to the buffer but will still return the space needed
             * to add this entry.
             */
            entsize = fuse_add_direntry_plus(get_fuse_req(),
                                             current_buf,
                                             rem, /* size left in the buffer */
                                             it->name,
                                             &fuseentry,
                                             it->cookie);
        } else {
            /*
             * Insert the entry into the buffer.
             * If the buffer space is less, fuse_add_direntry will not add
             * entry to the buffer but will still return the space needed to
             * add this entry.
             */
            entsize = fuse_add_direntry(get_fuse_req(),
                                        current_buf,
                                        rem, /* size left in the buffer */
                                        it->name,
                                        &it->attributes,
                                        it->cookie);
        }

        /*
         * Our buffer size was small and hence we can't add any more entries,
         * so just break the loop. This also means that we have not inserted
         * the current entry to the dirent buffer.
         *
         * Note: This should not happen since the caller would have filled
         *       just enough entries in readdirentries.
         */
        if (entsize > rem) {
            break;
        }

#ifdef ENABLE_PRESSURE_POINTS
        /*
         * Breaking here, before current_buf/rem are advanced, leaves the
         * entry just written outside the (size - rem) bytes that we send, so
         * it's treated as not added.
         */
        if (num_entries_added > 0) {
            if (inject_error()) {
                AZLogWarn("[{}] PP: sending less directory entries to fuse, "
                          "size: {}, rem: {}, num_entries_added: {}",
                          parent_ino, size, rem, num_entries_added);
                break;
            }
        }
#endif

        // Increment the buffer pointer to point to the next free space.
        current_buf += entsize;
        rem -= entsize;
        num_entries_added++;

        if (readdirplus) {
            assert(it->nfs_inode);
            assert(it->nfs_inode->lookupcnt > 0);
            assert(it->nfs_inode->forget_expected > 0);

            /*
             * Caller would have bumped lookupcnt ref and forget_expected for
             * *all* entries, fuse expects lookupcnt of every entry returned
             * by readdirplus(), except "." and "..", to be incremented, so
             * drop ref and forget_expected for "." and "..".
             *
             * Note: The cleanup loop below skips "." and ".." among the
             *       entries added here, so even if fuse_reply_buf() fails and
             *       we need to drop lookupcnt ref and forget_expected for all
             *       the entries, we don't drop them again for these inodes.
             */
            if (it->is_dot_or_dotdot()) {
                drop_lookup_ref(it);
            }
        } else if (it->nfs_inode) {
            /*
             * For READDIR response, we need to drop lookupcnt ref and
             * forget_expected for all entries with a valid inode.
             *
             * Note: entry->nfs_inode may be null for entries populated using
             *       only readdir however, it is guaranteed to be present for
             *       readdirplus.
             */
            drop_lookup_ref(it);
        }
    }

    /*
     * startidx is the starting index into readdirentries vector from where
     * we start cleaning up. In case of error this will be reset to 0, else
     * it's set to num_entries_added.
     */
    size_t startidx = num_entries_added;
    bool inject_fuse_reply_buf_failure = false;

    /*
     * XXX Applications don't seem to handle EINVAL return from getdents()
     *     as expected, i.e., they don't retry the call with a bigger buffer.
     *     Instead they treat it as an error. Keep it disabled.
     */
#if 0
#ifdef ENABLE_PRESSURE_POINTS
    inject_fuse_reply_buf_failure = inject_error();
#endif
#endif

    if (!inject_fuse_reply_buf_failure) {
        AZLogDebug("[{}] Num of entries sent in readdir response is {}",
                   parent_ino, num_entries_added);

        const int fre = fuse_reply_buf(get_fuse_req(), buf1, size - rem);
        if (fre != 0) {
            INC_GBL_STATS(fuse_reply_failed, 1);
            AZLogError("fuse_reply_buf({}) failed: {}",
                       fmt::ptr(get_fuse_req()), fre);
            startidx = 0;
            assert(0);
        } else {
            readdirectory_cache::num_dirents_returned_g += num_entries_added;
            DEC_GBL_STATS(fuse_responses_awaited, 1);
        }
    } else {
        AZLogWarn("[{}] PP: injecting fuse_reply_buf() failure, "
                  "size: {}, rem: {}, num_entries_added: {}",
                  parent_ino, size, rem, num_entries_added);
        startidx = 0;
    }

    for (size_t i = startidx; i < readdirentries.size(); i++) {
        const std::shared_ptr<const directory_entry>& it = readdirentries[i];
        /*
         * If directory_entry doesn't have a valid inode, no cleanup to do.
         */
        if (!it->nfs_inode) {
            assert(!readdirplus);
            continue;
        }

        /*
         * Till num_entries_added we have dropped the lookupcnt ref and
         * forget_expected for:
         * - "." and ".." for readdirplus.
         * - all for readdir.
         * Skip those now.
         * Beyond num_entries_added, we have to drop for all.
         */
        if (i < num_entries_added) {
            if (!readdirplus || it->is_dot_or_dotdot()) {
                continue;
            }
        }

        AZLogDebug("[{}] Dropping lookupcnt, now {}, "
                   "forget_expected: {}",
                   it->nfs_inode->get_fuse_ino(),
                   it->nfs_inode->lookupcnt.load(),
                   it->nfs_inode->forget_expected.load());
        drop_lookup_ref(it);
    }

    free(buf1);

    if (!inject_fuse_reply_buf_failure) {
        free_rpc_task();
    } else {
        /*
         * EINVAL return from getdents() imply insufficient buffer, so caller
         * should retry.
         */
        reply_error(EINVAL);
    }
}