in Sources/NIOPosix/SelectorUring.swift [200:326]
/// Fetches completed io_uring poll events according to `strategy` and dispatches every
/// event that still matches a live registration to `body`.
///
/// Order of operations: flush deferred re-registrations (if any are pending), fetch
/// completion events, call `loopStart`, handle each event, then flush whatever
/// submissions were queued while handling and let the event array grow if needed.
///
/// - Parameters:
///   - strategy: `.now` only peeks for already-available completions, `.blockUntilTimeout`
///     waits with the given timeout, `.block` peeks first and only waits when the peek
///     produced nothing.
///   - loopStart: Invoked once, after the events were fetched and before any is handled.
///   - body: Invoked for each event whose event set, intersected with the registration's
///     current interest, is non-empty.
/// - Throws: `IOError` (`EBADF`) when the selector is not open; errors thrown by the ring
///   wait calls or by `body` are propagated.
/// - Note: When `body` throws, the remaining events are not handled and the trailing
///   flush / `growEventArrayIfNeeded` are skipped.
func whenReady0(strategy: SelectorStrategy, onLoopBegin loopStart: () -> Void, _ body: (SelectorEvent<R>) throws -> Void) throws -> Void {
    assert(self.myThread == NIOThread.current)
    guard self.lifecycleState == .open else {
        throw IOError(errnoCode: EBADF, reason: "can't call whenReady for selector as it's \(self.lifecycleState).")
    }

    var ready: Int = 0

    // Flush reregistration of pending modifications if needed (nop in SQPOLL mode).
    // This elides all reregistrations and deregistrations into a single syscall instead
    // of one for each. A future improvement would be to also merge the pending pollmasks
    // (now each change is queued, but the masks for reregistrations could be merged too) -
    // but the most important thing is to only trap into the kernel once for the set of
    // changes, so that would need to be measured.
    if deferReregistrations && self.deferredReregistrationsPending {
        self.deferredReregistrationsPending = false
        ring.io_uring_flush()
    }

    switch strategy {
    case .now:
        _debugPrint("whenReady.now")
        ready = Int(ring.io_uring_peek_batch_cqe(events: events, maxevents: UInt32(eventsCapacity), multishot: multishot))
    case .blockUntilTimeout(let timeAmount):
        _debugPrint("whenReady.blockUntilTimeout")
        ready = try Int(ring.io_uring_wait_cqe_timeout(events: events, maxevents: UInt32(eventsCapacity), timeout: timeAmount, multishot: multishot))
    case .block:
        _debugPrint("whenReady.block")
        // First try to consume any completions that already exist.
        ready = Int(ring.io_uring_peek_batch_cqe(events: events, maxevents: UInt32(eventsCapacity), multishot: multishot))
        // Otherwise block (only single supported, but the batch peek is used next time around).
        if ready <= 0 {
            ready = try ring.io_uring_wait_cqe(events: events, maxevents: UInt32(eventsCapacity), multishot: multishot)
        }
    }

    loopStart()

    // Clamp the bound: `0..<ready` would trap on a negative count, and the `.block`
    // path above already treats a non-positive count as "nothing ready".
    for i in 0..<Swift.max(ready, 0) {
        let event = events[i]

        switch event.fd {
        case self.eventFD: // we don't run these as multishots to avoid tons of events when many wakeups are done
            _debugPrint("wakeup successful for event.fd [\(event.fd)]")
            var val = EventFd.eventfd_t()
            // Queue a fresh non-multishot poll for the wakeup fd; it is submitted by the flush below.
            ring.io_uring_prep_poll_add(fileDescriptor: self.eventFD,
                                        pollMask: URing.POLLIN,
                                        registrationID: SelectorRegistrationID(rawValue: 0),
                                        submitNow: false,
                                        multishot: false)
            do {
                _ = try EventFd.eventfd_read(fd: self.eventFD, value: &val) // consume wakeup event
                _debugPrint("read val [\(val)] from event.fd [\(event.fd)]")
            } catch {
                // Draining the wakeup is best-effort, so the failure is not propagated,
                // but it is surfaced in the debug output instead of being dropped silently.
                _debugPrint("eventfd_read failed for event.fd [\(event.fd)]: \(error)")
            }
        default:
            if let registration = registrations[Int(event.fd)] {
                _debugPrint("We found a registration for event.fd [\(event.fd)]") // \(registration)

                // The io_uring backend only has 16 bits available for the registration id,
                // so compare against the truncated id; a mismatch means the event does not
                // belong to the registration currently stored for this fd.
                guard event.registrationID == UInt16(truncatingIfNeeded: registration.registrationID.rawValue) else {
                    _debugPrint("The event.registrationID [\(event.registrationID)] != registration.selectableregistrationID [\(registration.registrationID)], skipping to next event")
                    continue
                }

                var selectorEvent = SelectorEventSet(uringEvent: event.pollMask)
                _debugPrint("selectorEvent [\(selectorEvent)] registration.interested [\(registration.interested)]")

                // we only want what the user is currently registered for & what we got
                selectorEvent = selectorEvent.intersection(registration.interested)
                _debugPrint("intersection [\(selectorEvent)]")

                if selectorEvent.contains(.readEOF) {
                    _debugPrint("selectorEvent.contains(.readEOF) [\(selectorEvent.contains(.readEOF))]")
                }

                // Re-arm the poll in non-multishot mode. This must happen before the
                // `_none` guard below, otherwise a wakeup would be lost.
                if !multishot {
                    ring.io_uring_prep_poll_add(fileDescriptor: event.fd,
                                                pollMask: registration.interested.uringEventSet,
                                                registrationID: registration.registrationID,
                                                submitNow: false,
                                                multishot: false)
                    if event.pollCancelled {
                        _debugPrint("Received event.pollCancelled")
                    }
                }

                // Nothing the user is interested in was signalled: skip the callback.
                guard selectorEvent != ._none else {
                    _debugPrint("selectorEvent != ._none / [\(selectorEvent)] [\(registration.interested)] [\(SelectorEventSet(uringEvent: event.pollMask))] [\(event.pollMask)] [\(event.fd)]")
                    continue
                }

                // This is only needed due to the edge triggered nature of liburing, possibly
                // we can get away with only updating (force triggering an event if available) for
                // partial reads (where we currently give up after N iterations).
                // Can be after the guard as it is multishot.
                if multishot && self.shouldRefreshPollForEvent(selectorEvent: selectorEvent) {
                    ring.io_uring_poll_update(fileDescriptor: event.fd,
                                              newPollmask: registration.interested.uringEventSet,
                                              oldPollmask: registration.interested.uringEventSet,
                                              registrationID: registration.registrationID,
                                              submitNow: false)
                }

                _debugPrint("running body [\(NIOThread.current)] \(selectorEvent) \(SelectorEventSet(uringEvent: event.pollMask))")
                try body((SelectorEvent(io: selectorEvent, registration: registration)))
            } else { // remove any polling if we don't have a registration for it
                _debugPrint("We had no registration for event.fd [\(event.fd)] event.pollMask [\(event.pollMask)] event.registrationID [\(event.registrationID)], it should be deregistered already")
                if !multishot {
                    ring.io_uring_prep_poll_remove(fileDescriptor: event.fd,
                                                   pollMask: event.pollMask,
                                                   registrationID: SelectorRegistrationID(rawValue: UInt32(event.registrationID)),
                                                   submitNow: false)
                }
            }
        }
    }

    self.deferredReregistrationsPending = false // none pending as we will flush here
    ring.io_uring_flush() // flush reregisteration of the polls if needed (nop in SQPOLL mode)
    growEventArrayIfNeeded(ready: ready)
}