void TaskSubprocessQueue::runTask()

in bistro/processes/TaskSubprocessQueue.cpp [347:465]


/**
 * Registers a task in `tasks_` and schedules the launch of its subprocess
 * on `threadPool_`.
 *
 * Synchronous part (runs on the caller's thread):
 *  - Builds a `detail::TaskSubprocessState` for the task and inserts it into
 *    `tasks_` under `makeInvocationID(rt)`.
 *
 * Asynchronous part (runs in the callback added to `threadPool_`):
 *  - Creates `working_dir` if it does not exist.
 *  - Builds the full command line: `cmd` followed by the task's node, the
 *    path "/dev/fd/<fd of childStatusPipePlaceholder_>", and `job_arg`.
 *  - Configures `folly::Subprocess::Options` from the task's options, spawns
 *    the child, and hands it to `waitForSubprocessAndPipes`.
 *  - If any step throws a `std::exception`, erases the task from `tasks_`,
 *    logs a "task_failed_to_start" event, and reports
 *    `TaskStatus::errorBackoff(...)` through `status_cob`.
 *
 * @param rt            The task to run; used to derive the invocation ID,
 *                      for logging, and as the source of the node argument.
 * @param cmd           Base command; must be non-empty (enforced via CHECK
 *                      in the asynchronous part).
 * @param job_arg       Appended as the last argument of the command line.
 * @param working_dir   Directory the child is started in (via `chdir`);
 *                      created on demand.
 * @param status_cob    Called with an error-backoff status if the task
 *                      fails to start; otherwise forwarded to
 *                      `waitForSubprocessAndPipes`.
 * @param resource_cob  Moved into the task's state object.
 * @param opts          Moved into the task's state object; later consulted
 *                      for the canary pipe, parent death signal, process
 *                      group leader and cgroup settings.
 *
 * @throws BistroException if `tasks_` already has an entry for this
 *         invocation ID.
 */
void TaskSubprocessQueue::runTask(
    const cpp2::RunningTask& rt,
    const std::vector<std::string>& cmd,
    const std::string& job_arg,
    const boost::filesystem::path& working_dir,
    StatusCob status_cob,
    ResourceCob resource_cob,
    cpp2::TaskSubprocessOptions opts) {

  // Set up the task's map entry up-front, because it is a usage error to
  // start two tasks with the same invocation ID.  It also must be done
  // before setting up the callbacks, since those expect tasks_ to be ready,
  // and *may* run immediately.  Also, the destructor relies on tasks_ being
  // added to synchronously.
  auto state = std::make_shared<detail::TaskSubprocessState>(
    rt,
    std::move(opts),
    &tasksResourceMonitor_,  // Outlives task states
    std::move(resource_cob)
  );
  SYNCHRONIZED(tasks_) {
    // `emplace` reports via `.second` whether a new entry was inserted; a
    // `false` here means this invocation ID is already registered.
    if (!tasks_.emplace(makeInvocationID(rt), state).second) {
      throw BistroException(
        "Task already running: ", apache::thrift::debugString(rt)
      );
    }
  }

  // Passing 'this' is not too scary, since the EventBase threads are the
  // first to be stopped / destroyed on this object.
  //
  // Everything else is captured by copy, so the callback does not refer to
  // the caller's arguments.  The function parameter `opts` is deliberately
  // not captured: it was moved into `state` above.
  threadPool_.add([this, rt, cmd, job_arg, working_dir, status_cob, state]() {
    // Human-readable command line for the failure message in the `catch`
    // below; stays "unknown" only if building it throws.
    std::string debug_cmd{"unknown"};
    try {
      // Populate `debug_cmd` first, since this shouldn't throw, and it's
      // required for nice exception logging.
      auto pipe_filename =  // Unlike /proc, /dev/fd works on Linux and OS X
        folly::to<std::string>("/dev/fd/", childStatusPipePlaceholder_.fd());
      // Arguments appended after `cmd`: node, status pipe path, job argument.
      std::vector<std::string> args{*rt.node_ref(), pipe_filename, job_arg};
      debug_cmd = folly::to<std::string>(
        '[', escapeShellArgsInsecure(cmd), "] + [",
        escapeShellArgsInsecure(args), ']'
      );

      // Use the `error_code` overloads so that filesystem failures are
      // reported through `ec` and converted into a BistroException here.
      boost::system::error_code ec;
      if (!boost::filesystem::exists(working_dir, ec)) {
        boost::filesystem::create_directories(working_dir, ec);
      }
      if (ec) {
        throw BistroException(
            "Failed to make working directory: ", ec.message());
      }

      // An empty command is treated as a programming error, not as a task
      // failure: CHECK is fatal rather than throwing into the `catch` below.
      CHECK(cmd.size() >= 1);
      std::vector<std::string> full_cmd{cmd};
      full_cmd.insert(full_cmd.end(), args.begin(), args.end());
      logEvent(google::INFO, rt, state.get(), "running", folly::dynamic::object
        ("command", folly::dynamic(full_cmd.begin(), full_cmd.end()))
      );

      // NB: this local `opts` holds the folly::Subprocess options.  It is
      // unrelated to the function parameter of the same name; the task's
      // own options are read through `state->opts()`.
      auto opts = folly::Subprocess::Options().pipeStdout().pipeStderr()
        .chdir(working_dir.native())
        .fd(childStatusPipePlaceholder_.fd(), folly::Subprocess::PIPE_OUT);
      if (*state->opts().useCanaryPipe_ref()) {
        // It's much easier for us to get the read end of the pipe, since we
        // can just use AsyncReadPipe to track its closing.
        opts.fd(childCanaryPipePlaceholder_.fd(), folly::Subprocess::PIPE_OUT);
      }
      // A parent death signal of 0 means "none requested".
      if (*state->opts().parentDeathSignal_ref() != 0) {
        opts.parentDeathSignal(*state->opts().parentDeathSignal_ref());
      }
      if (*state->opts().processGroupLeader_ref()) {
        opts.processGroupLeader();
      }
      // folly::Subprocess() can throw, and that's ok. Do not make `proc`
      // inline with waitForSubprocessAndPipes to ensure that `state` is
      // safe to use in the `catch` below.
      //
      // The lambda is invoked immediately, which scopes `add_to_cgroups` to
      // exactly the duration of the folly::Subprocess constructor call.
      auto proc = [&state, &full_cmd](
        // Pass by r-value, since opts will contain an invalid pointer after
        // add_to_cgroups is destroyed below.
        folly::Subprocess::Options&& options
      ) {
        // No cgroup name: spawn without the post-fork cgroup callback.
        if (state->cgroupName().empty()) {
          return folly::Subprocess(full_cmd, options);
        }
        LOG(INFO) << "Making task cgroups named " << state->cgroupName();
        // This must live only until folly::Subprocess's constructor exits.
        AddChildToCGroups add_to_cgroups(cgroupSetup(
            state->cgroupName(), *state->opts().cgroupOptions_ref()));
        options.dangerousPostForkPreExecCallback(&add_to_cgroups);
        return folly::Subprocess(full_cmd, options);
      }(std::move(opts));
      // IMPORTANT: This call must be last in the block and `noexcept` to
      // ensure that the below "remove from tasks_" cannot race the "remove
      // from tasks_" in the "subprocess exited and pipes closed" callback.
      //
      // NOTE(review): this closure is not `mutable`, so the captured `state`
      // is const here and `std::move(state)` yields a const rvalue, which
      // for std::shared_ptr selects the copy constructor.  The captured
      // `state` therefore remains non-null after this call.  `proc` is a
      // non-const local and is genuinely moved.
      waitForSubprocessAndPipes(
        rt, std::move(state), std::move(proc), std::move(status_cob)
      );
    } catch (const std::exception& e) {
      // Only exceptions derived from std::exception are handled here.
      SYNCHRONIZED(tasks_) {  // Remove the task -- its callback won't run.
        // The entry was inserted synchronously in runTask(), so failing to
        // find it is treated as a fatal invariant violation.
        if (tasks_.erase(makeInvocationID(rt)) == 0) {
          LOG(FATAL) << "Missing task: " << apache::thrift::debugString(rt);
        }
      }
      auto msg = folly::to<std::string>(
        "Failed to start task ", apache::thrift::debugString(rt), " / ",
        debug_cmd, ": ", e.what()
      );
      auto status = TaskStatus::errorBackoff(msg);
      // Although `state` is moved above, this is ok, since our move
      // constructors and waitForSubprocessAndPipes must be noexcept.
      logEvent(
        google::ERROR, rt, state.get(), "task_failed_to_start",
        folly::dynamic::object("status", status.toDynamicNoTime())
      );  // noexcept
      // Advances the backoff/retry counter, so we eventually give up.
      status_cob(rt, std::move(status));  // noexcept
    }
  });
}