in demo_example/asio/asio/detail/impl/io_uring_service.ipp [405:496]
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
__kernel_timespec ts;
int local_ops = 0;
if (usec > 0)
{
ts.tv_sec = usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
++local_ops;
::io_uring_prep_timeout(sqe, &ts, 0, 0);
::io_uring_sqe_set_data(sqe, &ts);
submit_sqes();
}
}
::io_uring_cqe* cqe = 0;
int result = (usec == 0)
? ::io_uring_peek_cqe(&ring_, &cqe)
: ::io_uring_wait_cqe(&ring_, &cqe);
if (result == 0 && usec > 0)
{
if (::io_uring_cqe_get_data(cqe) != &ts)
{
mutex::scoped_lock lock(mutex_);
if (::io_uring_sqe* sqe = get_sqe())
{
++local_ops;
::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
submit_sqes();
}
}
}
bool check_timers = false;
int count = 0;
while (result == 0)
{
if (void* ptr = ::io_uring_cqe_get_data(cqe))
{
if (ptr == this)
{
// The io_uring service was interrupted.
}
else if (ptr == &timer_queues_)
{
check_timers = true;
}
else if (ptr == &timeout_)
{
check_timers = true;
timeout_.tv_sec = 0;
timeout_.tv_nsec = 0;
}
else if (ptr == &ts)
{
--local_ops;
}
else
{
io_queue* io_q = static_cast<io_queue*>(ptr);
io_q->set_result(cqe->res);
ops.push(io_q);
}
}
::io_uring_cqe_seen(&ring_, cqe);
result = (++count < complete_batch_size || local_ops > 0)
? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
}
decrement(outstanding_work_, count);
if (check_timers)
{
mutex::scoped_lock lock(mutex_);
timer_queues_.get_ready_timers(ops);
if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
{
timeout_ = get_timeout();
if (::io_uring_sqe* sqe = get_sqe())
{
::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
::io_uring_sqe_set_data(sqe, &timeout_);
push_submit_sqes_op(ops);
}
}
}
}