in src/kernel_aio.cc [203:256]
int submit(int group_id) {
if (!started) assert(!"IOManager not started!");
groups_mutex.lock();
auto group = groups[group_id];
groups_mutex.unlock();
// create the array
auto array = std::make_shared<CbArray>();
array->size = group->op_queue.size();
array->items_left = array->size;
array->array = (iocb**)malloc(sizeof(iocb*)*array->size);
int i = 0;
for (auto& op : group->op_queue) {
op->cb_array = array; // copy pointer to the array struct so we can find it later
// add the op to the in_flight map with a unique id
while(group->in_flight.count(group->next_flight)) {
group->next_flight = (void *)((size_t)(group->next_flight) + 1);
}
op->cb.data = group->next_flight;
group->in_flight[group->next_flight] = op;
group->next_flight = (void *)((size_t)(group->next_flight) + 1);
// add the ops's cb to the array
array->array[i++] = &(op->cb);
}
// do the actual io submission!
int err = io_submit(group->ctx, array->size, array->array);
// clean up and return an error
if (err != array->size) {
// clean up all the ops we tried to submit
for (int j = 0; j < array->size; ++j) {
io_cancel(group->ctx, array->array[j], nullptr);// cancel the op, ignore errors
void * in_flight_id = array->array[j]->data; // get the in flight id
group->in_flight.erase(in_flight_id); // remove from map
}
free(array->array); // free the malloc'd memory
group->arrays.erase(array); // remove from the set of cbarrays
d_printf("IOManager failed: io_submit returned %d instead of %lu\n", err, array->size);
errno = -err; // libaio returns the error code negated so we convert it
return err;
}
// clear the queued ops
group->op_queue.clear();
return 0;
}