in dispenso/detail/pipeline_impl.h [254:271]
// Launches the workers for this stage. The worker count is the smaller of the
// pool's thread count and the limit reported for this stage. Each worker keeps
// pulling results from stage_ until it yields an empty result, handing every
// produced value to pipeNext_. completion_ is created with the worker count as
// its status; each worker decrements it on exit, and the worker that observes
// the count going from 1 to 0 calls notify.
void execute() {
  const ssize_t workerCount =
      std::min(tasks_.numPoolThreads(), StageLimits<CurStage>::limit(stage_));
  completion_ = std::make_unique<CompletionEventImpl>(static_cast<int>(workerCount));

  for (ssize_t remaining = workerCount; remaining > 0; --remaining) {
    tasks_.schedule([this]() {
      // Drain the stage: stop as soon as it hands back an empty result.
      for (;;) {
        auto op = stage_();
        if (!op) {
          break;
        }
        pipeNext_.execute(std::move(op.value()));
      }

      // fetch_sub yields the counter as it was before the decrement, so seeing 1
      // means this worker was the last one still running. The status is already
      // zero at that point, so notify would not strictly need to store a value;
      // the regular notify interface is used anyway because this is unlikely to
      // be a bottleneck.
      const auto previous =
          completion_->intrusiveStatus().fetch_sub(1, std::memory_order_acq_rel);
      if (previous == 1) {
        completion_->notify(0);
      }
    });
  }
}