in src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c [134:209]
static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max,
struct io_event *events, struct timespec *timeout) {
struct aio_ring *ring = to_aio_ring(aio_ctx);
//checks if it could be completed in user space, saving a sys call
if (RING_REAPER && !forceSysCall && has_usable_ring(ring)) {
const unsigned ring_nr = ring->nr;
// We're assuming to be the exclusive writer to head, so we just need a compiler barrier
unsigned head = ring->head;
mem_barrier();
const unsigned tail = ring->tail;
int available = tail - head;
if (available < 0) {
//a wrap has occurred
available += ring_nr;
}
#ifdef DEBUG
fprintf(stdout, "tail = %d head= %d nr = %d available = %d\n", tail, head, ring_nr, available);
#endif
if ((available >= min_nr) || (timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0)) {
if (!available) {
return 0;
}
if (available >= max) {
// This is to trap a possible bug from the kernel:
// https://bugzilla.redhat.com/show_bug.cgi?id=1845326
// https://issues.apache.org/jira/browse/ARTEMIS-2800
//
// On the race available would eventually be >= max, while ring->tail was invalid
// we could work around by waiting ring-tail to change:
// while (ring->tail == tail) mem_barrier();
//
// however eventually we could have available==max in a legal situation what could lead to infinite loop here
return io_getevents(aio_ctx, min_nr, max, events, timeout);
// also: I could have called io_getevents to the one at the end of this method
// but I really hate goto, so I would rather have a duplicate code here
// and I did not want to create another memory flag to stop the rest of the code
}
//the kernel has written ring->tail from an interrupt:
//we need to load acquire the completed events here
read_barrier();
const int available_nr = available < max? available : max;
//if isn't needed to wrap we can avoid % operations that are quite expansive
const int needMod = ((head + available_nr) >= ring_nr) ? 1 : 0;
for (int i = 0; i<available_nr; i++) {
events[i] = ring->io_events[head];
if (needMod == 1) {
head = (head + 1) % ring_nr;
} else {
head = (head + 1);
}
}
//it allow the kernel to build its own view of the ring buffer size
//and push new events if there are any
store_barrier();
ring->head = head;
#ifdef DEBUG
fprintf(stdout, "consumed non sys-call = %d\n", available_nr);
#endif
return available_nr;
}
} else {
#ifdef DEBUG
fprintf(stdout, "The kernel is not supoprting the ring buffer any longer\n");
#endif
}
// if this next line ever needs to be changed, beware of a duplicate code on this method
// I explain why I duplicated the call instead of reuse it there ^^^^
int sys_call_events = io_getevents(aio_ctx, min_nr, max, events, timeout);
#ifdef DEBUG
fprintf(stdout, "consumed sys-call = %d\n", sys_call_events);
#endif
return sys_call_events;
}