in demo_example/asio/asio/detail/impl/kqueue_reactor.ipp [193:263]
void kqueue_reactor::start_op(int op_type, socket_type descriptor,
kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
if (!descriptor_data)
{
op->ec_ = asio::error::bad_descriptor;
post_immediate_completion(op, is_continuation);
return;
}
mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
if (descriptor_data->shutdown_)
{
post_immediate_completion(op, is_continuation);
return;
}
if (descriptor_data->op_queue_[op_type].empty())
{
static const int num_kevents[max_ops] = { 1, 2, 1 };
if (allow_speculative
&& (op_type != read_op
|| descriptor_data->op_queue_[except_op].empty()))
{
if (op->perform())
{
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
return;
}
if (descriptor_data->num_kevents_ < num_kevents[op_type])
{
struct kevent events[2];
ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
{
descriptor_data->num_kevents_ = num_kevents[op_type];
}
else
{
op->ec_ = asio::error_code(errno,
asio::error::get_system_category());
scheduler_.post_immediate_completion(op, is_continuation);
return;
}
}
}
else
{
if (descriptor_data->num_kevents_ < num_kevents[op_type])
descriptor_data->num_kevents_ = num_kevents[op_type];
struct kevent events[2];
ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
}
}
descriptor_data->op_queue_[op_type].push(op);
scheduler_.work_started();
}