void io_uring_service::run()

in demo_example/asio/asio/detail/impl/io_uring_service.ipp [405:496]


void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0;

  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }

  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);

  if (result == 0 && usec > 0)
  {
    if (::io_uring_cqe_get_data(cqe) != &ts)
    {
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        submit_sqes();
      }
    }
  }

  bool check_timers = false;
  int count = 0;
  while (result == 0)
  {
    if (void* ptr = ::io_uring_cqe_get_data(cqe))
    {
      if (ptr == this)
      {
        // The io_uring service was interrupted.
      }
      else if (ptr == &timer_queues_)
      {
        check_timers = true;
      }
      else if (ptr == &timeout_)
      {
        check_timers = true;
        timeout_.tv_sec = 0;
        timeout_.tv_nsec = 0;
      }
      else if (ptr == &ts)
      {
        --local_ops;
      }
      else
      {
        io_queue* io_q = static_cast<io_queue*>(ptr);
        io_q->set_result(cqe->res);
        ops.push(io_q);
      }
    }
    ::io_uring_cqe_seen(&ring_, cqe);
    result = (++count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }

  decrement(outstanding_work_, count);

  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}