in dispenso/detail/pipeline_impl.h [43:77]
void schedule(F&& fPipe) {
outstanding_.fetch_add(1, std::memory_order_acq_rel);
if (unlimited_) {
tasks_.schedule([this, fPipe = std::move(fPipe)]() mutable {
fPipe([]() {});
outstanding_.fetch_sub(1, std::memory_order_acq_rel);
});
return;
}
DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
queue_.enqueue([this, fPipe = std::move(fPipe)]() mutable {
fPipe([this]() {
OnceFunction func;
if (queue_.try_dequeue(func)) {
tasks_.schedule(std::move(func));
} else {
resources_.fetch_add(1, std::memory_order_acq_rel);
}
});
outstanding_.fetch_sub(1, std::memory_order_acq_rel);
});
DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();
while (resources_.fetch_sub(1, std::memory_order_acq_rel) > 0) {
OnceFunction func;
if (queue_.try_dequeue(func)) {
tasks_.schedule(std::move(func));
} else {
break;
}
}
resources_.fetch_add(1, std::memory_order_acq_rel);
}