in turbonfs/src/rpc_task.cpp [3382:3776]
void rpc_task::run_read()
{
const fuse_ino_t ino = rpc_api->read_task.get_ino();
struct nfs_inode *inode = get_client()->get_nfs_inode_from_ino(ino);
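    // Effective (client-trusted) and server file size estimates, filled in
    // by get_file_sizes() below; -1 means "not known".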
int64_t cfsize, sfsize;
// run_read() must be called only for an fe task.
assert(rpc_api->read_task.is_fe());
assert(inode->is_regfile());
/*
* aznfsc_ll_read() can only be called after aznfsc_ll_open() so filecache
* and readahead state must have been allocated when we reach here.
*/
assert(inode->has_filecache());
assert(inode->has_rastate());
std::shared_ptr<bytes_chunk_cache>& filecache_handle =
inode->get_filecache();
/*
* run_read() is called once for a fuse read request and must not be
* called for a child task.
*/
assert(rpc_api->parent_task == nullptr);
/*
* Get server and effective file size estimates.
* We use these to find holes that we should zero fill.
*/
inode->get_file_sizes(cfsize, sfsize);
/*
* get_client_file_size() returns a fairly recent estimate of the file
* size taking into account cto consistency, attribute cache timeout, etc,
* so it's a size we can "trust". If READ offset is more than this size
* then we can avoid sending the call to the server and generate eof
     * locally. This is an optimization that saves an extra READ call to the
     * server. The server would correctly return 0+eof for such a READ, so
     * we are functionally correct, barring the small possibility that the
     * file has grown meanwhile, in which case a READ sent to the server
     * would have returned the new data; this is permissible under our
     * weak/cto cache consistency guarantees.
*
* TODO: Shall we put it behind a config?
*/
if (cfsize != -1) {
if (rpc_api->read_task.get_offset() >= cfsize) {
/*
* Entire request lies beyond cfsize.
*/
AZLogDebug("[{}] Read returning 0 bytes (eof) as requested "
"offset ({}) >= file size ({})",
ino, rpc_api->read_task.get_offset(), cfsize);
INC_GBL_STATS(zero_reads, 1);
reply_iov(nullptr, 0);
return;
} else if ((rpc_api->read_task.get_offset() +
rpc_api->read_task.get_size()) > (uint64_t) cfsize) {
/*
* Request extends beyond cfsize, trim it so that we don't have
* to worry about excluding it later.
*/
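            /*
             * Worked example (hypothetical numbers): offset=90, size=20,
             * cfsize=100 => trim_len = (90+20)-100 = 10 and
             * trimmed_size = 20-10 = 10, i.e. the READ is clipped to
             * [90, 100).
             */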
const uint64_t trim_len =
(rpc_api->read_task.get_offset() +
rpc_api->read_task.get_size()) - cfsize;
const uint64_t trimmed_size =
rpc_api->read_task.get_size() - trim_len;
assert(trimmed_size < rpc_api->read_task.get_size());
AZLogDebug("[{}] Trimming application read beyond cfsize: {} "
"({} -> {})",
ino, cfsize, rpc_api->read_task.get_size(), trimmed_size);
rpc_api->read_task.set_size(trimmed_size);
}
}
/*
* Get bytes_chunks covering the region caller wants to read.
* The bytes_chunks returned could be any mix of old (already cached) or
     * new (cache allocated but yet to be read from the blob). Note that
     * reads don't need any special protection. The caller just wants to
     * read the current contents of the blob; these can change immediately
     * after or even while we are reading, resulting in any mix of old or
     * new data.
*/
assert(bc_vec.empty());
bc_vec = filecache_handle->get(
rpc_api->read_task.get_offset(),
rpc_api->read_task.get_size());
/*
* inode->in_ra_window() (called from inline_prune()) considers anything
* before max_byte_read as non-useful and can purge that from the cache.
* Now that we have called get() we will have an inuse count on any
* membuf(s) used by this read, so they won't be purged even if
* in_ra_window() suggests that, so we can safely update max_byte_read now.
     * We don't want to wait for read_callback() to update max_byte_read,
     * as that would delay the update till the reads complete, and till
     * then we would not perform sufficient readahead.
*/
inode->get_rastate()->on_application_read(
rpc_api->read_task.get_offset(),
rpc_api->read_task.get_size());
/*
* send_read_response() will later convey this read completion to fuse
* using fuse_reply_iov() which can send max FUSE_REPLY_IOV_MAX_COUNT
* vector elements.
*/
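    // E.g., with bc_vec.size() == 2000 chunks, size is capped at
    // FUSE_REPLY_IOV_MAX_COUNT; the excess chunks are skipped and then
    // dropped from bc_vec below.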
const size_t size = std::min((int) bc_vec.size(), FUSE_REPLY_IOV_MAX_COUNT);
assert(size > 0 && size <= FUSE_REPLY_IOV_MAX_COUNT);
// There should not be any reads running for this RPC task initially.
assert(num_ongoing_backend_reads == 0);
AZLogDebug("[{}] run_read: offset {}, size: {}, chunks: {}{}, "
"sfsize: {}, cfsize: {}, csfsize: {}",
ino,
rpc_api->read_task.get_offset(),
rpc_api->read_task.get_size(),
bc_vec.size(),
size != bc_vec.size() ? " (capped at 1023)" : "",
sfsize, cfsize, inode->get_cached_filesize());
/*
* Now go through the byte chunk vector to see if the chunks are
* uptodate. Uptodate chunks already have the data we need, while non
* uptodate chunks need to be populated with data. These will mostly be
* READ from the server, with certain exceptions. See below.
* Chunks that are READ from the server are issued in parallel. Once all
* chunks are uptodate we can complete the read to the caller.
*
* Here are the rules that we follow to decide if a non-uptodate chunk
* must be READ from the server.
* - (bc.offset + bc.length) > get_client_file_size()
* These MUST NOT occur as we trim the read request before making the
* bytes_chunk_cache::get() call. We assert for these.
     * - bc.offset < get_server_file_size()
     *   Part or whole of the bc needs to be read from the server, so we
     *   MUST issue READ to the server for such bcs. If part of the bc lies
     *   after the server file size, read_callback() will fill 0s for that
     *   part. This is correct as that part corresponds to a hole in our
     *   cache.
* - bc.offset >= get_server_file_size()
* Server does not know about these so we MUST NOT ask the server about
* these. Given that the bc is non-uptodate this means our cache also
* doesn't have it, which means these correspond to holes in our cache
* and we MUST return 0s for these bcs.
*
* Note that we bump num_ongoing_backend_reads by 1 before issuing
     * the first backend read. This is done to make sure that if
     * read_callback() is called before we could issue all reads, we don't
     * mistake it for "all issued reads have completed". It's ok to update
     * this without a lock since this is the only thread accessing it at
     * this point.
*
     * Note: Membufs which are found uptodate here shouldn't suddenly
     *       become non-uptodate when the other reads complete, otherwise
     *       we have a problem. An uptodate membuf doesn't become
     *       non-uptodate, but it can be written to by some writer thread
     *       while we are waiting for other chunk(s) to be read from the
     *       backend, or even while we are reading them to send the READ
     *       response.
*/
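    /*
     * Worked example (hypothetical sizes): cfsize=100, sfsize=50.
     * - bc [60, 80): bc.offset >= sfsize, so it's a hole in our sparse
     *   cache and we zero-fill it locally.
     * - bc [40, 70): bc.offset < sfsize, so we READ it from the server
     *   and read_callback() fills 0s for the [50, 70) part.
     * - bc [90, 110): cannot occur as the request was trimmed to cfsize.
     */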
[[maybe_unused]] size_t total_length = 0;
bool found_in_cache = true;
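    /*
     * Initial count of 1 ensures read_callback() doesn't see the count
     * drop to 0 before we have issued all the reads; dropped below once
     * all reads are issued.
     */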
num_ongoing_backend_reads = 1;
for (size_t i = 0; i < bc_vec.size(); i++) {
/*
* Every bytes_chunk returned by get() must have its inuse count
* bumped. Also they must have pvt set to the initial value of 0
* and num_backend_calls_issued set to initial value of 0.
*/
assert(bc_vec[i].get_membuf()->is_inuse());
assert(bc_vec[i].pvt == 0);
assert(bc_vec[i].num_backend_calls_issued == 0);
// We trim the read size to not exceed cfsize.
assert((cfsize == -1) ||
((int64_t) (bc_vec[i].offset + bc_vec[i].length) <= cfsize));
total_length += bc_vec[i].length;
if (i >= size) {
AZLogDebug("[{}] Skipping read beyond vector count {}, "
"offset: {}, length: {}",
ino, size,
bc_vec[i].offset,
bc_vec[i].length);
assert(size == FUSE_REPLY_IOV_MAX_COUNT);
bc_vec[i].get_membuf()->clear_inuse();
continue;
}
if (!bc_vec[i].get_membuf()->is_uptodate()) {
/*
* Now we are going to call read_from_server() which will issue
* an NFS read that will read the data from the NFS server and
             * update the buffer. Grab the membuf lock; it will be unlocked
             * in read_callback() once the data has been read into the
             * buffer and it's marked uptodate.
*
* Note: This will block till the lock is obtained.
*/
bc_vec[i].get_membuf()->set_locked();
// Check if the buffer got updated by the time we got the lock.
if (bc_vec[i].get_membuf()->is_uptodate()) {
/*
                 * Release the lock since we no longer intend to write
                 * to this buffer.
*/
bc_vec[i].get_membuf()->clear_locked();
bc_vec[i].get_membuf()->clear_inuse();
/*
* Set "bytes read" to "bytes requested" since the data is read
* from the cache.
*/
bc_vec[i].pvt = bc_vec[i].length;
INC_GBL_STATS(bytes_read_from_cache, bc_vec[i].length);
AZLogDebug("[{}] Data read from cache. offset: {}, length: {}",
ino, bc_vec[i].offset, bc_vec[i].length);
#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
                /*
                 * Since the data was read from the cache, and this is a
                 * sequential read pattern, the chances of reading it again
                 * from the cache are negligible.
                 * Free such chunks to reduce memory utilization.
                 */
filecache_handle->release(bc_vec[i].offset, bc_vec[i].length);
#endif
continue;
}
/*
             * Ok, non-uptodate buffer; see if we should read it from the
             * server or return 0s. We read from the server only if at
             * least one byte of the bc can be served by the server, else
             * it's an unmapped chunk within the cached portion (sparse
             * cache due to sparse writes beyond the server file size),
             * which corresponds to holes, aka 0s.
*/
const bool read_from_server =
((sfsize == -1) || (int64_t) bc_vec[i].offset < sfsize);
if (!read_from_server) {
AZLogDebug("[{}] Hole in cache. offset: {}, length: {}",
ino, bc_vec[i].offset, bc_vec[i].length);
/*
* Set "bytes read" to "bytes requested" since we provide all
* the data as 0s.
*/
bc_vec[i].pvt = bc_vec[i].length;
::memset(bc_vec[i].get_buffer(), 0, bc_vec[i].length);
bc_vec[i].get_membuf()->set_uptodate();
bc_vec[i].get_membuf()->clear_locked();
bc_vec[i].get_membuf()->clear_inuse();
INC_GBL_STATS(bytes_zeroed_from_cache, bc_vec[i].length);
#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
filecache_handle->release(bc_vec[i].offset, bc_vec[i].length);
#endif
continue;
}
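            // At least one chunk needs a backend READ, so this read is
            // not served entirely from the cache.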
found_in_cache = false;
/*
             * TODO: If we have just 1 bytes_chunk to fill, which is the
             *       most common case, avoid creating a child task and
             *       process everything in this same task.
             *       Also, for contiguous reads, use the libnfs vectored
             *       read API.
*/
/*
* Create a child rpc task to issue the read RPC to the backend.
*/
struct rpc_task *child_tsk =
get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_READ);
child_tsk->init_read_be(
rpc_api->read_task.get_ino(),
bc_vec[i].length,
bc_vec[i].offset);
// Set the parent task of the child to the current RPC task.
child_tsk->rpc_api->parent_task = this;
/*
* Set "bytes read" to 0 and this will be updated as data is read,
* likely in partial read calls. So at any time bc.pvt will be the
* total data read.
*/
bc_vec[i].pvt = 0;
            // Set the byte chunk that this child task is in charge of updating.
child_tsk->rpc_api->bc = &bc_vec[i];
/*
             * The child task must always read a subset of the parent
             * task's range.
*/
assert(child_tsk->rpc_api->read_task.get_offset() >=
rpc_api->read_task.get_offset());
assert(child_tsk->rpc_api->read_task.get_size() <=
rpc_api->read_task.get_size());
child_tsk->read_from_server(bc_vec[i]);
} else {
AZLogDebug("[{}] Data read from cache. offset: {}, length: {}",
ino, bc_vec[i].offset, bc_vec[i].length);
bc_vec[i].get_membuf()->clear_inuse();
/*
* Set "bytes read" to "bytes requested" since the data is read
* from the cache.
*/
bc_vec[i].pvt = bc_vec[i].length;
INC_GBL_STATS(bytes_read_from_cache, bc_vec[i].length);
#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
/*
             * Data read from cache. For the most common sequential read
             * pattern this cached data won't be needed again, so release
             * it promptly to ease memory pressure.
* Note that this is just a suggestion to release the buffer.
* The buffer may not be released if it's in use by any other
* user.
*/
const uint64_t released =
filecache_handle->release(bc_vec[i].offset, bc_vec[i].length);
assert(released <= bc_vec[i].length);
/*
             * If we could not release this buffer, try to release all
             * cached data up to release_till, which is a safe offset
             * returned by the readahead state machine.
*/
if (released != bc_vec[i].length) {
const uint64_t release_till =
inode->get_rastate()->release_till();
if (release_till != 0) {
filecache_handle->release(0, release_till);
}
}
#endif
}
}
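    /*
     * Fuse can be sent at most FUSE_REPLY_IOV_MAX_COUNT iov elements, so
     * drop the chunks we skipped above (their inuse counts are already
     * cleared); the READ response will then cover only these first chunks.
     */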
if (bc_vec.size() > FUSE_REPLY_IOV_MAX_COUNT) {
bc_vec.resize(FUSE_REPLY_IOV_MAX_COUNT);
}
// get() must return bytes_chunks exactly covering the requested range.
assert(total_length == rpc_api->read_task.get_size());
/*
     * Drop the initial count we bumped before issuing the first backend
     * read. Each completing child task will also update the parent task's
     * num_ongoing_backend_reads, so a non-zero count here means some
     * backend reads are still pending.
*/
assert(num_ongoing_backend_reads >= 1);
if (--num_ongoing_backend_reads != 0) {
assert(!found_in_cache);
/*
* Not all backend reads have completed yet. When the last backend
* read completes read_callback() will arrange to send the read
* response to fuse.
* This is the more common case as backend READs will take time to
* complete.
*/
return;
}
/*
* Either no chunk needed backend read (likely) or all backend reads issued
* above completed (unlikely).
*/
if (found_in_cache) {
AZLogDebug("[{}] Data read from cache, offset: {}, size: {}",
ino,
rpc_api->read_task.get_offset(),
rpc_api->read_task.get_size());
}
// Send the response.
send_read_response();
}