turbonfs/inc/rpc_task.h (1,594 lines of code) (raw):

#ifndef __RPC_TASK_H__ #define __RPC_TASK_H__ #include <cstddef> #include <string> #include <mutex> #include <stack> #include <shared_mutex> #include <vector> #include <set> #include <thread> #include "nfs_client.h" #include "file_cache.h" #include "rpc_stats.h" #include "log.h" // Maximum number of simultaneous rpc tasks (sync + async). #define MAX_OUTSTANDING_RPC_TASKS 65536 // Maximum number of simultaneous async rpc tasks. #define MAX_ASYNC_RPC_TASKS 1024 /** * Update cached inode attributes from freshly received postop attributes. * This should be called for (read) operations which do not update file/dir * and hence NFS server returns just the postop attributes. * * Note: Blob NFS server must always return postop attributes for success * returns, hence assert to check. * * XXX There are certain scenarios where Blob NFS may fail to provide postop * attributes for success return. If getattr fails after the operation * then operation will complete successfully but postop attributes won't * be returned, f.e., if file is deleted right after the operation. * XXX The attributes_follow assert is disabled as we have a PP in the server * which emulates postop attributes not being sent. When running against * prod tenants where PPs are disabled, you can enable the assert check. * Note: We use has_filecache() for printing get_cached_filesize() in place of * is_regfile(). This is because we have a special case of regular files * created by aznfsc_ll_mknod(). These do not have on_fuse_open() called * for them as mknod() (unlike creat()) does not return an open fd and * caller need to call aznfsc_ll_open(), which calls on_fuse_open(). Such * files return true for is_regfile() but do not have file cache created. */ #define UPDATE_INODE_ATTR(inode, postop) \ do { \ assert(inode->magic == NFS_INODE_MAGIC); \ /* assert(postop.attributes_follow); */\ if (postop.attributes_follow) { \ AZLogDebug("[{}] UPDATE_INODE_ATTR() from {}", \ inode->get_fuse_ino(), \ __FUNCTION__); \ inode->update(&(postop.post_op_attr_u.attributes)); \ AZLogDebug("[{}] UPDATE_INODE_ATTR(): sfsize: {}, cfsize: {}, " \ "csfsize: {}", \ inode->get_fuse_ino(), \ inode->get_server_file_size(), \ (inode->is_regfile() && inode->has_filecache()) ? \ inode->get_client_file_size() : -1, \ (inode->is_regfile() && inode->has_filecache()) ? \ inode->get_cached_filesize() : -1); \ } \ } while (0) /** * Update cached inode attributes from freshly received wcc attributes. * This should be called for (update) operations which update file/dir * and hence NFS server returns both preop and postop attributes. * The preop attributes are compared with the cached attributes to see if * the file/dir has changed from what we have cached, and if yes then we * invalidate the file/dir cache. If the postop attribute are present and * newer (based on ctime comparison) than the cached attributes, inode cached * attributes are updated. * * Note: Blob NFS server must always return both preop and postop attributes * for success returns, hence assert to check. */ #define UPDATE_INODE_WCC(inode, wcc_data) \ do { \ assert(inode->magic == NFS_INODE_MAGIC); \ const struct post_op_attr& postop = wcc_data.after; \ const struct pre_op_attr& preop = wcc_data.before; \ const struct fattr3 *postattr = nullptr; \ const struct wcc_attr *preattr = nullptr; \ /* \ assert(postop.attributes_follow); \ assert(preop.attributes_follow); \ */ \ if (postop.attributes_follow) { \ postattr = &postop.post_op_attr_u.attributes; \ } \ if (preop.attributes_follow) { \ preattr = &preop.pre_op_attr_u.attributes; \ } \ if (preattr || postattr) { \ AZLogDebug("[{}] UPDATE_INODE_WCC() from {}", \ inode->get_fuse_ino(), \ __FUNCTION__); \ inode->update(postattr, preattr); \ AZLogDebug("[{}] UPDATE_INODE_WCC(): sfsize: {}, cfsize: { }" \ "csfsize: {}", \ inode->get_fuse_ino(), \ inode->get_server_file_size(), \ (inode->is_regfile() && inode->has_filecache()) ? \ inode->get_client_file_size() : -1, \ (inode->is_regfile() && inode->has_filecache()) ? \ inode->get_cached_filesize() : -1); \ } \ } while (0) /** * LOOKUP RPC task definition. */ struct lookup_rpc_task { void set_file_name(const char *name) { file_name = ::strdup(name); } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } /* * Note: fileinfo is not passed by fuse for lookup request, this is only * used for storing the fileinfo of the original request, when this * lookup_rpc_task is used for proxy lookup. * See do_proxy_lookup(). */ void set_fuse_file(fuse_file_info *fileinfo) { if (fileinfo != nullptr) { file = *fileinfo; file_ptr = &file; } else { file_ptr = nullptr; } } fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_file_name() const { return file_name; } struct fuse_file_info *get_fuse_file() const { return file_ptr; } /** * Release any resources used up by this task. */ void release() { ::free(file_name); } private: fuse_ino_t parent_ino; char *file_name; struct fuse_file_info file; struct fuse_file_info *file_ptr; }; struct access_rpc_task { void set_ino(fuse_ino_t _ino) { ino = _ino; } void set_mask(int _mask) { mask = _mask; } fuse_ino_t get_ino() const { return ino; } int get_mask() const { return mask; } private: fuse_ino_t ino; int mask; }; /** * WRITE RPC task definition. */ struct write_rpc_task { void set_ino(fuse_ino_t ino) { file_ino = ino; } void set_offset(off_t off) { offset = off; } void set_size(size_t size) { /* * length is how much this WRITE RPC wants to write. */ length = size; } void set_buffer_vector(struct fuse_bufvec *bufv) { write_bufv = bufv; } fuse_ino_t get_ino() const { return file_ino; } off_t get_offset() const { return offset; } size_t get_size() const { return length; } struct fuse_bufvec *get_buffer_vector() const { return write_bufv; } bool is_fe() const { const bool is_fe = (write_bufv != nullptr); assert(is_fe == (length > 0)); assert(is_fe || (offset == 0)); assert(file_ino != 0); return is_fe; } bool is_be() const { const bool is_be = (write_bufv == nullptr); assert(is_be == (length == 0)); assert(!is_be || (offset == 0)); assert(file_ino != 0); return is_be; } /** * Release any resources used up by this task. */ void release() { } private: fuse_ino_t file_ino; size_t length; off_t offset; struct fuse_bufvec *write_bufv; }; /** * This is an io vector of bytes_chunks. * Anyone trying to perform IOs to/from a vector of bytes_chunks should use * this instead of the vanilla std::vector<bytes_chunk>. Apart from acting * as the storage for bytes_chunks (which helps to grab reference on * underlying membufs so that they are not freed) this also provides iovecs * needed for performing vectored IO, with support for updating the iovecs * as IOs complete (partially or fully), and the updated offset and length * into the file where the IO is performed. * * Note: Currently it's used only for writing a vector of bytes_chunks, so * the code sets membuf flags accordingly. If we want to use it for * vectored reads also then we will have to make those conditional. */ #define BC_IOVEC_MAGIC *((const uint32_t *)"BIOV") /* * This must not be greater than libnfs RPC_MAX_VECTORS. * We limit it to 1000 as libnfs adds some more iovecs for header, marker etc. */ #define BC_IOVEC_MAX_VECTORS 1000 struct bc_iovec { const uint32_t magic = BC_IOVEC_MAGIC; /** * Constructor must initialize max_iosize, the maximum IO size that we * will issue to the file using this bc_iovec. * It takes nfs_inode for releasing the cache chunks as IOs get completed * for the queued bytes_chunks. * * Note: If the inode has stable writes enabled then we set max_iosize to * wsize_adj as advertised by the server (since server can only deal * with that big write size), else writes are sent as UNSTABLE writes * for which we use AZNFSCFG_WSIZE_MAX. * Note: This takes shared lock on ilock_1. */ bc_iovec(struct nfs_inode *_inode) : inode(_inode), max_iosize(inode->is_stable_write() ? inode->get_client()->mnt_options.wsize_adj : AZNFSCFG_WSIZE_MAX) { assert(inode->magic == NFS_INODE_MAGIC); assert(inode->is_regfile()); /* * bc_iovec is used to perform writeback of cached data, so cache must * be present. */ assert(inode->has_filecache()); /* * Grab a ref on the inode as we will need it for releasing cache * chunks as IOs complete. */ inode->incref(); assert(max_iosize > 0); /* * TODO: Currently we don't support wsize smaller than 1MB. * See below. */ assert(max_iosize >= AZNFSCFG_WSIZE_MIN); assert(max_iosize <= AZNFSCFG_WSIZE_MAX); assert(iovcnt == 0); assert(iov == base); } /* * Note: This takes shared lock on ilock_1. */ ~bc_iovec() { assert(bcq.empty()); assert(inode->magic == NFS_INODE_MAGIC); /* * We don't want to cache the data after the write completes. * We do it here once for the entire bc_iovec and not in * on_io_complete() as every bytes_chunk completes as scanning the * bytes_chunk_cache is expensive. */ assert(inode->has_filecache()); inode->get_filecache()->release(orig_offset, orig_length); inode->decref(); } /** * Add a new bytes_chunk to this bc_iovec. * If added successfully it returns true else returns false. * A bytes_chunk is added successfully if all the following conditions * are met: * - This is the first bytes_chunk to be added, or * this is contiguos to the last bytes_chunk. * - After adding this bytes_chunk the total queued bytes does not * exceed max_iosize. * - After adding this bytes_chunk the total number of iovecs don't exceed * BC_IOVEC_MAX_VECTORS. * * It sets flushing for a successfully added membuf. * bc must be set locked and inuse by the caller. * * A false return signifies to the caller that no more bytes_chunks can be * packed into this bc_iovec and it should dispatch it now. A true return * otoh indicates that there is still space for more bytes_chunks and caller * should wait. */ bool add_bc(const struct bytes_chunk& bc) { /* * Caller must be holding inode->flush_lock(). */ assert(inode->is_flushing); /* * All bytes_chunks must be added in the beginning before dispatching * the first write, till then iov will be same as base. */ assert(iov == base); // There's one iov per bytes_chunk. assert(iovcnt == (int) bcq.size()); /* * We don't support single bytes_chunk having length greater than the * max_iosize. * * Note: This means we should not set wsize less than 1MB, since fuse * can send writes upto 1MB. */ assert(bc.length <= max_iosize); // pvt must start as 0. assert(bc.pvt == 0); /* * We should never write a partial membuf, that will cause issues as * membuf flags (dirty, flushing, in this case) are tracked at membuf * granularity. Check maps_full_membuf() to see how the membuf itself * may have been trimmed by a release() call, but the bc must refer to * whatever membuf part is currently valid. */ assert(bc.maps_full_membuf()); struct membuf *const mb = bc.get_membuf(); /* * Caller must have held the membuf inuse count and the lock. * Also only uptodate membufs can be written. */ assert(mb->is_inuse()); assert(mb->is_locked()); assert(mb->is_uptodate()); // First iovec being added. if (iovcnt == 0) { assert(offset == 0); assert(length == 0); /* * XXX This should be the entire bytes_chunk as stored in the * chunkmap, though because of trimming it may be smaller than * the underlying membuf. Unfortunately today we do not have a * safe way to assert for that. */ iov[0].iov_base = bc.get_buffer(); iov[0].iov_len = bc.length; orig_offset = offset = bc.offset; orig_length = length = bc.length; iovcnt++; mb->set_flushing(); /* * Add new bytes_chunk to the tail of the queue. * This will now hold the reference on the underlying membuf till * the bytes_chunk is removed from the queue. */ bcq.emplace(bc); /* * Update fcsm::flushing_seq_num, as we have now arranged to flush * bc.length bytes. They will be flushed later once we have * accumulated enough, but they will definitely be flushed. * We want to update flushing_seq_num before they are actually * flushed as flush callbacks can start getting called anytime after * this. */ inode->get_fcsm()->add_flushing(bc.length); return true; } else if (((offset + length) == bc.offset) && ((length + bc.length) <= max_iosize) && (iovcnt + 1) <= BC_IOVEC_MAX_VECTORS) { iov[iovcnt].iov_base = bc.get_buffer(); iov[iovcnt].iov_len = bc.length; length += bc.length; orig_length = length; iovcnt++; mb->set_flushing(); bcq.emplace(bc); inode->get_fcsm()->add_flushing(bc.length); return true; } // Could not add this bc. return false; } /** * Must be called when bytes_completed bytes are successfully read/written. */ void on_io_complete(uint64_t bytes_completed, bool is_unstable_write = false); /** * Call on IO failure. */ void on_io_fail(int status); /* * Current iovec we should be performing IO to/from, updated as we finish * reading/writing whole iovecs. iovcnt holds the count of iovecs remaining * to be read/written and is decremented as we read whole iovecs. We also * update the iov_base and iov_len as we read/write data from the current * iov[], so at any point iov and iovcnt can be passed to any function * that operates on iovecs. */ struct iovec *iov = base; int iovcnt = 0; /* * Offset and length in the file where the IO should be performed. * These are updated as partial IOs complete. * orig_offset and orig_length track the originally requested offset and * length, used only for logging. */ uint64_t offset = 0; uint64_t length = 0; uint64_t orig_offset = 0; uint64_t orig_length = 0; /* * Hold refs to the bytes_chunks. * add_bc() adds new bytes_chunk to the front of this and on_io_complete() * removes from the tail (if the completed IO covers the entire bytes_chunk). */ std::queue<bytes_chunk> bcq; private: struct nfs_inode *const inode; /* * Fixed iovec array, iov points into it. * * TODO: See if we should allocate this as a variable sized vector * dynamically. That will be useful only when we know the size of * the vector in advance. */ struct iovec base[BC_IOVEC_MAX_VECTORS]; /* * Maximum IO size for performing IO to the backing file. */ const uint64_t max_iosize; }; /** * FLUSH RPC task definition. */ struct flush_rpc_task { void set_ino(fuse_ino_t ino) { file_ino = ino; } fuse_ino_t get_ino() const { return file_ino; } /** * Release any resources used up by this task. */ void release() { } private: fuse_ino_t file_ino; }; /** * GETATTR RPC task definition. */ struct getattr_rpc_task { void set_ino(fuse_ino_t ino) { this->ino = ino; } fuse_ino_t get_ino() const { return ino; } private: fuse_ino_t ino; }; /** * SETATTR RPC task definition. */ struct setattr_rpc_task { void set_ino(fuse_ino_t ino) { this->ino = ino; } void set_fuse_file(fuse_file_info *fileinfo) { /* * fuse can pass this as nullptr. * The fuse_file_info pointer passed to the fuse lowlevel API is only * valid in the issue path. Since we want to use it after that, we have * to make a deep copy of that. */ if (fileinfo != nullptr) { file = *fileinfo; file_ptr = &file; } else { file_ptr = nullptr; } } void set_attribute_and_mask(const struct stat *_attr, int mask) { // TODO: Should we only copy the required fields? attr = *_attr; /* * We don't make use of FUSE_SET_ATTR_CTIME, ignore it. */ to_set = mask & ~FUSE_SET_ATTR_CTIME; } const struct stat *get_attr() const { return &attr; } int get_attr_flags_to_set() const { return to_set; } struct fuse_file_info *get_fuse_file() const { return file_ptr; } fuse_ino_t get_ino() const { return ino; } private: // Inode of the file for which attributes have to be set. fuse_ino_t ino; // File info passed by the fuse layer. fuse_file_info file; fuse_file_info *file_ptr; /* * Attributes value to be set to. */ struct stat attr; // Valid attribute mask to be set. int to_set; }; struct statfs_rpc_task { fuse_ino_t get_ino() const { return ino; } void set_ino(fuse_ino_t _ino) { ino = _ino; } private: fuse_ino_t ino; }; struct create_file_rpc_task { fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_file_name() const { return file_name; } uid_t get_uid() const { return uid; } gid_t get_gid() const { return gid; } mode_t get_mode() const { return mode; } struct fuse_file_info *get_fuse_file() const { return file_ptr; } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } void set_file_name(const char *name) { file_name = ::strdup(name); } void set_uid(uid_t _uid) { uid = _uid; } void set_gid(gid_t _gid) { gid = _gid; } void set_mode(mode_t _mode) { mode = _mode; } void set_fuse_file(fuse_file_info *fileinfo) { assert(fileinfo != nullptr); file = *fileinfo; file_ptr = &file; } void release() { ::free(file_name); } private: fuse_ino_t parent_ino; char *file_name; uid_t uid; gid_t gid; mode_t mode; struct fuse_file_info file; struct fuse_file_info *file_ptr; }; struct mknod_rpc_task { fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_file_name() const { return file_name; } uid_t get_uid() const { return uid; } gid_t get_gid() const { return gid; } mode_t get_mode() const { return mode; } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } void set_file_name(const char *name) { file_name = ::strdup(name); } void set_uid(uid_t _uid) { uid = _uid; } void set_gid(gid_t _gid) { gid = _gid; } void set_mode(mode_t _mode) { mode = _mode; } void release() { ::free(file_name); } private: fuse_ino_t parent_ino; char *file_name; uid_t uid; gid_t gid; mode_t mode; }; struct mkdir_rpc_task { fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_dir_name() const { return dir_name; } uid_t get_uid() const { return uid; } gid_t get_gid() const { return gid; } mode_t get_mode() const { return mode; } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } void set_dir_name(const char *name) { dir_name = ::strdup(name); } void set_uid(uid_t _uid) { uid = _uid; } void set_gid(gid_t _gid) { gid = _gid; } void set_mode(mode_t _mode) { mode = _mode; } void release() { ::free(dir_name); } private: fuse_ino_t parent_ino; char *dir_name; uid_t uid; gid_t gid; mode_t mode; }; struct unlink_rpc_task { fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_file_name() const { return file_name; } bool get_for_silly_rename() const { return for_silly_rename; } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } void set_file_name(const char *name) { file_name = ::strdup(name); } void set_for_silly_rename(bool _for_silly_rename) { for_silly_rename = _for_silly_rename; } void release() { ::free(file_name); } private: fuse_ino_t parent_ino; char *file_name; /* * Is this unlink task deleting a silly-renamed file when the last opencnt * is dropped? If yes, then we need to drop the extra ref on the parent * inode held by rename_callback(). */ bool for_silly_rename; }; struct rmdir_rpc_task { fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_dir_name() const { return dir_name; } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } void set_dir_name(const char *name) { dir_name = ::strdup(name); } void release() { ::free(dir_name); } private: fuse_ino_t parent_ino; char *dir_name; }; struct symlink_rpc_task { fuse_ino_t get_parent_ino() const { return parent_ino; } const char *get_name() const { return name; } const char *get_link() const { return link; } uid_t get_uid() const { return uid; } gid_t get_gid() const { return gid; } void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; } void set_name(const char *_name) { name = ::strdup(_name); } void set_link(const char *_link) { link = ::strdup(_link); } void set_uid(uid_t _uid) { uid = _uid; } void set_gid(gid_t _gid) { gid = _gid; } void release() { ::free(name); ::free(link); } private: fuse_ino_t parent_ino; char *name; char *link; uid_t uid; gid_t gid; }; /** * NFS RENAME is performed from parent_ino/name -> newparent_ino/newname. * These will be the original {oldpath, newpath} in case of user requested * rename while for silly rename we rename the outgoing file. * For details of various members and methods see init_rename() prototype * below. */ struct rename_rpc_task { void set_parent_ino(fuse_ino_t parent) { parent_ino = parent; assert(parent_ino != 0); } fuse_ino_t get_parent_ino() const { assert(parent_ino != 0); return parent_ino; } void set_name(const char *_name) { assert(_name != nullptr); name = ::strdup(_name); } const char *get_name() const { return name; } void set_newparent_ino(fuse_ino_t parent) { newparent_ino = parent; assert(newparent_ino != 0); } fuse_ino_t get_newparent_ino() const { assert(newparent_ino != 0); return newparent_ino; } void set_newname(const char *name) { assert(name != nullptr); newname = ::strdup(name); } const char *get_newname() const { return newname; } /* * oldparent_ino/oldname will be non-zero/non-null only for silly rename * triggered by user requested rename operation. */ void set_oldparent_ino(fuse_ino_t parent) { oldparent_ino = parent; } fuse_ino_t get_oldparent_ino() const { return oldparent_ino; } void set_oldname(const char *name) { oldname = name ? ::strdup(name) : nullptr; } const char *get_oldname() const { return oldname; } void set_silly_rename(bool is_silly) { // For silly rename olddir and newdir must be same. assert(!is_silly || (parent_ino == newparent_ino)); silly_rename = is_silly; } bool get_silly_rename() const { // For silly rename olddir and newdir must be same. assert(!silly_rename || (parent_ino == newparent_ino)); return silly_rename; } void set_silly_rename_ino(fuse_ino_t _silly_rename_ino) { silly_rename_ino = _silly_rename_ino; assert(silly_rename == (silly_rename_ino != 0)); } fuse_ino_t get_silly_rename_ino() const { return silly_rename_ino; } bool get_rename_triggered_silly_rename() const { assert((oldparent_ino != 0) == (oldname != nullptr)); return (oldname != nullptr); } void release() { ::free(name); ::free(newname); ::free(oldname); } private: fuse_ino_t parent_ino; fuse_ino_t newparent_ino; fuse_ino_t oldparent_ino; char *name; char *newname; char *oldname; bool silly_rename; fuse_ino_t silly_rename_ino; }; struct readlink_rpc_task { void set_ino(fuse_ino_t _ino) { ino = _ino; } fuse_ino_t get_ino() const { return ino; } private: fuse_ino_t ino; }; struct readdir_rpc_task { public: void set_size(size_t sz) { size = sz; } void set_offset(off_t off) { offset = off; } void set_target_offset(off_t offset) { target_offset = offset; } void set_ino(fuse_ino_t ino) { inode = ino; } void set_fuse_file(fuse_file_info* fileinfo) { // The fuse can pass this as nullptr. if (fileinfo != nullptr) { file = *fileinfo; file_ptr = &file; } else { file_ptr = nullptr; } } /** * Unlike other set methods, this should not be called from * init_readdir{plus}(), but must be called from * fetch_readdir{plus}_entries_from_server() when we are about to send * READDIR{PLUS} RPC to the server and we have the correct cookieverf * value being sent to the server. * readdir{plus}_callback() will then look this up and assert that server * sent the same cookieverf in the response as we sent in the request, * unless request was a fresh one carrying cookieverf=0. */ void set_cookieverf(const cookieverf3& cookieverf) { ::memcpy(&cookie_verifier, &cookieverf, sizeof(cookie_verifier)); } fuse_ino_t get_ino() const { return inode; } off_t get_offset() const { return offset; } off_t get_target_offset() const { return target_offset; } size_t get_size() const { return size; } struct fuse_file_info *get_fuse_file() const { return file_ptr; } /** * We return the cookieverf converted to uint64_t as caller mostly wants * to compare this as an integer. */ uint64_t get_cookieverf() const { return cv2i(cookie_verifier); } private: // Inode of the directory. fuse_ino_t inode; // Maximum size of entries requested by the caller. size_t size; off_t offset; /* * Target offset to reach if this is a re-enumeration. * This is one more than the last cookie seen before we got a badcookie * error, so only cookies >= target_offset are of interest on * re-enumeration. Only those we can send in our response to fuse. */ off_t target_offset; // File info passed by the fuse layer. fuse_file_info file; fuse_file_info* file_ptr; cookieverf3 cookie_verifier; }; struct read_rpc_task { public: void set_size(size_t sz) { size = sz; } void set_offset(off_t off) { offset = off; } void set_ino(fuse_ino_t ino) { inode = ino; } void set_fuse_file(fuse_file_info* fileinfo) { // The fuse can pass this as nullptr. if (fileinfo != nullptr) { file = *fileinfo; file_ptr = &file; } else { file_ptr = nullptr; } } fuse_ino_t get_ino() const { return inode; } off_t get_offset() const { return offset; } size_t get_size() const { return size; } struct fuse_file_info *get_fuse_file() const { return file_ptr; } bool is_fe() const { return (file_ptr != nullptr); } bool is_be() const { return !is_fe(); } private: // Inode of the file. fuse_ino_t inode; // Size of data to be read. size_t size; // Offset from which the file data should be read. off_t offset; // File info passed by the fuse layer. fuse_file_info file; fuse_file_info *file_ptr; }; /** * RPC API specific task info. * This must be sufficient information needed to retry the task in case of * JUKEBOX failures. */ struct api_task_info { ~api_task_info() { /* * Don't call release() from here as it must have been called before * we reach here. Duplicate release() call might cause double free of * members in various *_rpc_task union members. * See rpc_task::free_rpc_task() and ~jukebox_seedinfo(). */ } /* * Fuse request structure. * This is the request structure passed from the fuse layer, on behalf of * which this RPC task is run. */ fuse_req *req = nullptr; /* * Only valid for FUSE_READ and FUSE_WRITE. * * This will refer to the parent task for a child task. * This will be nullptr for parent task. * * When do we need parent tasks? * Note that one fuse request is tracked using one rpc_task, so if we * have to issue multiple backend RPCs to serve a single fuse req, then * each of those multiple backend RPCs become a new (child) rpc_task and * the rpc_task tracking the fuse request becomes the parent task. * We do this for a couple of reasons, most importantly it helps RPC * accounting/stats (which needs to track every RPC issued to the backend) * and secondly it helps to use the RPC pool size for limiting the max * outstanding RPCs to the backend. * * When do we need multiple backend RPCs to serve a single fuse RPC? * We have the following known cases: * 1. Used by fuse read when bytes_chunk_cache::get() returns more than * one bytes_chunk for a given fuse read. Note that each of these * bytes_chunk is issued as an individual RPC READ to the NFS server. * We allocate child rpc_task structures for each of these READs and * the fuse read is tracked by the parent rpc_task. Note that we can * make use of rpc_nfs3_readv_task() API to issue a single RPC READ, * but if the to-be-read data is not contiguous inside the file (this * can happen if some data in the middle is already in the cache) we * may still need multiple RPC READs. * This should not be very common though. * 2. Used by fuse write in case where we copy a user write request into * the cache and find out that we are under memory pressure and need to * hold the fuse request till we flush enough dirty data to release the * memory pressure. Each of the dirty chunks will be issued as a child * rpc_task, while the frontend write RPC will be tracked by the parent * rpc_task. The parent RPC task will be completed, causing fuse callback * to be called, when all the child RPC tasks complete. */ rpc_task *parent_task = nullptr; /* * Only valid for FUSE_READ. * * This is the byte chunk where the data has to be read by this READ RPC. * Note that the parent task calls bytes_chunk_cache::get() and populates * the chunks in rpc_task::bc_vec[]. This points to one of those chunks. */ struct bytes_chunk *bc = nullptr; /* * User can use this to store anything that they want to be available with * the task. * Writes use it to store a pointer to bc_iovec, so that the write * context (offset, length and address to write to) is available across * partial writes, jukebox retries, etc. */ void *pvt = nullptr; /* * Operation type. * Used to access the following union. * Note that 0 is an invalid fuse opcode. */ enum fuse_opcode optype = (fuse_opcode) 0; /* * Proxy operation type. * If the RPC task is issued on behalf of another task, this will be set * to the optype of the original RPC task issuing this. */ enum fuse_opcode proxy_optype = (fuse_opcode) 0; /* * Unnamed union for easy access. */ union { struct lookup_rpc_task lookup_task; struct access_rpc_task access_task; struct write_rpc_task write_task; struct flush_rpc_task flush_task; struct getattr_rpc_task getattr_task; struct setattr_rpc_task setattr_task; struct statfs_rpc_task statfs_task; struct create_file_rpc_task create_task; struct mknod_rpc_task mknod_task; struct mkdir_rpc_task mkdir_task; struct unlink_rpc_task unlink_task; struct rmdir_rpc_task rmdir_task; struct symlink_rpc_task symlink_task; struct rename_rpc_task rename_task; struct readlink_rpc_task readlink_task; struct readdir_rpc_task readdir_task; struct read_rpc_task read_task; }; /** * Is this a directory operation? */ bool is_dirop() const { switch(optype) { case FUSE_LOOKUP: case FUSE_CREATE: case FUSE_MKNOD: case FUSE_MKDIR: case FUSE_SYMLINK: case FUSE_UNLINK: case FUSE_RMDIR: case FUSE_RENAME: return true; default: return false; } } /** * For ops that take an inode, this returns the inode number. */ fuse_ino_t get_ino() const { switch(optype) { case FUSE_ACCESS: return access_task.get_ino(); case FUSE_WRITE: return write_task.get_ino(); case FUSE_FLUSH: return flush_task.get_ino(); case FUSE_GETATTR: return getattr_task.get_ino(); case FUSE_SETATTR: return setattr_task.get_ino(); case FUSE_STATFS: return statfs_task.get_ino(); case FUSE_READLINK: return readlink_task.get_ino(); case FUSE_READDIR: case FUSE_READDIRPLUS: return readdir_task.get_ino(); case FUSE_READ: return read_task.get_ino(); default: assert(0); return 0; } } /** * For ops that take a parent directory and filename, this returns the * parent directory inode. */ fuse_ino_t get_parent_ino() const { switch(optype) { case FUSE_LOOKUP: return lookup_task.get_parent_ino(); case FUSE_CREATE: return create_task.get_parent_ino(); case FUSE_MKNOD: return mknod_task.get_parent_ino(); case FUSE_MKDIR: return mkdir_task.get_parent_ino(); case FUSE_SYMLINK: return symlink_task.get_parent_ino(); case FUSE_UNLINK: return unlink_task.get_parent_ino(); case FUSE_RMDIR: return rmdir_task.get_parent_ino(); case FUSE_RENAME: return rename_task.get_parent_ino(); default: assert(0); return 0; } } /** * For ops that take a parent directory and filename, this returns the * filename. */ const char *get_file_name() const { switch(optype) { case FUSE_LOOKUP: return lookup_task.get_file_name(); case FUSE_CREATE: return create_task.get_file_name(); case FUSE_MKNOD: return mknod_task.get_file_name(); case FUSE_MKDIR: return mkdir_task.get_dir_name(); case FUSE_SYMLINK: return symlink_task.get_name(); case FUSE_UNLINK: return unlink_task.get_file_name(); case FUSE_RMDIR: return rmdir_task.get_dir_name(); case FUSE_RENAME: return rename_task.get_name(); default: assert(0); return nullptr; } } /** * We cannot specify destructors for the <api>_rpc_task structures, since * they are part of a C union. Use release() method for performing any * cleanup. */ void release() { assert(optype > 0 && optype <= FUSE_OPCODE_MAX); req = nullptr; parent_task = nullptr; bc = nullptr; pvt = nullptr; switch(optype) { case FUSE_LOOKUP: lookup_task.release(); break; case FUSE_CREATE: create_task.release(); break; case FUSE_MKNOD: mknod_task.release(); break; case FUSE_MKDIR: mkdir_task.release(); break; case FUSE_UNLINK: unlink_task.release(); break; case FUSE_RMDIR: rmdir_task.release(); break; case FUSE_SYMLINK: symlink_task.release(); break; case FUSE_RENAME: rename_task.release(); break; default : break; } } }; #define RPC_TASK_MAGIC *((const uint32_t *)"RTSK") /** * This describes an RPC task which is created to handle a fuse request. * The RPC task tracks the progress of the RPC request sent to the server and * remains valid till the RPC request completes. */ struct rpc_task { friend class rpc_task_helper; const uint32_t magic = RPC_TASK_MAGIC; /* * The client for which the context is created. * This is initialized when the rpc_task is added to the free tasks list * and never changed afterwards, since we have just one nfs_client used by * all rpc tasks. */ struct nfs_client *const client; // This is the index of the object in the rpc_task_list vector. const int index; private: /* * Flag to identify async tasks. * All rpc_tasks start sync, but the caller can make an rpc_task * async by calling set_async_function(). The std::function object * passed to set_async_function() will be run asynchronously by the * rpc_task. That should call the run*() method at the least. */ std::atomic<bool> is_async_task = false; // Put a cap on how many async tasks we can start. static std::atomic<int> async_slots; /* * Connection scheduling type to be used for this RPC. * Issuer of the RPC will know what connection scheduling type is most * optimal for the RPC, it must set it and RPC layer will then honor that. */ conn_sched_t csched = CONN_SCHED_INVALID; /* * FH hash to be used for connection scheduling if/for CONN_SCHED_FH_HASH. */ uint32_t fh_hash = 0; public: /* * Issuing thread's tid. Useful for debugging. * Only set if ENABLE_PARANOID is defined. */ pid_t issuing_tid = -1; /* * Valid only for read RPC tasks. * To serve single client read call we may issue multiple NFS reads * depending on the chunks returned by bytes_chunk_cache::get(). * * num_ongoing_backend_reads tracks how many of the backend reads are * currently pending. We cannot complete the application read until all * reads complete (either success or failure). * * read_status is the final read status we need to send to fuse. * This is set when we get an error, so that even if later reads complete * successfully we fail the fuse read. */ std::atomic<int> num_ongoing_backend_reads = 0; std::atomic<int> read_status = 0; /* * Valid only for write RPC tasks. * When under memory pressure we do not complete fuse write requests * immediately after copying the user data to the cache, instead we want * to slow down application writes by delaying fuse callback till we flush * sufficient dirty chunks to relieve the memory pressure. To achieve that * we issue one or more write RPCs to flush the dirty data and complete the * fuse callback only when all these write RPCs complete. * * num_ongoing_backend_writes tracks how many of these backend writes are * currently pending. We complete the application write only after all * writes complete (either success or failure). */ std::atomic<int> num_ongoing_backend_writes = 0; /* * This is currently valid only for reads. * This contains vector of byte chunks which is returned by making a call * to bytes_chunk_cache::get(). * This is populated by only one thread that calls run_read() for this * task, once populated multiple parallel threads may read it, so we * don't need to synchronize access to this with a lock. */ std::vector<bytes_chunk> bc_vec; enum fuse_opcode optype = (fuse_opcode) 0; protected: /* * RPC stats. This has both stats specific to this RPC as well as * aggregated RPC stats for all RPCs of this type and also global stats * for all RPCs. */ rpc_stats_az stats; public: rpc_task(struct nfs_client *_client, int _index) : client(_client), index(_index) { } /* * RPC API specific task info. * This is a pointer so that it can be quickly xferred to jukebox_seedinfo. */ api_task_info *rpc_api = nullptr; // TODO: Add valid flag here for APIs? /** * Set a function to be run by this rpc_task asynchronously. * Calling set_async_function() makes an rpc_task async. * The callback function takes two parameters: * 1. rpc_task pointer. * 2. Optional arbitrary data that caller may want to pass. */ bool set_async_function( std::function<void(struct rpc_task*)> func) { // Must not already be async. assert(!is_async()); assert(func); if (--async_slots < 0) { ++async_slots; AZLogError("Too many async rpc tasks: {}", async_slots.load()); assert(0); /* * TODO: Add a condition_variable where caller can wait and * will be woken up once it can create an async task. * Till then caller can wait for try again. */ return false; } /* * Mark this task async. * This has to be done before calling func() as it may check it * for the rpc_task. */ is_async_task = true; std::thread thread([this, func]() { func(this); /* * We increase the async_slots here and not in free_rpc_task() * as the task is technically done, it just needs to be reaped. */ async_slots++; }); /* * We detach from this thread to avoid having to join it, as * that causes problems with some caller calling free_rpc_task() * from inside func(). Moreover this is more like the sync tasks * where the completion context doesn't worry whether the issuing * thread has completed. * Since we don't have a graceful exit scenario, waiting for the * async threads is not really necessary. */ thread.detach(); return true; } /** * Return a string representation of opcode for logging. */ static const std::string fuse_opcode_to_string(fuse_opcode opcode); /** * Check if this rpc_task is an async task. */ bool is_async() const { return is_async_task; } /* * init/run methods for the LOOKUP RPC. */ void init_lookup(fuse_req *request, const char *name, fuse_ino_t parent_ino); void run_lookup(); /* * Proxy lookup task is used for sending a LOOKUP RPC on behalf of some * other RPC which failed to return the filehandle in postop data, f.e., * CREATE may fail to return FH, this is fine for NFS but fuse expects us * to convey the FH in our create callback, hence we issue a LOOKUP RPC * to query the FH for the newly created file. * * do_proxy_lookup() is called on the original task, it creates a new proxy * lookup task properly initialized with data from the original task, sends * the LOOKUP RPC, and on completion calls the fuse callback of the original * fuse request. */ void do_proxy_lookup() const; /* * init/run methods for the ACCESS RPC. */ void init_access(fuse_req *request, fuse_ino_t ino, int mask); void run_access(); /* * init/run methods for the WRITE RPC. * * Note: init_write_fe() is used to initialize the application write (frontend write). * It has valid buffer, offset and size we received from the application. * * Note: init_write_be() is used to initialize the backend write on a inode. It is used * to flush cached application writes to the backend. */ void init_write_fe(fuse_req *request, fuse_ino_t ino, struct fuse_bufvec *buf, size_t size, off_t offset); void init_write_be(fuse_ino_t ino); void run_write(); /* * init/run methods for the FLUSH/RELEASE RPC. */ void init_flush(fuse_req *request, fuse_ino_t ino); void run_flush(); /* * init/run methods for the GETATTR RPC. */ void init_getattr(fuse_req *request, fuse_ino_t ino); void run_getattr(); /* * Ref do_proxy_lookup() for details. */ void do_proxy_getattr() const; /* * init/run methods for the SETATTR RPC. */ void init_setattr(fuse_req *request, fuse_ino_t ino, const struct stat *attr, int to_set, struct fuse_file_info *file); void run_setattr(); void init_statfs(fuse_req *request, fuse_ino_t ino); void run_statfs(); /* * init/run methods for the CREATE RPC. */ void init_create_file(fuse_req *request, fuse_ino_t parent_ino, const char *name, mode_t mode, struct fuse_file_info *file); void run_create_file(); /* * init/run methods for the MKNOD RPC. */ void init_mknod(fuse_req *request, fuse_ino_t parent_ino, const char *name, mode_t mode); void run_mknod(); /* * init/run methods for the MKDIR RPC. */ void init_mkdir(fuse_req *request, fuse_ino_t parent_ino, const char *name, mode_t mode); void run_mkdir(); /* * init/run methods for the REMOVE RPC. */ void init_unlink(fuse_req *request, fuse_ino_t parent_ino, const char *name, bool for_silly_rename); void run_unlink(); /* * init/run methods for the RMDIR RPC. */ void init_rmdir(fuse_req *request, fuse_ino_t parent_ino, const char *name); void run_rmdir(); /* * init/run methods for the SYMLINK RPC. */ void init_symlink(fuse_req *request, const char *link, fuse_ino_t parent_ino, const char *name); void run_symlink(); /* * init/run methods for the RENAME RPC. * Note that this can be called both for performing a user requested rename * and for silly rename. silly rename in turn can be called for a user * requested unlink or rename. * * For user requested rename: * - parent_ino/name -> newparent_ino/newname, and * - silly_rename silly_rename, silly_rename_ino, oldparent_ino, old_name * must not be provided (and their default values are used). * * For silly rename called in response to user requested unlink: * - parent_ino/name is the to-be-unlinked file. * - newparent_ino/newname is the silly rename file. Since silly rename file * is created in the same directory, newparent_ino MUST be same as * parent_ino. * - silly_rename must be true. * - silly_rename_ino must be the ino of the to-be-silly-renamed file * parent_ino/name. * - oldparent_ino/old_name must not be passed. * * For silly rename called in response to user requested rename: * - parent_ino/name is the outgoing file i.e., the newfile passed by the * user requested rename. * - newparent_ino/newname is the silly rename file. Since silly rename file * is created in the same directory, newparent_ino MUST be same as * parent_ino. * - silly_rename must be true. * - silly_rename_ino must be the ino of the to-be-silly-renamed file * parent_ino/name. * - oldparent_ino/old_name is the orignal to-be-renamed file, i.e., the * oldfile passed by the user requested rename. This is not used by silly * rename but is used to initiate the actual rename once silly rename * completes successfully. */ void init_rename(fuse_req *request, fuse_ino_t parent_ino, const char *name, fuse_ino_t newparent_ino, const char *newname, bool silly_rename = false, fuse_ino_t silly_rename_ino = 0, fuse_ino_t oldparent_ino = 0, const char *old_name = nullptr); void run_rename(); /* * init/run methods for the READLINK RPC. */ void init_readlink(fuse_req *request, fuse_ino_t ino); void run_readlink(); // This function is responsible for setting up the members of readdir_task. void init_readdir(fuse_req *request, fuse_ino_t inode, size_t size, off_t offset, off_t target_offset, struct fuse_file_info *file); void run_readdir(); // This function is responsible for setting up the members of readdirplus_task. void init_readdirplus(fuse_req *request, fuse_ino_t inode, size_t size, off_t offset, off_t target_offset, struct fuse_file_info *file); void run_readdirplus(); // This function is responsible for setting up the members of read task. void init_read_fe(fuse_req *request, fuse_ino_t inode, size_t size, off_t offset, struct fuse_file_info *file); void init_read_be(fuse_ino_t inode, size_t size, off_t offset); void run_read(); void set_csched(conn_sched_t _csched) { assert(_csched > CONN_SCHED_INVALID && _csched <= CONN_SCHED_FH_HASH); csched = _csched; } conn_sched_t get_csched() const { assert(csched > CONN_SCHED_INVALID && csched <= CONN_SCHED_FH_HASH); return csched; } uint32_t get_fh_hash() const { // When get_fh_hash() is called, fh_hash must be set. assert(fh_hash != 0); return fh_hash; } void set_fuse_req(fuse_req *request) { rpc_api->req = request; } struct fuse_req *get_fuse_req() const { return rpc_api->req; } void set_op_type(enum fuse_opcode _optype) { optype = rpc_api->optype = _optype; rpc_api->proxy_optype = (fuse_opcode) 0; } void set_proxy_op_type(enum fuse_opcode _optype) { rpc_api->proxy_optype = _optype; } /** * See rpc_task::free_rpc_task() for why we need rpc_task::optype along * with api_task_info::optype. */ enum fuse_opcode get_op_type() const { assert(!rpc_api || optype == rpc_api->optype); return optype; } enum fuse_opcode get_proxy_op_type() const { assert(rpc_api != nullptr); return rpc_api->proxy_optype; } rpc_stats_az& get_stats() { return stats; } struct nfs_context *get_nfs_context() const; struct rpc_context *get_rpc_ctx() const { return nfs_get_rpc_context(get_nfs_context()); } nfs_client *get_client() const { assert (client != nullptr); return client; } int get_index() const { return index; } // The task should not be accessed after this function is called. void free_rpc_task(); /* * This method will reply with error and free the rpc task. * rc is either 0 for success, or a +ve errno value. */ void reply_error(int rc) { assert(rc >= 0); const int fre = fuse_reply_err(get_fuse_req(), rc); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_err({}) failed: {}", fmt::ptr(get_fuse_req()), fre); assert(0); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_statfs(const struct statvfs *statbuf) { const int fre = fuse_reply_statfs(get_fuse_req(), statbuf); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_statfs({}) failed: {}", fmt::ptr(get_fuse_req()), fre); assert(0); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_readlink(const char *linkname) { const int fre = fuse_reply_readlink(get_fuse_req(), linkname); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_readlink({}) failed: {}", fmt::ptr(get_fuse_req()), fre); assert(0); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_attr(const struct stat& attr, double attr_timeout) { const int fre = fuse_reply_attr(get_fuse_req(), &attr, attr_timeout); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_attr({}) failed: {}", fmt::ptr(get_fuse_req()), fre); assert(0); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_write(size_t count) { /* * Currently fuse sends max 1MiB write requests, so we should never * be responding more than that. * This is a sanity assert for catching unintended bugs, update if * fuse max write size changes. */ assert(count <= 1048576); INC_GBL_STATS(app_bytes_written, count); /* * We should not respond to fuse when there are still ongoing * backend writes. */ assert(num_ongoing_backend_writes == 0); const int fre = fuse_reply_write(get_fuse_req(), count); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_write({}) failed: {}", fmt::ptr(get_fuse_req()), fre); assert(0); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_iov(struct iovec *iov, size_t count) { // If count is non-zero iov must be valid and v.v. assert((iov == nullptr) == (count == 0)); /* * fuse_reply_iov() cannot handle count > FUSE_REPLY_IOV_MAX_COUNT, * o/w it fails with "fuse: writing device: Invalid argument" and fuse * communication stalls. * * Note: Though the callers are aware of this and they won't send more * than FUSE_REPLY_IOV_MAX_COUNT iov elements, but to be safe we * cap it here also as the final gatekeeper. */ if (count > FUSE_REPLY_IOV_MAX_COUNT) { AZLogError("[BUG] reply_iov called with count ({}) > {}, " "capping at {}", count, FUSE_REPLY_IOV_MAX_COUNT, FUSE_REPLY_IOV_MAX_COUNT); count = FUSE_REPLY_IOV_MAX_COUNT; assert(0); } const int fre = fuse_reply_iov(get_fuse_req(), iov, count); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_iov({}) failed (count={}): {}", fmt::ptr(get_fuse_req()), count, fre); assert(0); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_entry(const struct fuse_entry_param *e) { struct nfs_inode *inode = nullptr; /* * As per fuse on a successful call to fuse_reply_create() the * inode's lookup count must be incremented. We increment the * inode's lookupcnt in get_nfs_inode(), this lookup count will * be transferred to fuse on successful fuse_reply_create() call, * but if that fails then we need to drop the ref. * "ino == 0" implies a failed lookup call, so we don't have a valid * inode number to return. */ if (e->ino != 0) { inode = client->get_nfs_inode_from_ino(e->ino); assert(inode->lookupcnt >= 1); assert(e->generation == inode->get_generation()); /* * This might be an existing inode from inode_map which we didn't * free earlier as it was in use when fuse called forget and then * some other thread looked up the inode. It could be a fresh inode * too. In any case increment forget_expected as we are now letting * fuse know about this inode. */ inode->forget_expected++; assert(inode->lookupcnt >= (uint64_t) inode->forget_expected); } assert((int64_t) e->generation <= get_current_usecs()); const int fre = fuse_reply_entry(get_fuse_req(), e); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("fuse_reply_entry({}) failed: {}", fmt::ptr(get_fuse_req()), fre); assert(0); if (inode) { /* * Not able to convey to fuse, drop forget_expected count * incremented above. */ assert(inode->forget_expected > 0); inode->forget_expected--; inode->decref(); assert(inode->lookupcnt >= (uint64_t) inode->forget_expected); } } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } void reply_create( const struct fuse_entry_param *entry, const struct fuse_file_info *file) { // inode number cannot be 0 in a create response(). assert(entry->ino != 0); /* * As per fuse on a successful call to fuse_reply_create() the * inode's lookup count must be incremented. We increment the * inode's lookupcnt in get_nfs_inode(), this lookup count will * be transferred to fuse on successful fuse_reply_create() call, * but if that fails then we need to drop the ref. */ struct nfs_inode *inode = client->get_nfs_inode_from_ino(entry->ino); assert(inode->lookupcnt >= 1); assert(entry->generation == inode->get_generation()); assert((int64_t) entry->generation <= get_current_usecs()); /* * See comment in reply_entry(). */ inode->forget_expected++; assert(inode->lookupcnt >= (uint64_t) inode->forget_expected); /* * nfs_client::reply_entry()->on_fuse_open() must have incremented * the inode opencnt. * Note that it's important to increment opencnt before calling * fuse_reply_create() as once we respond with the inode to fuse, it * may call release for that inode and we have an assert in * aznfsc_ll_release() that opencnt must be non-zero. If we fail to * convey to fuse we decrement the opencnt. */ assert(inode->opencnt > 0); const int fre = fuse_reply_create(get_fuse_req(), entry, file); if (fre != 0) { INC_GBL_STATS(fuse_reply_failed, 1); AZLogError("[{}] fuse_reply_create({}) failed: {}", inode->get_fuse_ino(), fmt::ptr(get_fuse_req()), fre); assert(0); /* * Not able to convey to fuse, drop forget_expected count * incremented above. */ assert(inode->forget_expected > 0); inode->forget_expected--; /* * Drop opencnt incremented in * nfs_client::reply_entry()->on_fuse_open(). */ inode->opencnt--; inode->decref(); assert(inode->lookupcnt >= (uint64_t) inode->forget_expected); } else { DEC_GBL_STATS(fuse_responses_awaited, 1); } free_rpc_task(); } /** * Check RPC and NFS status to find completion status of the RPC task. * Returns 0 if rpc_task succeeded execution at the server, else returns * a +ve errno value. * If user has passed the last argument errstr as non-null, then it'll * additionally store an error string there. */ static int status(int rpc_status, int nfs_status, const char **errstr = nullptr) { if (rpc_status != RPC_STATUS_SUCCESS) { if (errstr) { *errstr = "RPC error"; } /* * TODO: libnfs only returns the following RPC errors * RPC status can indicate access denied error too, * need to support that. */ assert(rpc_status == RPC_STATUS_ERROR || rpc_status == RPC_STATUS_TIMEOUT || rpc_status == RPC_STATUS_CANCEL); // For now just EIO. return EIO; } if (nfs_status == NFS3_OK) { if (errstr) { *errstr = "Success"; } return 0; } if (errstr) { *errstr = nfsstat3_to_str(nfs_status); } return -nfsstat3_to_errno(nfs_status); } void send_readdir_or_readdirplus_response( const std::vector<std::shared_ptr<const directory_entry>>& readdirentries); void get_readdir_entries_from_cache(); void fetch_readdir_entries_from_server(); void fetch_readdirplus_entries_from_server(); void send_read_response(); void read_from_server(struct bytes_chunk &bc); /* * Flush RPC related methods. * Flush supports vectored writes so caller can use add_bc() to add * bytes_chunk to the flush task till the supported wsize. If the bc can be * safely added to the vector it's added to the bytes_chunk queue and * add_bc() returns true, else bc is not added and it returns false. On a * false return the caller must call issue_write_rpc() to dispatch all the * queued bytes_chunks, and add the remaining bytes_chunks to a new flush * task. */ bool add_bc(const bytes_chunk& bc); void issue_write_rpc(); void issue_commit_rpc(); #ifdef ENABLE_NO_FUSE /* * In nofuse mode we re-define these fuse_reply functions to copy the * result into the response buffer (passed by the POSIX API) and notify * the issuer thread. */ int fuse_reply_none(fuse_req_t req); int fuse_reply_iov(fuse_req_t req, const struct iovec *iov, int count); int fuse_reply_err(fuse_req_t req, int err); int fuse_reply_entry(fuse_req_t req, const struct fuse_entry_param *e); int fuse_reply_create(fuse_req_t req, const struct fuse_entry_param *e, const struct fuse_file_info *f); int fuse_reply_attr(fuse_req_t req, const struct stat *attr, double attr_timeout); int fuse_reply_readlink(fuse_req_t req, const char *linkname); int fuse_reply_open(fuse_req_t req, const struct fuse_file_info *f); int fuse_reply_write(fuse_req_t req, size_t count); int fuse_reply_buf(fuse_req_t req, const char *buf, size_t size); int fuse_reply_data(fuse_req_t req, struct fuse_bufvec *bufv, enum fuse_buf_copy_flags flags); size_t fuse_add_direntry_plus(fuse_req_t req, char *buf, size_t bufsize, const char *name, const struct fuse_entry_param *e, off_t off); size_t fuse_add_direntry(fuse_req_t req, char *buf, size_t bufsize, const char *name, const struct stat *stbuf, off_t off); const struct fuse_ctx *fuse_req_ctx(fuse_req_t req); #endif }; class rpc_task_helper { private: // Mutex for synchronizing access to free_task_index stack. std::shared_mutex task_index_lock_41; // Stack containing index into the rpc_task_list vector. std::stack<int> free_task_index; #ifdef ENABLE_PARANOID // Set for catching double free. std::set<int> free_task_index_set; #endif /* * List of RPC tasks which is used to run the task. * Making this a vector of rpc_task* instead of rpc_task saves any * restrictions on the members of rpc_task. With rpc_task being the * element type, it needs to be move constructible, so we cannot have * atomic members f.e. * Anyway these rpc_task once allocated live for the life of the program. */ std::vector<struct rpc_task*> rpc_task_list; // Condition variable to wait for free task index availability. std::condition_variable_any cv; // This is a singleton class, hence make the constructor private. rpc_task_helper(struct nfs_client *client) { assert(client != nullptr); // There should be no elements in the stack. assert(free_task_index.empty()); // Initialize the index stack. for (int i = 0; i < MAX_OUTSTANDING_RPC_TASKS; i++) { free_task_index.push(i); #ifdef ENABLE_PARANOID const auto p = free_task_index_set.insert(i); assert(p.second); assert(free_task_index_set.size() == free_task_index.size()); #endif rpc_task_list.emplace_back(new rpc_task(client, i)); } // There should be MAX_OUTSTANDING_RPC_TASKS index available. assert(free_task_index.size() == MAX_OUTSTANDING_RPC_TASKS); } public: ~rpc_task_helper() { AZLogInfo("~rpc_task_helper() called"); #ifdef ENABLE_PARANOID assert(free_task_index_set.size() == free_task_index.size()); #endif /* * We should be called when there are no outstanding tasks. */ assert(free_task_index.size() == MAX_OUTSTANDING_RPC_TASKS); while (!free_task_index.empty()) { free_task_index.pop(); } for (int i = 0; i < MAX_OUTSTANDING_RPC_TASKS; i++) { assert(rpc_task_list[i]); delete rpc_task_list[i]->rpc_api; delete rpc_task_list[i]; } rpc_task_list.clear(); } static rpc_task_helper *get_instance(struct nfs_client *client = nullptr) { static rpc_task_helper helper(client); return &helper; } /** * This returns a free rpc task instance from the pool of rpc tasks. * This call will block till a free rpc task is available. * * Also see alloc_rpc_task_reserved(). */ struct rpc_task *alloc_rpc_task(fuse_opcode optype, bool use_reserved = false) { // get_free_idx() can block, collect start time before that. const uint64_t start_usec = get_current_usecs(); const int free_index = get_free_idx(use_reserved); struct rpc_task *task = rpc_task_list[free_index]; assert(task->magic == RPC_TASK_MAGIC); assert(task->client != nullptr); assert(task->index == free_index); // Every rpc_task starts as sync. assert(!task->is_async()); /* * Only first time around rpc_api will be null for a rpc_task, after * that it can be null only if the task failed with a JUKEBOX error in * which case the rpc_api would have been xferred to jukebox_seedinfo * by nfs_client::jukebox_retry(). */ if (!task->rpc_api) { task->rpc_api = new api_task_info(); } task->set_op_type(optype); task->stats.on_rpc_create(optype, start_usec); // No task starts as a child task. assert(task->rpc_api->parent_task == nullptr); #ifndef ENABLE_NON_AZURE_NFS assert(task->client->mnt_options.nfs_port == 2047 || task->client->mnt_options.nfs_port == 2048); #endif /* * Set the default connection scheduling type based on the NFS port * used. Later init_*() method can set it to a more appropriate value. */ task->csched = (task->client->mnt_options.nfs_port == 2047) ? CONN_SCHED_RR_W : CONN_SCHED_FH_HASH; #ifdef ENABLE_PARANOID task->issuing_tid = ::gettid(); #endif // New task must have these as 0. assert(task->num_ongoing_backend_writes == 0); assert(task->num_ongoing_backend_reads == 0); INC_GBL_STATS(rpc_tasks_allocated, 1); return task; } /** * Use this when allocating rpc_task in a libnfs callback context. * This will avoid blocking the caller by reaching into the reserved pool * if the regular pool of rpc_task is exhausted. * Note that it's important to not block libnfs threads as they help * complete RPC requests and thus free up rpc_task structures. */ struct rpc_task *alloc_rpc_task_reserved(fuse_opcode optype) { return alloc_rpc_task(optype, true /* use_reserved */); } int get_free_idx(bool use_reserved = false) { /* * If caller is special allow them to eat into the reserved pool * of tasks. We should keep as many tasks in the reserved pool as * there are libnfs threads, so that in the worst case if every libnfs * callback allocates a task they don't have to block. * Don't allow more than 25% of total tasks as reserved tasks. */ static const size_t RESERVED_TASKS = 1000; static_assert(RESERVED_TASKS < MAX_OUTSTANDING_RPC_TASKS / 4); const size_t spare_count = use_reserved ? 0 : RESERVED_TASKS; std::unique_lock<std::shared_mutex> lock(task_index_lock_41); // Wait until a free rpc task is available. while (free_task_index.size() <= spare_count) { #ifdef ENABLE_PARANOID assert(free_task_index_set.size() == free_task_index.size()); #endif if (!cv.wait_for(lock, std::chrono::seconds(30), [this, spare_count] { return free_task_index.size() > spare_count; })) { AZLogError("Timed out waiting for free rpc_task ({}), " "re-trying!", free_task_index.size()); } } const int free_index = free_task_index.top(); free_task_index.pop(); #ifdef ENABLE_PARANOID // Must also be free as per free_task_index_set. const size_t cnt = free_task_index_set.erase(free_index); assert(cnt == 1); assert(free_task_index_set.size() == free_task_index.size()); #endif // Must be a valid index. assert(free_index >= 0 && free_index < MAX_OUTSTANDING_RPC_TASKS); return free_index; } void release_free_index(int index) { // Must be a valid index. assert(index >= 0 && index < MAX_OUTSTANDING_RPC_TASKS); { std::unique_lock<std::shared_mutex> lock(task_index_lock_41); #ifdef ENABLE_PARANOID assert(free_task_index_set.size() == free_task_index.size()); // Must not already be free. const auto p = free_task_index_set.insert(index); assert(p.second); #endif free_task_index.push(index); } // Notify any waiters blocked in alloc_rpc_task(). cv.notify_one(); } void free_rpc_task(struct rpc_task *task) { assert(task->magic == RPC_TASK_MAGIC); task->is_async_task = false; release_free_index(task->get_index()); DEC_GBL_STATS(rpc_tasks_allocated, 1); } }; /** * Seed info needed to re-run a task that had failed with JUKEBOX error. */ struct jukebox_seedinfo { ~jukebox_seedinfo() { /* * This will also call api_task_info::release(). */ rpc_api->release(); delete rpc_api; } /* * Information needed to restart the task. */ api_task_info *rpc_api; /* * When to rerun the task. */ int64_t run_at_msecs; jukebox_seedinfo(api_task_info *_rpc_api) : rpc_api(_rpc_api), run_at_msecs(get_current_msecs() + JUKEBOX_DELAY_SECS*1000) { assert(rpc_api != nullptr); } }; #endif /*__RPC_TASK_H__*/