static int ringio_get_events()

in src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c [134:209]


static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max,
                                                       struct io_event *events, struct timespec *timeout) {
    struct aio_ring *ring = to_aio_ring(aio_ctx);
    //checks if it could be completed in user space, saving a sys call
    if (RING_REAPER && !forceSysCall && has_usable_ring(ring)) {
        const unsigned ring_nr = ring->nr;
        // We're assuming to be the exclusive writer to head, so we just need a compiler barrier
        unsigned head = ring->head;
        mem_barrier();
        const unsigned tail = ring->tail;
        int available = tail - head;
        if (available < 0) {
            //a wrap has occurred
            available += ring_nr;
        }
        #ifdef DEBUG
            fprintf(stdout, "tail = %d head= %d nr = %d available = %d\n", tail, head, ring_nr, available);
        #endif
        if ((available >= min_nr) || (timeout && timeout->tv_sec == 0 && timeout->tv_nsec == 0)) {
            if (!available) {
                return 0;
            }

            if (available >= max) {
               // This is to trap a possible bug from the kernel:
               //       https://bugzilla.redhat.com/show_bug.cgi?id=1845326
               //       https://issues.apache.org/jira/browse/ARTEMIS-2800
               //
               // On the race available would eventually be >= max, while ring->tail was invalid
               // we could work around by waiting ring-tail to change:
               // while (ring->tail == tail) mem_barrier();
               //
               // however eventually we could have available==max in a legal situation what could lead to infinite loop here
               return io_getevents(aio_ctx, min_nr, max, events, timeout);

               // also: I could have called io_getevents to the one at the end of this method
               //       but I really hate goto, so I would rather have a duplicate code here
               //       and I did not want to create another memory flag to stop the rest of the code
            }

            //the kernel has written ring->tail from an interrupt:
            //we need to load acquire the completed events here
            read_barrier();
            const int available_nr = available < max? available : max;
            //if isn't needed to wrap we can avoid % operations that are quite expansive
            const int needMod = ((head + available_nr) >= ring_nr) ? 1 : 0;
            for (int i = 0; i<available_nr; i++) {
                events[i] = ring->io_events[head];
                if (needMod == 1) {
                    head = (head + 1) % ring_nr;
                } else {
                    head = (head + 1);
                }
            }
            //it allow the kernel to build its own view of the ring buffer size
            //and push new events if there are any
            store_barrier();
            ring->head = head;
            #ifdef DEBUG
                fprintf(stdout, "consumed non sys-call = %d\n", available_nr);
            #endif
            return available_nr;
        }
    } else {
        #ifdef DEBUG
            fprintf(stdout, "The kernel is not supoprting the ring buffer any longer\n");
        #endif
    }
    // if this next line ever needs to be changed, beware of a duplicate code on this method
    // I explain why I duplicated the call instead of reuse it there ^^^^
    int sys_call_events = io_getevents(aio_ctx, min_nr, max, events, timeout);
    #ifdef DEBUG
        fprintf(stdout, "consumed sys-call = %d\n", sys_call_events);
    #endif
    return sys_call_events;
}