// turbonfs/inc/rpc_stats.h

#ifndef __RPCSTATS_H__ #define __RPCSTATS_H__ #include <atomic> #include <mutex> #include "aznfsc.h" #include "libnfs-raw.h" namespace aznfsc { /* * This should not match any valid nfsstat3 enum value. * This special value is used to convey "RPC error" to on_rpc_complete(). */ static const int NFS3ERR_RPC_ERROR = 999999; #define NFS_STATUS(r) ((r) ? (r)->status : NFS3ERR_SERVERFAULT) #define NFS_STATUSX(rpc_status, r) (((rpc_status) == RPC_STATUS_SUCCESS) ? NFS_STATUS(r) : (nfsstat3) NFS3ERR_RPC_ERROR) /** * Stats for a specific RPC (actually FUSE_*) type. */ struct rpc_opstat { /* * How many RPCs of this type completed? */ std::atomic<uint64_t> count = 0; /* * How many RPCs of this type are issued to libnfs and awaiting completion * callback. */ std::atomic<uint64_t> pending = 0; /* * Cumulative request bytes. * This includes the RPC header and payload bytes. */ std::atomic<uint64_t> bytes_sent = 0; /* * Cumulative response bytes. * This includes the RPC header and payload bytes. */ std::atomic<uint64_t> bytes_rcvd = 0; /* * Cumulative time taken by the server. */ std::atomic<uint64_t> rtt_usec = 0; /* * Cumulative time spent waiting to be dispatched over the socket, * in libnfs. */ std::atomic<uint64_t> dispatch_usec = 0; /* * Cumulative time taken for request processing. * This includes times taken by server and any other delay on the client. * Most prominent client delays include: * - Time waiting for a free RPC task to be available. * - Scheduling delays causing delay in sending the request and processing * the response. */ std::atomic<uint64_t> total_usec = 0; /* * Cumulative time spent by the fuse request handler from the time fuse * called our registered handler and till we returned from the handler. * Note that this is the time fuse thread was occupied, and not the time * taken to serve the request, the fuse req will mostly be completed later * asynchronously. 
* If this time is high that would mean handler for the specific request * type is blocking fuse threads and this would mean increased latency for * other requests as fuse has limited threads. */ std::atomic<uint64_t> fuse_handler_usec = 0; /* * Error map to store all the errors encountered by the given api. * This is guarded by rpc_stats_az::stats_lock_42. */ std::map<int /*error status*/, uint64_t /*error count*/> error_map; }; /** * Class for maintaining RPC stats. * An object of this must be included in rpc_task and user must call designated * event handler methods at appropriate times in the life of the RPC task * processing. */ class rpc_stats_az { friend struct fuse_req_stats; public: rpc_stats_az() = default; /** * Event handler method to be called right after the RPC is created. * start_usec is the time when the fuse request handler was called. * It can be different from create time if new RPC creation had to wait * as we may have run out of RPC slots. */ void on_rpc_create(enum fuse_opcode _optype, uint64_t start_usec) { // 0 is not a valid fuse_opcode; assert(_optype > 0 && _optype <= FUSE_OPCODE_MAX); /* * FUSE_RELEASE is sent as FUSE_FLUSH. */ assert(_optype != FUSE_RELEASE); optype = _optype; stamp.start = start_usec; stamp.create = get_current_usecs(); stamp.issue = 0; stamp.dispatch = 0; stamp.complete = 0; stamp.free = 0; req_size = 0; resp_size = 0; assert(stamp.create >= stamp.start); } /** * Event handler method to be called when the RPC request is issued, i.e., * when the libnfs async method is called. * Call it just before the libnfs async method. If the async method fails * to issue, call on_rpc_cancel() to undo what this does. * * Note: Callback can get called before the async method returns to the * caller. 
*/ void on_rpc_issue() { assert(stamp.issue == 0); stamp.issue = get_current_usecs(); assert(stamp.issue >= stamp.create); assert(optype > 0 && optype <= FUSE_OPCODE_MAX); opstats[optype].pending++; } /** * Call this to undo the effects of on_rpc_issue() if the async request * fails to issue for some reason. */ void on_rpc_cancel() { assert(stamp.issue != 0); assert((int64_t) stamp.issue <= get_current_usecs()); assert(stamp.dispatch == 0); assert(stamp.complete == 0); stamp.issue = 0; assert(optype > 0 && optype <= FUSE_OPCODE_MAX); assert(opstats[optype].pending > 0); opstats[optype].pending--; } /** * Event handler method to be called when the RPC completes, i.e., when * the libnfs callback is called. * This MUST be called *only* from the libnfs callback, since only there * the pdu is valid and we can call the rpc_pdu_get_*() methods. */ void on_rpc_complete(struct rpc_pdu *pdu, nfsstat3 status) { assert((status == NFS3ERR_RPC_ERROR) || (nfsstat3_to_errno(status) != -ERANGE)); req_size = rpc_pdu_get_req_size(pdu); // 40 is the size of NFS NULL RPC request. assert(req_size >= 40); resp_size = rpc_pdu_get_resp_size(pdu); // 24 is the size of NFS NULL RPC response. assert(resp_size >= 24); assert(stamp.dispatch == 0); assert(stamp.issue != 0); stamp.dispatch = rpc_pdu_get_dispatch_usecs(pdu); assert(stamp.dispatch >= stamp.issue); stamp.complete = get_current_usecs(); assert(stamp.complete > stamp.dispatch); assert(optype > 0 && optype <= FUSE_OPCODE_MAX); assert(opstats[optype].pending > 0); opstats[optype].pending--; if (status != NFS3_OK) { /* * This thread will block till it obtains the lock. * This can result in delayed response to the fuse as * on_rpc_complete will be called before sending response to fuse. * This should be okay as this will happen only in error state. */ std::unique_lock<std::mutex> _lock(stats_lock_42); auto result = opstats[optype].error_map.emplace(status, 1); if (!result.second) { // If the key already exists, increment the error count. 
++(result.first->second); } } } /** * Event handler method to be called right before the RPC is freed. */ void on_rpc_free() { /* * stamp.issue won't be set for requests which were not sent to * the server. Most likely reason is that the request was served from * the cache. * stamp.complete won't be set (while stamp.issue is set) for * requests which don't get a response. Even those we don't count for * stats. */ if (stamp.issue != 0 && stamp.complete != 0) { assert(stamp.complete > stamp.dispatch); assert(stamp.dispatch >= stamp.issue); stamp.free = get_current_usecs(); assert(optype > 0 && optype <= FUSE_OPCODE_MAX); opstats[optype].count++; opstats[optype].bytes_sent += req_size; opstats[optype].bytes_rcvd += resp_size; opstats[optype].rtt_usec += (stamp.complete - stamp.dispatch); opstats[optype].dispatch_usec += (stamp.dispatch - stamp.issue); opstats[optype].total_usec += (stamp.complete - stamp.start); } else if (stamp.issue == 0) { /* * Requests not issued. * See skip_mtime_update() for how we can come here for * FUSE_SETATTR. * * Note: FUSE_FLUSH is never issued as an RPC to the server, * so all FUSE_FLUSH tasks come here. */ assert(stamp.dispatch == 0); assert(stamp.complete == 0); assert(optype == FUSE_READDIR || optype == FUSE_READDIRPLUS || optype == FUSE_READ || optype == FUSE_WRITE || optype == FUSE_FLUSH || optype == FUSE_GETATTR || optype == FUSE_LOOKUP || optype == FUSE_SETATTR); } else { /* * Requests issued but not completed. */ assert(stamp.issue != 0); assert(stamp.dispatch == 0); assert(stamp.complete == 0); AZLogWarn("Didn't get response for RPC request type {}", (int) optype); } } /** * TODO: See if we need to track retries. */ void on_rpc_retry(int num_retries); /** * Dump the cumulative stats collected till now. * Note that it tries to mimic the o/p of the Linux mountstats(8) command * for better readability. 
*/ static void dump_stats(); private: enum fuse_opcode optype = (fuse_opcode) 0; size_t req_size = 0; size_t resp_size = 0; /* * Timestamp in microseconds for various stages of the RPC. * * start: When the alloc_rpc_task() was called. * create: When the rpc_task was actually created. * Note that if we run out of rpc_tasks then alloc_rpc_task() * will have to wait for some ongoing RPC to complete. * issue: When the libnfs async method is called. * dispatch: When libnfs successfully writes the complete request along * with data (if any) over the socket. * Note that due to TCP connection b/w and sndbuf size, this * may be much later than the issue time. * (dispatch - issue) is the time the request was queued in * libnfs waiting to be dispatched over the socket. * complete: When the libnfs async method completes and the callback is * called. (complete - dispatch) is the time taken by the * server to process the RPC. * free: When free_rpc_task() was called. */ struct { uint64_t start = 0; uint64_t create = 0; uint64_t issue = 0; uint64_t dispatch = 0; uint64_t complete = 0; uint64_t free = 0; } stamp; /* * Aggregated per-RPC-type stats, for all RPCs issued of a given type. */ static struct rpc_opstat opstats[FUSE_OPCODE_MAX + 1]; /* * Lock for synchronizing dumping stats and for inserting into error_map. */ static std::mutex stats_lock_42; public: /* * Misc global stats. * app_read_reqs: Total fuse read requests. * server_read_reqs: Total read requests we sent to the server. * failed_read_reqs: How many of app_read_reqs we responded with a failed * status. * zero_reads: How many of app_read_reqs we responded with 0 bytes, either * application requested 0 bytes or it was beyond eof. * app_bytes_read: Total bytes read by application. * server_bytes_read: Total bytes that we actually read from * server. Every byte read by application must come from server, * unless application is reading a hole in the cache (see * bytes_zeroed_from_cache). 
If we are reading more bytes from the * server than what application requested it may mean that we are * reading ahead data but are having to drop it before application * could read it, mostly due to memory pressure (though we try very * hard not to drop read ahead data). * bytes_read_from_cache: How many bytes were read from the cache. * This will indicate our readahead effectiveness. * bytes_zeroed_from_cache: How many bytes were read from unmapped parts * of the cache and hence were zero filled. * num_readhead: Number of readahead calls made. * bytes_read_ahead: How many bytes were read ahead using num_readhead * calls. * tot_getattr_reqs: How many getattr requests were received from fuse. * getattr_served_from_cache: How many were served from inode->attr cache. * tot_lookup_reqs: How many lookup requests were received from fuse. * lookup_served_from_cache: How many were served from dnlc/lookup cache. * app_write_reqs: Total fuse writes requests. * failed_write_reqs: How many of app_write_reqs we responded with a failed * status. * app_bytes_written: Total bytes written by application. * writes_np: How many writes did not see any kind of cache pressure. * inline_writes: How many of app_write_reqs have to be held for inline * write due to cache pressure. * inline_writes_lp: How many of inline_writes were due to local/per-file * cache pressure. * inline_writes_gp: How many of inline_writes were due to global * cache pressure. * flush_seq: How many time flush was issued as we had enough sequential * data to write. * flush_lp: How many time flush was issued as local/per-file cache grew * beyond configured limit. * flush_gp: How many time flush was issued as global cache grew beyond * configured limit. * commit_lp: How many time commit was issued as local/per-file cache grew * beyond configured limit. * commit_gp: How many time commit was issued as global cache grew beyond * configured limit. * num_sync_membufs: How many times sync_membufs() was called? 
* tot_bytes_sync_membufs: Total bytes flushed by sync_membufs(). */ static std::atomic<uint64_t> app_read_reqs; static std::atomic<uint64_t> server_read_reqs; static std::atomic<uint64_t> failed_read_reqs; static std::atomic<uint64_t> zero_reads; static std::atomic<uint64_t> app_bytes_read; static std::atomic<uint64_t> server_bytes_read; static std::atomic<uint64_t> bytes_read_from_cache; static std::atomic<uint64_t> bytes_zeroed_from_cache; static std::atomic<uint64_t> bytes_read_ahead; static std::atomic<uint64_t> num_readhead; static std::atomic<uint64_t> tot_getattr_reqs; static std::atomic<uint64_t> getattr_served_from_cache; static std::atomic<uint64_t> tot_lookup_reqs; static std::atomic<uint64_t> lookup_served_from_cache; static std::atomic<uint64_t> app_write_reqs; static std::atomic<uint64_t> server_write_reqs; static std::atomic<uint64_t> failed_write_reqs; static std::atomic<uint64_t> app_bytes_written; static std::atomic<uint64_t> server_bytes_written; static std::atomic<uint64_t> writes_np; static std::atomic<uint64_t> inline_writes; static std::atomic<uint64_t> inline_writes_lp; static std::atomic<uint64_t> inline_writes_gp; static std::atomic<uint64_t> flush_seq; static std::atomic<uint64_t> flush_lp; static std::atomic<uint64_t> flush_gp; static std::atomic<uint64_t> commit_lp; static std::atomic<uint64_t> commit_gp; static std::atomic<uint64_t> num_sync_membufs; static std::atomic<uint64_t> tot_bytes_sync_membufs; static std::atomic<uint64_t> rpc_tasks_allocated; static std::atomic<uint64_t> fuse_responses_awaited; static std::atomic<uint64_t> fuse_reply_failed; }; #define INC_GBL_STATS(var, inc) rpc_stats_az::var += (inc) #define DEC_GBL_STATS(var, dec) {assert(rpc_stats_az::var >= dec); rpc_stats_az::var -= (dec);} #define GET_GBL_STATS(var) rpc_stats_az::var.load() struct fuse_req_stats { fuse_req_stats(enum fuse_opcode _optype) : optype(_optype), issue(get_current_usecs()) { } ~fuse_req_stats() { const uint64_t handler_usec = 
get_current_usecs() - issue; assert((int64_t) handler_usec >= 0); rpc_stats_az::opstats[optype].fuse_handler_usec += handler_usec; } const enum fuse_opcode optype = (fuse_opcode) 0; const uint64_t issue; }; #define FUSE_STATS_TRACKER(optype) struct fuse_req_stats _frs(optype) } #endif /* __RPCSTATS_H__ */