in turbonfs/src/rpc_task.cpp [4914:5399]
/**
 * Completion callback for a READDIRPLUS RPC issued on behalf of a
 * FUSE_READDIRPLUS task.
 *
 * @param rpc          RPC context the request completed on. Only used here to
 *                     fetch the PDU for stats accounting.
 * @param rpc_status   RPC level completion status, combined with the NFS
 *                     status carried in the response to derive the final
 *                     status.
 * @param data         Response payload, interpreted as READDIRPLUS3res.
 * @param private_data The rpc_task which issued the request.
 *
 * Possible outcomes:
 * - Success and we either saw a new entry or eof: every returned entry is
 *   added to the directory's readdirectory_cache and the ones that fit in the
 *   caller's byte budget are sent to fuse.
 * - Success but no new entry and no eof (re-enumeration which has not yet
 *   reached target_offset): a child task is started to fetch the next batch.
 * - NFS3ERR_JUKEBOX: the task is handed over to jukebox_retry().
 * - NFS3ERR_BAD_COOKIE: the directory cache is invalidated and enumeration is
 *   restarted from cookie 0 using a child task.
 * - Any other failure: the error is returned to fuse.
 */
static void readdirplus_callback(
    struct rpc_context *rpc,
    int rpc_status,
    void *data,
    void *private_data)
{
    rpc_task *const task = (rpc_task*) private_data;
    assert(task->magic == RPC_TASK_MAGIC);
    assert(task->get_op_type() == FUSE_READDIRPLUS);

    READDIRPLUS3res *const res = (READDIRPLUS3res*) data;

    /*
     * NOTE(review): these macros are defined elsewhere; going by their names
     * they are fault injection hooks which may alter res - confirm.
     */
    INJECT_JUKEBOX(res, task);
    INJECT_BAD_COOKIE(res, task);

    const fuse_ino_t dir_ino = task->rpc_api->readdir_task.get_ino();
    struct nfs_inode *const dir_inode =
        task->get_client()->get_nfs_inode_from_ino(dir_ino);

    // Get handle to the readdirectory cache.
    std::shared_ptr<readdirectory_cache>& dircache_handle =
        dir_inode->get_dircache();

    // How many max bytes worth of entries data does the caller want?
    ssize_t rem_size = task->rpc_api->readdir_task.get_size();

    // Entries that will be conveyed to fuse in the response.
    std::vector<std::shared_ptr<const directory_entry>> readdirentries;
    const int status = task->status(rpc_status, NFS_STATUS(res));
    bool eof = false;

    /*
     * readdir can be called on a directory after open()ing it, so we must have
     * created dircache.
     */
    assert(dir_inode->has_dircache());

    // For readdir we don't use parent_task to track the fuse request.
    assert(task->rpc_api->parent_task == nullptr);

    /*
     * Is this READDIRPLUS request querying a cookie after a gap, i.e., one or
     * more cookies not queried. In case of gap we cannot tell if directory
     * shrank.
     */
    const bool cookie_gap =
        ((uint64_t) task->rpc_api->readdir_task.get_offset() >
         dircache_handle->get_seq_last_cookie());

    /*
     * Last valid offset seen for this directory enumeration. We keep on
     * updating this as we read entries from the returned list, so at any
     * point it contains the last cookie seen from the server and in case of
     * re-enumeration the next READDIR RPC should ask entries starting from
     * last_valid_offset+1.
     * Only used when re-enumerating.
     */
    off_t last_valid_offset = task->rpc_api->readdir_task.get_offset();

    /*
     * This tracks if we have got a "new entry" that we would like to send to
     * fuse. For the regular case (no re-enumeration) this is not very
     * interesting as all entries received are new entries, but for
     * re-enumeration case this will be set only when we get an entry with
     * cookie greater than or equal to the target_offset (set when we received
     * the NFS3ERR_BAD_COOKIE error).
     * Note that we will need to send response to fuse when either got_new_entry
     * is set or we got eof.
     */
    bool got_new_entry = false;

    // A non-zero target_offset marks this task as a re-enumeration.
    const bool is_reenumerating =
        (task->rpc_api->readdir_task.get_target_offset() != 0);

    /*
     * Now that the request has completed, we can query libnfs for the
     * dispatch time.
     */
    task->get_stats().on_rpc_complete(rpc_get_pdu(rpc),
                                      NFS_STATUSX(rpc_status, res));

    /*
     * "get_seq_last_cookie() == 0" is a common case when we purge the dircache
     * if it grows beyond the configured limit. In that case all subsequent
     * calls will see a cookie_gap and flood the logs.
     */
    if (cookie_gap && (dircache_handle->get_seq_last_cookie() != 0)) {
        AZLogWarn("[{}] readdirplus_callback: GAP in cookie requested ({} -> {})",
                  dir_ino, dircache_handle->get_seq_last_cookie(),
                  task->rpc_api->readdir_task.get_offset());
    }

    if (status == 0) {
        /*
         * If we send a cookieverf other than 0, then if server returns a
         * success response it means that cookie+cookieverf passed by us was
         * valid. In that case it MUST return the same cookieverf in the
         * response, else it should fail with NFS3ERR_BAD_COOKIE error.
         */
        if (task->rpc_api->readdir_task.get_cookieverf() != 0) {
            if (cv2i(res->READDIRPLUS3res_u.resok.cookieverf) !=
                task->rpc_api->readdir_task.get_cookieverf()) {
                AZLogWarn("[{}][SERVER BUG] readdirplus_callback: Successful "
                          "READDIRPLUS response carried different cookieverf "
                          "than request (0x{:x} -> 0x{:x})",
                          dir_ino,
                          task->rpc_api->readdir_task.get_cookieverf(),
                          cv2i(res->READDIRPLUS3res_u.resok.cookieverf));
                assert(0);
            }
        }

        /*
         * Update attributes of parent directory returned in postop
         * attributes. If directory mtime has changed since the last time it'll
         * invalidate the cache.
         */
        UPDATE_INODE_ATTR(dir_inode, res->READDIRPLUS3res_u.resok.dir_attributes);

        const struct entryplus3 *entry =
            res->READDIRPLUS3res_u.resok.reply.entries;

        eof = res->READDIRPLUS3res_u.resok.reply.eof;

        // Cookie of the last entry when eof is set, -1 if not known.
        int64_t eof_cookie = -1;
        int num_dirents = 0;

        // Process all dirents received.
        while (entry) {
#ifdef ENABLE_PRESSURE_POINTS
            /*
             * Short readdir pressure point, skip when eof received, return
             * at least one entry.
             */
            if (inject_error() && !eof && num_dirents) {
                AZLogWarn("[{}] PP: short readdirplus after {} entries",
                          dir_ino, num_dirents);
                break;
            }
#endif
            /*
             * Blob NFS server should not send a cookie less than what we asked
             * for.
             */
            assert(entry->cookie > (uint64_t) last_valid_offset);
            last_valid_offset = entry->cookie;

            const struct fattr3 *fattr = nullptr;

            /*
             * Keep updating eof_cookie, when we exit the loop we will have
             * eof_cookie set correctly.
             */
            if (eof) {
                eof_cookie = entry->cookie;
            }

            if (entry->name_attributes.attributes_follow) {
                fattr = &(entry->name_attributes.post_op_attr_u.attributes);

                // Blob NFS will never send these two different.
                assert(fattr->fileid == entry->fileid);
            }

            /*
             * Get the nfs inode for the entry.
             * Note that we first check if this inode exists (i.e., we have
             * conveyed it to fuse in the past and fuse has not FORGOTten it)
             * and if so use that, else create a new nfs_inode.
             * This will grab a lookupcnt ref on this inode. We will transfer
             * this same ref to fuse if we are able to successfully convey
             * this directory_entry to fuse.
             * We also increment forget_expected as fuse will call forget()
             * for these inodes.
             *
             * Note: Caller must call decref() and decrement forget_expected
             *       for inodes corresponding to directory_entrys that are not
             *       returned to fuse.
             */
            struct nfs_inode *const nfs_inode =
                task->get_client()->get_nfs_inode(
                    &entry->name_handle.post_op_fh3_u.handle, fattr);
            nfs_inode->forget_expected++;

            if (!fattr) {
                /*
                 * If readdirplus entry doesn't carry attributes, then we
                 * just save the inode number and filetype as DT_UNKNOWN.
                 *
                 * Blob NFS though must always send attributes in a readdirplus
                 * response.
                 */
                assert(0);
                nfs_inode->get_attr_nolock().st_ino = entry->fileid;
                nfs_inode->get_attr_nolock().st_mode = 0;
            }

            /*
             * See if we have the directory_entry corresponding to this
             * cookie already present in the readdirectory_cache.
             * If so, we need to first remove the existing entry and add
             * this new entry. The existing entry may be created by readdir
             * in which case it won't have attributes stored or it could be
             * a readdirplus created entry in which case it will have inode
             * and attributes stored. If this is the last dircachecnt ref
             * on this inode remove() will also try to delete the inode.
             *
             * Then create the new directory entry which will be added to
             * readdirectory_cache, and also conveyed to fuse as part of
             * readdirplus response. The directory_entry added to the
             * readdirectory_cache will be freed when the directory cache
             * is purged (when fuse FORGETs the directory or under memory
             * pressure).
             *
             * TODO: Try to steal entry->name to avoid the strdup().
             */
            std::shared_ptr<struct directory_entry> dir_entry =
                dircache_handle->lookup(entry->cookie);

            if (dir_entry) {
                assert(dir_entry->cookie == entry->cookie);
                if (dir_entry->nfs_inode) {
                    /*
                     * Drop the extra ref held by lookup().
                     * Original ref held by readdirectory_cache::add()
                     * must also be present, remove() will drop that.
                     */
                    assert(dir_entry->nfs_inode->dircachecnt >= 2);
                    dir_entry->nfs_inode->dircachecnt--;
                }

                /*
                 * Reset the dir_entry shared_ptr so that the subsequent
                 * remove() call can release the original shared_ptr ref
                 * on the directory_entry, and also delete the inode if the
                 * lookupcnt ref is also 0.
                 */
                dir_entry.reset();
                dircache_handle->remove(entry->cookie);
            }

            /*
             * This dir_entry shared_ptr will hold one dircachecnt ref on
             * the inode. This will be transferred to the directory_entry
             * installed by the following add() call.
             */
            dir_entry = std::make_shared<struct directory_entry>(
                            strdup(entry->name),
                            entry->cookie,
                            nfs_inode->get_attr(),
                            nfs_inode);

            /*
             * dir_entry must have one ref on the inode.
             * This ref will protect the inode while this directory_entry is
             * present in the readdirectory_cache (added below).
             */
            assert(nfs_inode->dircachecnt >= 1);

            // Add to readdirectory_cache for future use.
            [[maybe_unused]]
            const bool added = dircache_handle->add(
                                    dir_entry,
                                    &res->READDIRPLUS3res_u.resok.cookieverf);

#ifdef ENABLE_PARANOID
            if (added) {
                /*
                 * Now we should be able to perform dnlc lookup for
                 * dir_entry->name and it must yield nfs_inode. Try both from
                 * the dircache_handle and the inode.
                 *
                 * Note: This assert can fail under very rare circumstances.
                 *       See note in readdir_callback().
                 */
                struct nfs_inode *tmpi =
                    dircache_handle->dnlc_lookup(dir_entry->name);
                assert(tmpi == nfs_inode);
                tmpi->decref();

                tmpi = dir_inode->dnlc_lookup(dir_entry->name);
                assert(tmpi == nfs_inode);
                tmpi->decref();
            }
#endif

            /*
             * If this is a re-enumeration callback, the target_offset would
             * be set to one more than the last cookie received before we got
             * the badcookie error, otherwise target_offset will be 0.
             * If we see something new here, this can mean one of the two:
             * - This is a regular (non re-enumeration) call.
             * - This is a re-enumeration call and we have seen a cookie >=
             *   target_offset, the last cookie seen before the badcookie error.
             * In either case, we need to return this new entry (and subsequent
             * ones) to fuse.
             */
            got_new_entry = (((off_t) entry->cookie >=
                              task->rpc_api->readdir_task.get_target_offset()));

            // Only for re-enumeration case we can have got_new_entry as false.
            assert(got_new_entry || is_reenumerating);

            /*
             * If we found an entry that has not been sent before, we need to
             * add it to the directory_entry vector but ONLY up to the byte
             * limit requested by fuse readdirplus call.
             */
            if (got_new_entry && rem_size >= 0) {
                rem_size -= dir_entry->get_fuse_buf_size(true /* readdirplus */);
                if (rem_size >= 0) {
                    /*
                     * Any directory_entry added must have the inode's lookupcnt
                     * ref and forget_expected bumped.
                     */
                    assert(dir_entry->nfs_inode);
                    assert(dir_entry->nfs_inode->forget_expected > 0);
                    assert(dir_entry->nfs_inode->lookupcnt > 0);

                    readdirentries.push_back(dir_entry);
                } else {
                    /*
                     * We are unable to add this entry to the fuse response
                     * buffer, so we won't notify fuse of this entry.
                     * Drop the ref held by get_nfs_inode().
                     */
                    AZLogDebug("[{}] {}/{}: Dropping ref since couldn't fit in "
                               "fuse response buffer",
                               nfs_inode->get_fuse_ino(),
                               dir_ino, dir_entry->name);

                    assert(nfs_inode->forget_expected > 0);
                    nfs_inode->forget_expected--;
                    /*
                     * Release our shared_ptr ref on the directory_entry before
                     * dropping the inode's lookupcnt ref.
                     */
                    dir_entry.reset();
                    nfs_inode->decref();
                    assert(nfs_inode->lookupcnt >=
                           (uint64_t) nfs_inode->forget_expected);
                }
            } else {
                AZLogDebug("[{}] {}/{}: Dropping ref since couldn't fit in "
                           "fuse response buffer or re-enumerating after "
                           "NFS3ERR_BAD_COOKIE and did not hit the target, "
                           "cookie: {}, target_offset: {}, rem_size: {}",
                           nfs_inode->get_fuse_ino(),
                           dir_ino, dir_entry->name,
                           dir_entry->cookie,
                           task->rpc_api->readdir_task.get_target_offset(),
                           rem_size);

                assert(nfs_inode->forget_expected > 0);
                nfs_inode->forget_expected--;
                /*
                 * Release our shared_ptr ref on the directory_entry before
                 * dropping the inode's lookupcnt ref.
                 */
                dir_entry.reset();
                nfs_inode->decref();
                assert(nfs_inode->lookupcnt >=
                       (uint64_t) nfs_inode->forget_expected);
            }

            entry = entry->nextentry;
            ++num_dirents;
        }

        AZLogDebug("[{}] readdirplus_callback {}: Num of entries returned by server "
                   "is {}, returned to fuse: {}, eof: {}, eof_cookie: {}",
                   dir_ino, is_reenumerating ? "(R)" : "",
                   num_dirents, readdirentries.size(), eof, eof_cookie);

        assert(readdirentries.size() <= (size_t) num_dirents);

        if (eof) {
            assert((eof_cookie != -1) || (readdirentries.size() == 0));

            /*
             * If we pass the last cookie or beyond it, then server won't
             * return any directory entries, but it'll set eof to true.
             * In such case, we must already have set eof and eof_cookie,
             * unless the cookie queried by this READDIRPLUS request was
             * not immediately following the last cookie received from the
             * server in prev READDIR/READDIRPLUS response.
             */
            if (eof_cookie != -1) {
                assert(num_dirents > 0);
                dircache_handle->set_eof(eof_cookie);
            } else if (!cookie_gap) {
                if (dircache_handle->get_eof() != true) {
                    /*
                     * Server returned 0 entries and set eof to true, but the
                     * previous READDIR call that we made, for that server
                     * didn't return eof, this means the directory shrank in the
                     * server. Note that we can claim "directory shrank" only
                     * if this READDIRPLUS call queried next cookie after the
                     * last one received (and it returned no entries with
                     * eof=true, while the last one didn't return eof).
                     * If there's a gap between the last cookie received from
                     * the server and this one queried then we cannot say that.
                     * To be safe, invalidate the cache.
                     */
                    AZLogWarn("[{}] readdirplus_callback {}: Directory shrank in "
                              "the server! cookie asked: {} target_offset: {}. "
                              "Purging cache!",
                              dir_ino, is_reenumerating ? "(R)" : "",
                              task->rpc_api->readdir_task.get_offset(),
                              task->rpc_api->readdir_task.get_target_offset());
                    dir_inode->invalidate_cache();
                } else {
                    assert((int64_t) dircache_handle->get_eof_cookie() != -1);
                }
            }
        }

        // Only send to fuse if we have seen new entries.
        if (got_new_entry || eof) {
            task->send_readdir_or_readdirplus_response(readdirentries);
            return;
        }
    } else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
        task->get_client()->jukebox_retry(task);
        return;
    } else if (NFS_STATUS(res) == NFS3ERR_BAD_COOKIE) {
        AZLogWarn("[{}] readdirplus_callback {}: got NFS3ERR_BAD_COOKIE for "
                  "offset: {}, clearing dircache and starting re-enumeration",
                  dir_ino,
                  is_reenumerating ? "(R)" : "",
                  task->rpc_api->readdir_task.get_offset());

        dir_inode->invalidate_cache();

        /*
         * We have received a bad cookie error, we have to restart enumeration
         * until either the server returns a valid response or we reach eof.
         * If we keep getting bad cookie we will keep on reenumerating forever.
         */
        last_valid_offset = 0;

        /*
         * We need to maintain the monotonicity of the target_offset
         * because it represents the offsets already sent to fuse as part of
         * this enumeration. This protects us from sending duplicate entries
         * to fuse if we receive bad_cookie before we reach the target during
         * reenumeration.
         * If this is the first bad_cookie error for this enumeration, then
         * target_offset must be set to "get_offset() + 1", else if it's a
         * re-enumeration and we again got a badcookie then the target_offset
         * must not be set less than the original target_offset.
         */
        task->rpc_api->readdir_task.set_target_offset(
            std::max(task->rpc_api->readdir_task.get_offset() + 1,
                     task->rpc_api->readdir_task.get_target_offset()));
    } else {
        task->reply_error(status);
        return;
    }

    /*
     * We have not seen a new entry and the call has not failed, hence this is a
     * reenumeration call and we have not reached the target_offset yet. We have to
     * start another readdirplus call for the next batch.
     * The assert has last_valid_offset==0 clause for cases where the callback
     * was called for a regular readdir (not re-enumerating) but it failed with
     * badcookie and hence we are here enumerating.
     */
    assert(!got_new_entry);
    assert(is_reenumerating || last_valid_offset == 0);
    assert(last_valid_offset <
           task->rpc_api->readdir_task.get_target_offset());
    assert(!eof);

    /*
     * Create a new child task to carry out this request.
     * Query cookies starting from last_valid_offset+1.
     * If re-enumeration, set the target_offset appropriately.
     */
    struct rpc_task *child_tsk =
        task->get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(
            FUSE_READDIRPLUS);

    child_tsk->init_readdirplus(
        task->rpc_api->req,
        task->rpc_api->readdir_task.get_ino(),
        task->rpc_api->readdir_task.get_size(),
        last_valid_offset,
        task->rpc_api->readdir_task.get_target_offset(),
        task->rpc_api->readdir_task.get_fuse_file());

    assert(child_tsk->rpc_api->parent_task == nullptr);

    /*
     * The format string has four placeholders, so the "(R)" marker argument
     * must be passed along with dir_ino and the two offsets, else the
     * arguments won't line up with the placeholders.
     */
    AZLogDebug("[{}] readdirplus_callback{}: Re-enumerating from {} with "
               "target_offset {}",
               dir_ino, is_reenumerating ? "(R)" : "",
               last_valid_offset,
               task->rpc_api->readdir_task.get_target_offset());

    /*
     * This will orchestrate a new readdir call and we will handle the response
     * in the callback. We already ensure we do not send duplicate entries to fuse.
     */
    child_tsk->fetch_readdirplus_entries_from_server();

    // Free the current task here, the child task will ensure a response is sent.
    task->free_rpc_task();
}