inline void FishStore::CompleteIoPendingRequests()

in src/core/fishstore.h [1060:1097]


inline void FishStore<D, A>::CompleteIoPendingRequests(ExecutionContext& context) {
  AsyncIOContext* ctxt;
  // Clear this thread's I/O response queue. (Does not clear I/Os issued by this thread that have
  // not yet completed.)
  while(context.io_responses.try_pop(ctxt)) {
    CallbackContext<AsyncIOContext> io_context{ ctxt };
    CallbackContext<pending_context_t> pending_context{ io_context->caller_context };
    // This I/O is no longer pending, since we popped its response off the queue.
    auto pending_io = context.pending_ios.find(io_context->io_id);
    assert(pending_io != context.pending_ios.end() ||
           pending_context.get()->type == OperationType::FullScan);
    if(pending_io != context.pending_ios.end())
      context.pending_ios.erase(pending_io);
    --context.pending_io_cnt;

    // Issue the continue command
    OperationStatus internal_status;
    if(pending_context->type == OperationType::Read) {
      internal_status = InternalContinuePendingRead(context, *io_context.get());
    } else {
      assert(pending_context->type == OperationType::Scan ||
             pending_context->type == OperationType::FullScan);
      internal_status = InternalContinuePendingScan(context, *io_context.get());
    }
    Status result;
    if(internal_status == OperationStatus::SUCCESS) {
      result = Status::Ok;
    } else if(internal_status == OperationStatus::NOT_FOUND) {
      result = Status::NotFound;
    } else {
      result = HandleOperationStatus(context, *pending_context.get(), internal_status,
                                     pending_context.async);
    }
    if(!pending_context.async) {
      pending_context->caller_callback(pending_context->caller_context, result);
    }
  }
}