void schedule()

in dispenso/detail/pipeline_impl.h [43:77]


    // Submits fPipe for execution on tasks_, either immediately (unlimited_) or gated by the
    // resources_ counter via queue_.
    //
    // fPipe is called with one argument: a nullary callback. In the unlimited path the callback
    // does nothing; in the limited path it either schedules the next queued wrapper or gives one
    // unit back to resources_. When fPipe invokes that callback is decided by fPipe itself and is
    // not visible here.
    //
    // outstanding_ is incremented on entry and decremented by the wrapper after fPipe returns.
    //
    // NOTE(review): fPipe is std::move'd into the closure even though the parameter is spelled
    // F&&. If F is a deduced template parameter (its declaration is outside this excerpt), an
    // lvalue argument would be left moved-from -- confirm against callers.
    void schedule(F&& fPipe) {
      outstanding_.fetch_add(1, std::memory_order_acq_rel);

      if (unlimited_) {
        // No gating: hand the work straight to tasks_. The callback passed to fPipe is a no-op
        // because there is no resources_ unit to return and no queue_ entry to chain to.
        tasks_.schedule([this, fPipe = std::move(fPipe)]() mutable {
          fPipe([]() {});
          outstanding_.fetch_sub(1, std::memory_order_acq_rel);
        });
        return;
      }

      // Limited path: the wrapper is always enqueued first, and only afterwards do we try to
      // claim a unit of resources_ to run something from queue_ (see the loop below).
      // NOTE(review): the TSAN macros presumably silence ThreadSanitizer reports for writes made
      // while enqueueing -- check the macro definitions.
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_BEGIN();
      queue_.enqueue([this, fPipe = std::move(fPipe)]() mutable {
        // Callback given to fPipe: if another wrapper is waiting in queue_, schedule it (no
        // change to resources_); otherwise increment resources_.
        fPipe([this]() {
          OnceFunction func;
          if (queue_.try_dequeue(func)) {
            tasks_.schedule(std::move(func));
          } else {
            resources_.fetch_add(1, std::memory_order_acq_rel);
          }
        });
        outstanding_.fetch_sub(1, std::memory_order_acq_rel);
      });
      DISPENSO_TSAN_ANNOTATE_IGNORE_WRITES_END();

      // Each iteration decrements resources_ and inspects the pre-decrement value returned by
      // fetch_sub. A positive value means a unit was available: dequeue one wrapper and schedule
      // it. The wrapper dequeued here is not necessarily the one enqueued above.
      while (resources_.fetch_sub(1, std::memory_order_acq_rel) > 0) {
        OnceFunction func;
        if (queue_.try_dequeue(func)) {
          tasks_.schedule(std::move(func));
        } else {
          // Decremented, but queue_ had nothing to run; the increment after the loop undoes it.
          break;
        }
      }
      // Both ways out of the loop leave exactly one decrement with no scheduled task behind it
      // (the failed loop condition, or the break above), so a single increment rebalances it.
      resources_.fetch_add(1, std::memory_order_acq_rel);
    }