in turbonfs/src/rpc_task.cpp [3879:4308]
static void read_callback(
struct rpc_context *rpc,
int rpc_status,
void *data,
void *private_data)
{
struct read_context *ctx = (read_context*) private_data;
rpc_task *task = ctx->task;
assert(task->magic == RPC_TASK_MAGIC);
// This must be a BE task.
assert(task->rpc_api->read_task.is_be());
/*
     * Parent task corresponds to the fuse request that initiated the read.
     * It will be used to complete the request back to fuse.
*/
rpc_task *parent_task = task->rpc_api->parent_task;
/*
     * Only BE tasks can issue the read RPC, hence the callback should
* be called only for them.
*/
assert(parent_task != nullptr);
assert(parent_task->magic == RPC_TASK_MAGIC);
/*
* num_ongoing_backend_reads is updated only for the parent task and it
* counts how many child rpc tasks are ongoing for this parent task.
* num_ongoing_backend_reads will always be 0 for child tasks.
*/
assert(parent_task->num_ongoing_backend_reads > 0);
assert(task->num_ongoing_backend_reads == 0);
struct bytes_chunk *bc = ctx->bc;
assert(bc->length > 0);
// We are in the callback, so at least one backend call was issued.
assert(bc->num_backend_calls_issued > 0);
/*
     * If we had already finished reading the entire bytes_chunk we would
     * not be here. We must have locked the membuf and marked it inuse
     * before we issued the read.
*/
assert(bc->pvt < bc->length);
assert(bc->get_membuf()->is_inuse());
assert(bc->get_membuf()->is_locked());
const char* errstr;
auto res = (READ3res*)data;
INJECT_JUKEBOX(res, task);
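    /*
     * task->status() combines the RPC transport status and the NFS status
     * into a single status value (0 means success) and points errstr at a
     * printable error string for logging.
     */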
    const int status = task->status(rpc_status, NFS_STATUS(res), &errstr);
fuse_ino_t ino = task->rpc_api->read_task.get_ino();
struct nfs_inode *inode = task->get_client()->get_nfs_inode_from_ino(ino);
/*
* Applications can only issue reads on an open fd and we ensure filecache
* is created on file open.
*/
assert(inode->has_filecache());
auto filecache_handle = inode->get_filecache();
/*
     * read_callback() must only be called for reads initiated from fuse,
     * for which we must have allocated the cache.
*/
assert(filecache_handle);
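    /*
     * bc->pvt counts the bytes of this bytes_chunk already filled by
     * prior backend calls, so the RPC that just completed was issued for
     * the remaining range [issued_offset, issued_offset+issued_length).
     */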
const uint64_t issued_offset = bc->offset + bc->pvt;
const uint64_t issued_length = bc->length - bc->pvt;
/*
* It is okay to free the context here as we do not access it after this
* point.
*/
delete ctx;
/*
* Now that the request has completed, we can query libnfs for the
* dispatch time.
*/
task->get_stats().on_rpc_complete(rpc_get_pdu(rpc), NFS_STATUSX(rpc_status, res));
if (status == 0) {
/*
         * Post-op attributes are optional; if the server returns them,
         * update the inode attribute cache.
*/
UPDATE_INODE_ATTR(inode, res->READ3res_u.resok.file_attributes);
/*
         * Reads are counted in the callback, as that's when the read
         * response arrives on the wire.
*/
INC_GBL_STATS(server_bytes_read, res->READ3res_u.resok.count);
INC_GBL_STATS(server_read_reqs, 1);
#ifdef ENABLE_PRESSURE_POINTS
/*
* Short read pressure point, skip when eof received.
*/
if (inject_error() && !res->READ3res_u.resok.eof) {
// Set read size to a random percent of the actual size.
const uint32_t pct = random_number(1, 100);
const uint32_t adj_size =
std::max((res->READ3res_u.resok.count * pct) / 100, 1U);
assert(adj_size <= res->READ3res_u.resok.count);
AZLogWarn("[{}] PP: short read {} -> {}",
ino, res->READ3res_u.resok.count, adj_size);
res->READ3res_u.resok.count = adj_size;
}
#endif
assert((bc->pvt == 0) || (bc->num_backend_calls_issued > 1));
// We should never get more data than what we requested.
assert(res->READ3res_u.resok.count <= issued_length);
const bool is_partial_read = !res->READ3res_u.resok.eof &&
(res->READ3res_u.resok.count < issued_length);
// Update bc->pvt with fresh bytes read in this call.
bc->pvt += res->READ3res_u.resok.count;
assert(bc->pvt <= bc->length);
AZLogDebug("[{}] <{}> read_callback: {}Read completed for [{}, {}), "
"Bytes read: {} eof: {}, total bytes read till "
"now: {} of {} for [{}, {}) num_backend_calls_issued: {}",
ino, task->issuing_tid,
is_partial_read ? "Partial " : "",
issued_offset,
issued_offset + issued_length,
res->READ3res_u.resok.count,
res->READ3res_u.resok.eof,
bc->pvt,
bc->length,
bc->offset,
bc->offset + bc->length,
bc->num_backend_calls_issued);
/*
         * There's a special case that we need to handle.
         * Since we support sparse writes (beyond the server file size) we
         * can have a bc which spans the server file size boundary. This
         * happens when inode->get_server_file_size() is less than
         * inode->get_cached_filesize() and the portion of the file
         * immediately after the server file size is not cached. For such
         * a bc we issue the READ call to the server and expect the server
         * to return partial bc data + eof. We must then set the rest of
         * the bc to 0s, as it corresponds to a hole in the file.
*/
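        /*
         * Illustration (hypothetical sizes): sfsize=100K, csfsize=150K,
         * bc=[96K, 112K) with [100K, 112K) not cached. The READ at 96K
         * returns 4K+eof, and since the bc extends till 112K (< csfsize)
         * the remaining 12K lies in the sparse hole, so we zero-fill it
         * and advance bc->pvt to bc->length.
         */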
const uint64_t csfsize = inode->get_cached_filesize();
if (res->READ3res_u.resok.eof &&
(res->READ3res_u.resok.count < issued_length) &&
((bc->offset + bc->pvt) < csfsize)) {
void *const zb = bc->get_buffer() + bc->pvt;
const int64_t zb_len =
std::min(csfsize, bc->offset + bc->length) - (bc->offset + bc->pvt);
AZLogDebug("[{}] <{}> read_callback: bc [{}, {}) spans across "
"server file size boundary, filling remaining {} bytes "
"@ offset {}, with 0s. cfsize: {}, sfsize: {}, csfsize: {}",
ino, task->issuing_tid,
issued_offset,
issued_offset + issued_length,
zb_len, bc->offset + bc->pvt,
inode->get_client_file_size(),
inode->get_server_file_size(),
csfsize);
assert(zb_len > 0);
::memset(zb, 0, zb_len);
// Added zb_len more bytes to bc.
bc->pvt += zb_len;
assert(bc->pvt <= bc->length);
}
/*
         * In case of a partial read, issue a read for the remaining bytes.
*/
if (is_partial_read) {
const off_t new_offset = bc->offset + bc->pvt;
const size_t new_size = bc->length - bc->pvt;
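            /*
             * E.g. (hypothetical sizes) if bc is [0, 64K) and 16K has been
             * read so far (bc->pvt == 16K), the child task reads the
             * remaining [16K, 64K).
             */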
// Create a new child task to carry out this request.
struct rpc_task *child_tsk =
task->get_client()->get_rpc_task_helper()->alloc_rpc_task_reserved(FUSE_READ);
child_tsk->init_read_be(
task->rpc_api->read_task.get_ino(),
new_size,
new_offset);
/*
* Set the parent task of the child to the parent of the
             * current RPC task. This is required if the current task itself
             * is one of the child tasks running as part of the fuse read
             * request.
*/
child_tsk->rpc_api->parent_task = parent_task;
/*
* Child task must continue to fill the same bc.
*/
child_tsk->rpc_api->bc = bc;
/*
             * TODO: To avoid allocating a new read_context we could reuse
             *       the existing context, but we would have to update its
             *       task member.
*/
struct read_context *new_ctx = new read_context(child_tsk, bc);
bool rpc_retry;
READ3args new_args;
new_args.file = inode->get_fh();
new_args.offset = new_offset;
new_args.count = new_size;
// One more backend call issued to fill this bc.
bc->num_backend_calls_issued++;
AZLogDebug("[{}] Issuing partial read at offset: {} size: {}"
" for [{}, {})",
ino,
new_offset,
new_size,
bc->offset,
bc->offset + bc->length);
do {
/*
                 * We have identified a partial read case where the
                 * server has returned fewer bytes than requested.
                 * Fuse cannot accept fewer bytes than requested,
                 * unless it's an eof or error, hence we issue a
                 * read for the remaining bytes.
*
* Note: It is okay to issue a read call directly here
* as we are holding all the needed locks and refs.
*/
rpc_retry = false;
child_tsk->get_stats().on_rpc_issue();
if (rpc_nfs3_read_task(
child_tsk->get_rpc_ctx(),
read_callback,
bc->get_buffer() + bc->pvt,
new_size,
&new_args,
(void *) new_ctx) == NULL) {
child_tsk->get_stats().on_rpc_cancel();
/*
                     * The most common reason for this is memory allocation
                     * failure, hence wait for some time before retrying.
                     * Also block the current thread, as we really want to
                     * slow things down.
*
* TODO: For soft mount should we fail this?
*/
rpc_retry = true;
AZLogWarn("rpc_nfs3_read_task failed to issue, retrying "
"after 5 secs!");
::sleep(5);
}
} while (rpc_retry);
// Free the current RPC task as it has done its bit.
task->free_rpc_task();
/*
* Return from the callback here. The rest of the callback
* will be processed once this partial read completes.
*/
return;
}
/*
         * We should never return fewer bytes to fuse than requested,
         * unless eof or an error is encountered after this point.
*/
assert((bc->length == bc->pvt) || res->READ3res_u.resok.eof);
if (bc->maps_full_membuf() && (bc->length == bc->pvt)) {
/*
* If this bc maps the entire chunkmap bytes_chunk and we have
* read the entire range represented by this bc, then we can
* mark it uptodate.
*/
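            /*
             * Had the bc mapped only part of the membuf we could not mark
             * it uptodate, as the rest of the membuf may not hold valid
             * data.
             */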
#ifdef ENABLE_PRESSURE_POINTS
if (inject_error()) {
AZLogDebug("[{}] PP: Not setting uptodate flag for membuf "
"[{}, {})",
ino, bc->offset, bc->offset + bc->length);
} else
#endif
{
AZLogDebug("[{}] Setting uptodate flag for membuf [{}, {})",
ino, bc->offset, bc->offset + bc->length);
bc->get_membuf()->set_uptodate();
}
} else {
bool set_uptodate = false;
/*
* If we got eof in a partial read, release the non-existent
* portion of the chunk.
*/
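            /*
             * E.g. (hypothetical sizes) for bc [0, 64K) with eof after
             * 10K, we try to release [10K, 64K); if another thread has a
             * ref on that range, release() may trim less and then we must
             * not mark the membuf uptodate.
             */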
if (bc->maps_full_membuf() && (bc->length > bc->pvt) &&
res->READ3res_u.resok.eof) {
assert(res->READ3res_u.resok.count < issued_length);
/*
                 * We need to clear the inuse count held by this thread,
                 * else release() will not be able to release the range.
                 * We drop the inuse count and promptly re-grab it after
                 * release(), so that set_uptodate() can still be called.
*/
bc->get_membuf()->clear_inuse();
const uint64_t released_bytes =
filecache_handle->release(bc->offset + bc->pvt,
bc->length - bc->pvt);
bc->get_membuf()->set_inuse();
/*
* If we are able to successfully release all the extra bytes
* from the bytes_chunk, that means there's no other thread
* actively performing IOs to the underlying membuf, so we can
* mark it uptodate as future readers will get the trimmed
* membuf which *is* uptodate.
*/
assert(released_bytes <= (bc->length - bc->pvt));
if (released_bytes == (bc->length - bc->pvt)) {
AZLogDebug("[{}] Setting uptodate flag for membuf [{}, {}) "
"after read hit eof, requested [{}, {}), "
"got [{}, {})",
ino,
bc->offset, bc->offset + bc->length,
issued_offset,
issued_offset + issued_length,
issued_offset,
issued_offset + res->READ3res_u.resok.count);
bc->get_membuf()->set_uptodate();
set_uptodate = true;
}
}
if (!set_uptodate) {
AZLogDebug("[{}] Not setting uptodate flag for membuf "
"[{}, {}), maps_full_membuf={}, is_new={}, "
"bc->length={}, bc->pvt={}",
ino, bc->offset, bc->offset + bc->length,
bc->maps_full_membuf(), bc->is_new, bc->length,
bc->pvt);
}
}
} else if (NFS_STATUS(res) == NFS3ERR_JUKEBOX) {
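        /*
         * NFS3ERR_JUKEBOX means the server cannot serve the data right
         * now (try again later), so queue this task for a delayed retry.
         */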
task->get_client()->jukebox_retry(task);
/*
* Note: The lock on the membuf will be held till the task is retried.
         * This lock will be released only when the retry succeeds or
         * fails with an error other than NFS3ERR_JUKEBOX.
*/
return;
} else {
AZLogError("[{}] <{}> Read failed for offset: {} size: {} "
"total bytes read till now: {} of {} for [{}, {}) "
"num_backend_calls_issued: {} error: {}",
ino, task->issuing_tid,
issued_offset,
issued_length,
bc->pvt,
bc->length,
bc->offset,
bc->offset + bc->length,
bc->num_backend_calls_issued,
errstr);
}
/*
     * Release the lock we held on the membuf, now that the data has been
     * written to it.
     * The lock was needed only to write the data, not to read it, hence
     * it's safe to read this membuf even beyond this point.
*/
bc->get_membuf()->clear_locked();
bc->get_membuf()->clear_inuse();
#ifdef RELEASE_CHUNK_AFTER_APPLICATION_READ
/*
* Since we come here only for client reads, we will not cache the data,
* hence release the chunk.
     * This can safely be done in both the success and failure cases.
*/
filecache_handle->release(bc->offset, bc->length);
#endif
// For failed status we must never mark the buffer uptodate.
assert(!status || !bc->get_membuf()->is_uptodate());
    /*
     * Once failed, read_status stays at the first recorded failure.
     * Use compare_exchange_strong(): a spurious compare_exchange_weak
     * failure here could silently drop a failed read status.
     */
    int expected = 0;
    parent_task->read_status.compare_exchange_strong(expected, status);
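    /*
     * E.g. if two child reads fail with different errors, the first one
     * to get here swaps its status in, while later CAS attempts see
     * read_status != 0 and leave it unchanged: the first error wins.
     */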
/*
     * Atomically decrement the count of ongoing backend reads; if it
     * drops to zero, this is the last read completing and we must send
     * the response (success or the recorded failure) to fuse.
*/
if (--parent_task->num_ongoing_backend_reads == 0) {
/*
* Parent task must send the read response to fuse.
* This will also free parent_task.
*/
parent_task->send_read_response();
// Free the child task after sending the response.
task->free_rpc_task();
} else {
AZLogDebug("No response sent, waiting for more reads to complete."
" num_ongoing_backend_reads: {}",
parent_task->num_ongoing_backend_reads.load());
/*
* This task has completed its part of the read, free it here.
* When all reads complete, the parent task will be completed.
*/
task->free_rpc_task();
return;
}
}