in turbonfs/src/readahead.cpp [498:701]
int ra_state::issue_readaheads()
{
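    /*
     * ra_offset is declared outside the loop so that the value returned by
     * the last get_next_ra() call is still available for the "reason" log
     * after the loop.
     */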
int64_t ra_offset;
    /*
     * issue_readaheads() MUST only be called for open files, which are
     * guaranteed to have the file cache initialized.
     */
assert(inode->has_filecache());
auto& read_cache = inode->get_filecache();
int ra_issued = 0;
    /*
     * If the userspace data cache is disabled, don't issue readaheads.
     */
if (!aznfsc_cfg.cache.data.user.enable) {
return 0;
}
/*
* Issue all readaheads allowed by this ra_state.
*/
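    /*
     * Note: get_next_ra() is assumed (from its use here) to return the next
     * offset to read ahead, or a non-positive value encoding the reason when
     * no more readahead should be issued, which terminates this loop.
     */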
while ((ra_offset = get_next_ra()) > 0) {
AZLogDebug("[{}] Issuing readahead at off: {} len: {}: ongoing: {} "
"sfsize: {} cfsize: {} csfsize: {} ({})",
inode->get_fuse_ino(), ra_offset, def_ra_size,
ra_ongoing.load(),
inode->get_server_file_size(),
inode->get_client_file_size(),
inode->get_cached_filesize(),
get_ra_bytes());
        /*
         * Get the bytes_chunks covering the byte range we want to read
         * ahead and issue READ RPCs for all of them.
         */
std::vector<bytes_chunk> bcv = read_cache->get(ra_offset, def_ra_size);
for (bytes_chunk& bc : bcv) {
// Every bytes_chunk must lie within the readahead.
assert(bc.offset >= (uint64_t) ra_offset);
assert((bc.offset + bc.length) <= (ra_offset + def_ra_size));
// get() must grab the inuse count.
assert(bc.get_membuf()->is_inuse());
            /*
             * Before issuing the READ to populate the bytes_chunk, take the
             * membuf lock. We use try_lock() and skip the readahead if we
             * don't get the lock; it's better to skip a readahead than to
             * block the caller. With a single reader we will usually get
             * the lock. The lock is released in readahead_callback() after
             * the buffer is populated.
             * Note that if the membuf is already locked, some other context
             * is already performing IO to it, so we must not release the
             * buffer.
             */
if (!bc.get_membuf()->try_lock()) {
AZLogWarnNR("[{}] Skipping readahead at off: {} len: {}. "
"Could not get membuf lock!",
inode->get_fuse_ino(), bc.offset, bc.length);
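                // Mark this readahead done even though we skipped it,
                // presumably so the ongoing readahead accounting stays
                // balanced.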
on_readahead_complete(bc.offset, bc.length);
bc.get_membuf()->clear_inuse();
continue;
}
/*
* If the buffer is already uptodate, skip readahead.
*/
if (bc.get_membuf()->is_uptodate()) {
AZLogDebug("[{}] Skipping readahead at off: {} len: {}. "
"Membuf already uptodate!",
inode->get_fuse_ino(), bc.offset, bc.length);
on_readahead_complete(bc.offset, bc.length);
bc.get_membuf()->clear_locked();
bc.get_membuf()->clear_inuse();
continue;
}
/*
* Ok, now issue READ RPCs to read this byte range.
*/
struct rpc_task *tsk =
client->get_rpc_task_helper()->alloc_rpc_task(FUSE_READ);
            /*
             * fuse_req is needed only for sending the fuse response. Since
             * readahead reads don't send a response, it can be null.
             * fuse_file_info is not used either.
             */
tsk->init_read_be(inode->get_fuse_ino(), /* ino */
bc.length, /* size */
bc.offset); /* offset */
            // No backend reads should have been issued for this bytes_chunk yet.
assert(bc.num_backend_calls_issued == 0);
bc.num_backend_calls_issued++;
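            // pvt, presumably tracking per-bc read progress, must still be
            // untouched.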
assert(bc.pvt == 0);
            /*
             * bc holds a ref on the membuf, so the membuf can be safely
             * accessed only while bc is in scope. readahead_callback() needs
             * to access bc after this scope ends, hence we transfer ownership
             * to the ra_context object allocated below.
             */
struct ra_context *ctx = new ra_context(tsk, bc);
assert(ctx->bc.num_backend_calls_issued == 1);
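            // Prepare the NFSv3 READ arguments for this byte range.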
READ3args args;
::memset(&args, 0, sizeof(args));
args.file = inode->get_fh();
args.offset = bc.offset;
args.count = bc.length;
            /*
             * Grab a ref on this inode so that it is not freed while the
             * readahead reads are in progress. Since the fuse layer does not
             * know about this readahead operation, it may release the inode
             * soon after the application read returns. To guard against
             * that, we hold an extra ref on the inode.
             * This ref must be dropped in readahead_callback().
             */
inode->incref();
AZLogDebug("[{}] Issuing readahead read to backend at "
"off: {} len: {}",
inode->get_fuse_ino(),
args.offset,
args.count);
/*
* tsk->get_rpc_ctx() call below will round robin readahead
* requests across all available connections.
*
* TODO: See if issuing a batch of reads over one connection
* before moving to the other connection helps.
*/
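            // Account for the RPC as issued; undone via on_rpc_cancel()
            // below if rpc_nfs3_read_task() fails to dispatch it.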
tsk->get_stats().on_rpc_issue();
if (rpc_nfs3_read_task(
tsk->get_rpc_ctx(),
readahead_callback,
bc.get_buffer(),
bc.length,
&args,
ctx) == NULL) {
tsk->get_stats().on_rpc_cancel();
                /*
                 * This call failed due to internal issues like OOM etc.,
                 * and not due to an actual RPC/NFS error. Either way,
                 * pretend as if we never issued this read.
                 */
AZLogWarn("[{}] Skipping readahead at off: {} len: {}. "
"rpc_nfs3_read_task() failed!",
inode->get_fuse_ino(), args.offset, args.count);
on_readahead_complete(bc.offset, bc.length);
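                // Drop the membuf lock and inuse count taken above.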
bc.get_membuf()->clear_locked();
bc.get_membuf()->clear_inuse();
// Release the buffer since we did not fill it.
read_cache->release(bc.offset, bc.length);
tsk->free_rpc_task();
delete ctx;
// Decrement the extra ref that was taken.
inode->decref();
continue;
}
ra_issued++;
            AZLogDebug("[{}] rpc_nfs3_read_task() successfully dispatched "
                       "#{} readahead at off: {} len: {}",
inode->get_fuse_ino(),
ra_issued,
args.offset,
args.count);
}
}
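    /*
     * If we get here with ra_issued == 0, ra_offset holds the non-positive
     * value returned by the last get_next_ra() call, which encodes the
     * reason no readahead was issued (logged as "reason" below).
     */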
if (ra_issued == 0) {
static std::atomic<uint64_t> num_no_readahead;
        // Log once every 1000 calls that issued no readaheads.
if ((++num_no_readahead % 1000) == 0) {
AZLogDebug("[{}] num_no_readahead={}, reason={}",
inode->get_fuse_ino(),
num_no_readahead.load(), ra_offset);
}
} else {
INC_GBL_STATS(num_readhead, 1);
}
return ra_issued;
}
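
/*
 * For reference, each successfully dispatched readahead above relies on
 * readahead_callback() to undo the state set up here. A rough sketch of
 * those obligations (illustrative only, not the actual implementation;
 * libnfs invokes the callback with the usual rpc_cb signature):
 *
 *   static void readahead_callback(struct rpc_context *rpc, int status,
 *                                  void *data, void *private_data)
 *   {
 *       struct ra_context *ctx = (struct ra_context *) private_data;
 *
 *       // On success, mark the membuf uptodate, then drop the lock and
 *       // inuse count taken in issue_readaheads().
 *       ctx->bc.get_membuf()->clear_locked();
 *       ctx->bc.get_membuf()->clear_inuse();
 *
 *       // Account the readahead as complete, drop the extra inode ref
 *       // taken in issue_readaheads(), free the rpc_task and delete ctx.
 *   }
 */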