in src/core/fishstore.h [1060:1097]
inline void FishStore<D, A>::CompleteIoPendingRequests(ExecutionContext& context) {
AsyncIOContext* ctxt;
// Clear this thread's I/O response queue. (Does not clear I/Os issued by this thread that have
// not yet completed.)
while(context.io_responses.try_pop(ctxt)) {
CallbackContext<AsyncIOContext> io_context{ ctxt };
CallbackContext<pending_context_t> pending_context{ io_context->caller_context };
// This I/O is no longer pending, since we popped its response off the queue.
auto pending_io = context.pending_ios.find(io_context->io_id);
assert(pending_io != context.pending_ios.end() ||
pending_context.get()->type == OperationType::FullScan);
if(pending_io != context.pending_ios.end())
context.pending_ios.erase(pending_io);
--context.pending_io_cnt;
// Issue the continue command
OperationStatus internal_status;
if(pending_context->type == OperationType::Read) {
internal_status = InternalContinuePendingRead(context, *io_context.get());
} else {
assert(pending_context->type == OperationType::Scan ||
pending_context->type == OperationType::FullScan);
internal_status = InternalContinuePendingScan(context, *io_context.get());
}
Status result;
if(internal_status == OperationStatus::SUCCESS) {
result = Status::Ok;
} else if(internal_status == OperationStatus::NOT_FOUND) {
result = Status::NotFound;
} else {
result = HandleOperationStatus(context, *pending_context.get(), internal_status,
pending_context.async);
}
if(!pending_context.async) {
pending_context->caller_callback(pending_context->caller_context, result);
}
}
}