in turbonfs/src/readahead.cpp [122:422]
static void readahead_callback (
struct rpc_context *rpc,
int rpc_status,
void *data,
void *private_data)
{
struct ra_context *ctx = (struct ra_context*) private_data;
struct rpc_task *task = ctx->task;
struct bytes_chunk *bc = &ctx->bc;
auto res = (READ3res*) data;
const char *errstr = nullptr;
assert(task->magic == RPC_TASK_MAGIC);
assert(bc->length > 0);
assert(task->rpc_api->read_task.get_offset() >= (off_t) bc->offset);
assert(task->rpc_api->read_task.get_size() <= bc->length);
// Cannot have read more than requested.
assert(res->READ3res_u.resok.count <= bc->length);
/*
* This callback would be called for some backend call that we must have
* issued.
*/
assert(bc->num_backend_calls_issued >= 1);
/*
* If we have already finished reading the entire bytes_chunk, why are we
* here.
*/
assert(bc->pvt < bc->length);
const int status = task->status(rpc_status, NFS_STATUS(res), &errstr);
const fuse_ino_t ino = task->rpc_api->read_task.get_ino();
struct nfs_inode *inode = task->get_client()->get_nfs_inode_from_ino(ino);
assert(inode->has_filecache());
const auto read_cache = inode->get_filecache();
assert(read_cache != nullptr);
assert(ino == inode->get_fuse_ino());
/*
* Now that the request has completed, we can query libnfs for the
* dispatch time.
*/
task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUS(res));
/*
* Offset and length for the actual read request for which this callback
* is called. Note that the entire read may not be satisfied, it may be
* a partial read response.
*/
const uint64_t issued_offset = bc->offset + bc->pvt;
const uint64_t issued_length = bc->length - bc->pvt;
if (status != 0) {
/*
* Readahead read failed? Nothing to do, unlock the membuf, release
* the byte range and pretend as if we never issued this read.
* We may have successfully read some part of it, as some prior read
* calls may have completed partially, but we cannot mark the membuf
* uptodate unless we read it fully, so we have to just drop it.
* Note that those prio successful reads would have caused the RPC
* stats to be updated, but that's fine.
*/
bc->get_membuf()->clear_locked();
bc->get_membuf()->clear_inuse();
// Release the buffer since we did not fill it.
read_cache->release(bc->offset, bc->length);
AZLogWarn("[{}] <{}> readahead_callback [FAILED] for offset: {} size: {} "
"total bytes read till now: {} of {} for [{}, {}) "
"num_backend_calls_issued: {}, rpc_status: {}, nfs_status: {}, "
"error: {}",
ino, task->issuing_tid,
issued_offset,
issued_length,
bc->pvt,
bc->length,
bc->offset,
bc->offset + bc->length,
bc->num_backend_calls_issued,
rpc_status,
(int) NFS_STATUS(res),
errstr);
goto delete_ctx;
} else {
UPDATE_INODE_ATTR(inode, res->READ3res_u.resok.file_attributes);
INC_GBL_STATS(server_bytes_read, res->READ3res_u.resok.count);
INC_GBL_STATS(server_read_reqs, 1);
/*
* Only first read call would have bc->pvt == 0, for subsequent calls
* we will have num_backend_calls_issued > 1.
*/
assert((bc->pvt == 0) || (bc->num_backend_calls_issued > 1));
// We should never get more data than what we requested.
assert(res->READ3res_u.resok.count <= issued_length);
const bool is_partial_read = !res->READ3res_u.resok.eof &&
(res->READ3res_u.resok.count < issued_length);
// Update bc->pvt with fresh bytes read in this call.
bc->pvt += res->READ3res_u.resok.count;
assert(bc->pvt <= bc->length);
INC_GBL_STATS(bytes_read_ahead, res->READ3res_u.resok.count);
AZLogDebug("[{}] <{}> readahead_callback: {}Read completed for offset: {} "
" size: {} Bytes read: {} eof: {}, total bytes read till "
"now: {} of {} for [{}, {}) num_backend_calls_issued: {}",
ino, task->issuing_tid,
is_partial_read ? "Partial " : "",
issued_offset,
issued_length,
res->READ3res_u.resok.count,
res->READ3res_u.resok.eof,
bc->pvt,
bc->length,
bc->offset,
bc->offset + bc->length,
bc->num_backend_calls_issued);
/*
* In case of partial read, issue read for the remaining.
*/
if (is_partial_read) {
assert(bc->pvt < bc->length);
const off_t new_offset = bc->offset + bc->pvt;
const size_t new_size = bc->length - bc->pvt;
bool rpc_retry = false;
READ3args new_args;
new_args.file = inode->get_fh();
new_args.offset = new_offset;
new_args.count = new_size;
// Create a new child task to carry out this request.
struct rpc_task *partial_read_tsk =
task->get_client()->get_rpc_task_helper()->alloc_rpc_task(FUSE_READ);
partial_read_tsk->init_read_be(
task->rpc_api->read_task.get_ino(),
new_size,
new_offset);
ctx->task = partial_read_tsk;
bc->num_backend_calls_issued++;
assert(bc->num_backend_calls_issued > 1);
AZLogDebug("[{}] Issuing partial read at offset: {} size: {}"
" for [{}, {})",
ino,
new_offset,
new_size,
bc->offset,
bc->offset + bc->length);
do {
rpc_retry = false;
/*
* We have identified partial read case where the
* server has returned fewer bytes than requested.
* Hence we will issue read for the remaining.
*
* Note: It is okay to issue a read call directly here
* as we are holding all the needed locks and refs.
*/
partial_read_tsk->get_stats().on_rpc_issue();
if (rpc_nfs3_read_task(
partial_read_tsk->get_rpc_ctx(),
readahead_callback,
bc->get_buffer() + bc->pvt,
new_size,
&new_args,
(void *) ctx) == NULL) {
partial_read_tsk->get_stats().on_rpc_cancel();
/*
* This call fails due to internal issues like OOM
* etc and not due to an actual error, hence retry.
*/
AZLogWarn("rpc_nfs3_read_task failed to issue, retrying "
"after 5 secs!");
::sleep(5);
rpc_retry = true;
}
} while (rpc_retry);
// Free the current RPC task as it has done its bit.
task->free_rpc_task();
/*
* Return from the callback here. The rest of the callback
* will be processed once this partial read completes.
*/
return;
}
}
/*
* We come here only after the complete readahead read has successfully
* completed.
* We should never return lesser bytes than requested, unless eof is
* encountered.
*/
assert(status == 0);
assert((bc->length == bc->pvt) || res->READ3res_u.resok.eof);
if (bc->maps_full_membuf() && (bc->length == bc->pvt)) {
/*
* Only if this bytes_chunk maps the entire membuf and the read has
* completed, we can mark the membuf uptodate.
*/
AZLogDebug("[{}] Setting uptodate flag for membuf [{}, {})",
ino, bc->offset, bc->offset + bc->length);
bc->get_membuf()->set_uptodate();
} else {
bool set_uptodate = false;
/*
* If we got eof in a partial read, release the non-existent
* portion of the chunk.
*/
if (bc->maps_full_membuf() && (bc->length > bc->pvt) &&
res->READ3res_u.resok.eof) {
assert(res->READ3res_u.resok.count < issued_length);
/*
* We need to clear the inuse count held by this thread, else
* release() will not be able to release. We drop and then
* promptly grab the inuse count after the release(), so that
* set_uptodate() can be called.
*/
bc->get_membuf()->clear_inuse();
const uint64_t released_bytes =
read_cache->release(bc->offset + bc->pvt,
bc->length - bc->pvt);
bc->get_membuf()->set_inuse();
/*
* If we are able to successfully release all the extra bytes
* from the bytes_chunk, that means there's no other thread
* actively performing IOs to the underlying membuf, so we can
* mark it uptodate.
*/
assert(released_bytes <= (bc->length - bc->pvt));
if (released_bytes == (bc->length - bc->pvt)) {
AZLogWarn("[{}] Setting uptodate flag for membuf [{}, {}), "
"after readahead hit eof, requested [{}, {}), "
"got [{}, {})",
ino,
bc->offset, bc->offset + bc->length,
issued_offset,
issued_offset + issued_length,
issued_offset,
issued_offset + res->READ3res_u.resok.count);
bc->get_membuf()->set_uptodate();
set_uptodate = true;
}
}
if (!set_uptodate) {
AZLogDebug("[{}] Not setting uptodate flag for membuf "
"[{}, {}), maps_full_membuf={}, is_new={}, "
"bc->length={}, bc->pvt={}",
ino, bc->offset, bc->offset + bc->length,
bc->maps_full_membuf(), bc->is_new, bc->length,
bc->pvt);
}
}
/*
* Release the lock that we held on the membuf since the data is now
* written and ready to be read.
*/
bc->get_membuf()->clear_locked();
bc->get_membuf()->clear_inuse();
delete_ctx:
/*
* Success or failure, report readahead completion.
* This MUST be called after dropping the membuf lock and inuse count.
*/
inode->get_rastate()->on_readahead_complete(bc->offset, bc->length);
// Free the readahead RPC task.
task->free_rpc_task();
// Free the context.
delete ctx;
// Decrement the extra ref taken on inode at the time read was issued.
inode->decref();
}