void execute()

in dispenso/detail/pipeline_impl.h [254:271]


  // Launches the workers for this pipeline stage.
  //
  // The worker count is the smaller of the task set's pool-thread count and the limit reported
  // by StageLimits for this stage. A fresh completion event is created whose counter starts at
  // that worker count, and one task per worker is scheduled on tasks_.
  //
  // Each task captures `this`; NOTE(review): this presumably requires the stage object to stay
  // alive until every scheduled task has finished -- confirm against the owner of this object.
  void execute() {
    const ssize_t workerCount =
        std::min(tasks_.numPoolThreads(), StageLimits<CurStage>::limit(stage_));
    completion_ = std::make_unique<CompletionEventImpl>(static_cast<int>(workerCount));

    for (ssize_t remaining = workerCount; remaining > 0; --remaining) {
      tasks_.schedule([this]() {
        // Keep pulling from this stage until it produces an empty result, forwarding every
        // produced value to the next pipe.
        for (;;) {
          auto op = stage_();
          if (!op) {
            break;
          }
          pipeNext_.execute(std::move(op.value()));
        }

        // fetch_sub yields the counter's value from *before* the decrement, so observing 1 means
        // this worker was the last one still running. The counter is already zero at that point,
        // so passing a value to notify is not strictly needed; the existing notify interface is
        // used as-is because this path is unlikely to be a bottleneck.
        const auto outstandingBefore =
            completion_->intrusiveStatus().fetch_sub(1, std::memory_order_acq_rel);
        if (outstandingBefore == 1) {
          completion_->notify(0);
        }
      });
    }
  }