void TaskSubprocessQueue::waitForSubprocessAndPipes()

in bistro/processes/TaskSubprocessQueue.cpp [132:317]


void TaskSubprocessQueue::waitForSubprocessAndPipes(
    const cpp2::RunningTask& rt,
    // Mutated: we add a per-task log line rate-limiter
    std::shared_ptr<detail::TaskSubprocessState> state,
    folly::Subprocess&& proc,
    TaskSubprocessQueue::StatusCob status_cob) noexcept {
    // Below, the `noexcept` guarantees that we don't delete from tasks_ twice

  // The pipe & subprocess handlers must run in the same EventBase thread,
  // since they mutate the same state.  Also, the rate-limiter
  // initialization below depends on `evb` running the present function.
  auto evb = folly::EventBaseManager::get()->getExistingEventBase();
  CHECK(evb);  // The thread pool should have made the EventBase.
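  // getExistingEventBase() returns the calling thread's EventBase (nullptr if
  // the thread has none), so this CHECK also asserts that we were invoked
  // from an EventBase thread, as the comment above requires.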

  // Take ownership of proc's pipes, and set up their log processing on the
  // EventBase.  Produce 'pipe closed' futures.
  //
  // Technically, this block could be outside of the `noexcept`, but it
  // should not throw anyhow, so leave it in the same function.
  std::vector<folly::Future<folly::Unit>> pipe_closed_futures;
  std::vector<std::shared_ptr<AsyncReadPipe>> pipes;  // For the rate-limiter
  for (auto&& p : proc.takeOwnershipOfPipes()) {
    int child_fd = p.childFd;
    auto job = *rt.job_ref();
    auto node = *rt.node_ref();
    pipes.emplace_back(asyncReadPipe(
      evb,
      std::move(p.pipe),
      readPipeLinesCallback([
        this, job, node, child_fd, state
      ](AsyncReadPipe*, folly::StringPiece s) {
        if (s.empty()) {
          // Skip the empty line at the end of \n-terminated files
          return true;
        }

        // Enforce log line rate limits per task, rather than per job, since
        // that is much simpler.  If hacky per-job limits are needed, treat
        // log throughput as a statically-allocated job resource instead.
        auto* rate_lim = state->pipeRateLimiter_.get();
        // This cannot fire, since these AsyncReadPipes are being created
        // from the same EventBase that will run them, and therefore
        // state->pipeRateLimiter_ will be set first, below.
        CHECK(rate_lim);
        if (child_fd == STDERR_FILENO) {
          logWriter_->write(LogTable::STDERR, job, node, s);
          rate_lim->reduceQuotaBy(1);
        } else if (child_fd == STDOUT_FILENO) {
          logWriter_->write(LogTable::STDOUT, job, node, s);
          rate_lim->reduceQuotaBy(1);
        } else if (child_fd == childStatusPipePlaceholder_.fd()) {
          // Discard delimiter since status parsing assumes it's gone.
          s.removeSuffix("\n");
          // Only use the last status line, no disk IO -- no line quota
          state->rawStatus_.assign(s.data(), s.size());
        }
        // Else: we don't care about other FDs -- in fact, the only other FD
        // it could reasonably be is the canary pipe, and we won't indulge
        // the children by paying attention to the junk they write there.
        return true;  // Keep reading
      },
      65000)  // Limit line length to protect the log DB
    ));
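    // pipeClosed() yields a future that is fulfilled on `evb` once this pipe
    // reaches EOF (i.e. the child closed its end) or fails with an error.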
    pipe_closed_futures.emplace_back(pipes.back()->pipeClosed());
  }
  // This runs before any line callbacks can fire, since the current scope is
  // executing on the callbacks' EventBase, and they cannot run until control
  // returns to its loop.
  state->pipeRateLimiter_.reset(new AsyncReadPipeRateLimiter(
      evb,
      pollMs(state->opts()), // Poll at the Subprocess's rate
      *state->opts().maxLogLinesPerPollInterval_ref(),
      std::move(pipes)));
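  // Roughly, the rate-limiter polls on `evb` every pollMs(), replenishing the
  // per-interval line quota; the pipe callbacks above spend it via
  // reduceQuotaBy(), and reads pause once the quota is exhausted, resuming
  // at the next poll.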

  // Add callbacks to wait for the subprocess and pipes on the same EventBase.
  collectAll(
      // 1) The moral equivalent of waitpid(), followed by wait for cgroup.
      asyncSubprocess( // Never yields an exception
          evb,
          std::move(proc),
          // Unlike std::bind, explicitly keeps the state alive.
          [rt, state](folly::Subprocess& p) {
            state->asyncSubprocessCallback(rt, p);
          },
          pollMs(state->opts()))
          .thenValue([ this, rt, state, evb ](
              folly::ProcessReturnCode && rc) noexcept {
            logEvent(
                google::INFO,
                rt,
                state.get(),
                "process_exited",
                folly::dynamic::object("message", rc.str())); // noexcept
            // Now that the child has exited, optionally use cgroups to kill,
            // and/or wait for, its left-over descendant processes.
            if (state->opts().cgroupOptions_ref()->subsystems_ref()->empty()) {
              // No cgroups exist: shortcut early to avoid logging
              // "cgroups_reaped".
              return folly::Future<folly::Unit>();
            }
            // Usually, well-behaved descendants will have exited by this point.
            // This reaper will write logspam, and wait until all processes in
            // each of the task's cgroups have exited.  If the `freezer`
            // subsystem is available, or if `killWithoutFreezer` is set, the
            // reaper will also repeatedly send them SIGKILL.
            //
            // This future fires only when the task cgroups lose all their
            // tasks.
            return asyncCGroupReaper(
                       evb,
                       *state->opts().cgroupOptions_ref(),
                       state->cgroupName(),
                       pollMs(state->opts()))
                .thenTry(
                    [ this, rt, state ](folly::Try<folly::Unit> t) noexcept {
                      // The reaper never throws; crash if it does.
                      t.throwUnlessValue();
                      logEvent(google::INFO, rt, state.get(), "cgroups_reaped");
                    });
          }),
      // 2) Wait for the child to close all pipes
      collectAll(pipe_closed_futures)
          .toUnsafeFuture()
          .thenValue([ this, rt, state ](
              std::vector<folly::Try<folly::Unit>> &&
              all_closed) noexcept { // Logs and swallows all exceptions
            for (auto& try_pipe_closed : all_closed) {
              try {
                // TODO: Use folly::exception_wrapper once wangle supports it.
                try_pipe_closed.throwUnlessValue();
              } catch (const std::exception& e) {
                // Carry on, the pipe is known to be closed. Logging is
                // noexcept.
                logEvent(
                    google::ERROR,
                    rt,
                    state.get(),
                    "task_pipe_error",
                    folly::dynamic::object("message", e.what())); // noexcept
              }
            }
            logEvent(
                google::INFO, rt, state.get(), "task_pipes_closed"); // noexcept
          }))
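      // collectAll() yields a SemiFuture; toUnsafeFuture() lets the final
      // callback run inline on the thread that completes the last branch,
      // i.e. this EventBase thread.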
      .toUnsafeFuture()
      .thenValue([ this, rt, status_cob, state ](
          // Safe to ignore exceptions, since the above callbacks are noexcept.
          std::tuple<folly::Try<folly::Unit>, folly::Try<folly::Unit>> &&
          // `noexcept` since this is the final handler -- nothing inspects
          // its result.
          ) noexcept {
        // Parse the task's status string, if it produced one. Note that in
        // the absence of a status, our behavior is actually racy.  The task
        // might have spontaneously exited just before Bistro tried to kill
        // it.  In that case, wasKilled() would be true, and we would
        // "incorrectly" report incompleteBackoff().  This is an acceptable
        // tradeoff, since we *need* both sides of the wasKilled() branch:
        //  - We must decrement the retry counter on spontaneous exits
        //    with no status (e.g. C++ program segfaults).
        //  - We must *not* decrement the retry counter for killed tasks that
        //    do not output a status (i.e. no SIGTERM handler), since
        //    preemption should be maximally transparent.
        auto status = parseStatus(state.get());
        // Log and report the status
        logEvent(
            google::INFO,
            rt,
            state.get(),
            "got_status",
            // The NoTime variety omits "backoff_duration", but we lack it
            // anyway, since only TaskStatusSnapshot calls TaskStatus::update().
            folly::dynamic::object("status", status.toDynamicNoTime()));
        status_cob(rt, std::move(status)); // noexcept
        // logEvent ignores healthcheck messages; print a short note instead.
        if (*rt.job_ref() == kHealthcheckTaskJob) {
          LOG(INFO) << "Healthcheck started at "
                    << *rt.invocationID_ref()->startTime_ref()
                    << " quit with status '" << status.toJson() << "'";
        }
        // Remove the completed task's state from the map.
        SYNCHRONIZED(tasks_) {
          if (tasks_.erase(makeInvocationID(rt)) == 0) {
            LOG(FATAL) << "Missing task: " << apache::thrift::debugString(rt);
          }
        }
      });
}
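
The composition above boils down to joining two futures -- "the child exited
(and its cgroups were reaped)" and "the child closed all of its pipes" -- and
running one final handler once both are done.  Below is a minimal,
self-contained sketch of that pattern; it is illustrative only (the function
name, promises, and comments are hypothetical stand-ins, not Bistro code), and
assumes a reasonably recent folly.

#include <tuple>

#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/io/async/EventBase.h>

// Two promises stand in for the real completion sources: asyncSubprocess()
// plus the cgroup reaper, and the AsyncReadPipe "pipe closed" futures.
void sketchJoinExitAndPipes(folly::EventBase* evb) {
  folly::Promise<folly::Unit> exited, pipesClosed;

  folly::collectAll(exited.getFuture(), pipesClosed.getFuture())
      .toUnsafeFuture()
      .thenValue([](std::tuple<folly::Try<folly::Unit>,
                               folly::Try<folly::Unit>>&&) noexcept {
        // Runs inline on whichever thread fulfills the second promise; with
        // both promises fulfilled on `evb`, that is the EventBase thread.
        // This is the analogue of the final handler that parses the status,
        // invokes status_cob, and erases the task from tasks_.
      });

  // Simulate both completions from the EventBase thread.
  evb->runInEventBaseThread([p = std::move(exited)]() mutable {
    p.setValue();
  });
  evb->runInEventBaseThread([p = std::move(pipesClosed)]() mutable {
    p.setValue();
  });
}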