in bistro/worker/BistroWorkerHandler.cpp [217:371]
void BistroWorkerHandler::runTask(
const cpp2::RunningTask& rt,
const string& config,
const vector<string>& cmd,
const cpp2::BistroInstanceID& scheduler,
const cpp2::BistroInstanceID& worker,
int64_t notify_if_tasks_not_running_sequence_num,
const cpp2::TaskSubprocessOptions& opts) {
throwIfSuicidal();
// Log if this call takes over 100 ms -- at that rate the worker can
// handle at most ~10 runTask calls per second.
AutoTimer<> timer("runTask was slow", std::chrono::milliseconds{100});
bool isHealthcheck = *rt.job_ref() == kHealthcheckTaskJob;
// Throwing here tells the scheduler that we aren't even going to try
// running this task. Run healthchecks even when unhealthy, since
// otherwise we could never recover from missing a healthcheck.
if (state_->state_ != RemoteWorkerState::State::HEALTHY && !isHealthcheck) {
throw BistroWorkerException(
"Unhealthy, not running task: ", *rt.job_ref(), ", ", *rt.node_ref());
}
// Accept "new worker" healthchecks even if the scheduler ID doesn't
// match. This is important since those healthchecks often arrive just
// before the initial heartbeat response (so we still have the old
// scheduler ID). Accepting them prevents unnecessary worker loss, and
// speeds up failovers.
//
// If multiple schedulers concurrently try to talk to this worker (e.g.
// due to operational errors), we don't want to blindly accept
// healthchecks from both, as 50+% of the checks would fail. Since we
// only accept "new worker" healthchecks here, this risk is very low.
//
// TODO: An alternative is for every heartbeat to generate a fresh
// 'alternative' scheduler ID on the worker side, which serves to
// authenticate the scheduler in the common case of the healthcheck's
// runTask arriving before the next heartbeat.
if (isHealthcheck && worker == *worker_.id_ref() &&
*rt.node_ref() == kHealthcheckTaskNewWorkerNode &&
scheduler != *schedulerState_->id_ref()) {
LOG(INFO) << "Accepting a 'new worker' healthcheck from an unknown "
<< "scheduler, in the hopes that its heartbeat response is just about "
<< "to arrive.";
// It may be reasonable to copy `scheduler` into schedulerState_->id_ref() here...
} else {
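// Otherwise, require both the scheduler and worker instance IDs to
// match ours; throwOnInstanceIDMismatch() rejects the call if not.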
throwOnInstanceIDMismatch("runTask", scheduler, worker);
}
if (isHealthcheck) {
// This means that a worker will not know its usable physical resources
// until it receives the first healthcheck, and that any cgroup config
// changes (which should be extremely rare) will only propagate to
// workers during healthchecks. On the bright side, this means that
// cgroup configs *can* be changed at runtime, and that the lock on
// usablePhysicalResources_ causes a minimal amount of contention, since
// healthchecks should not be too frequent.
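// (folly's SYNCHRONIZED macro holds the lock on the Synchronized<>
// member for the duration of this block.)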
SYNCHRONIZED(usablePhysicalResources_) {
// Future: Ideally, the refresh interval could also be changed at
// run-time, and it would be possible to add a "log" callback to
// display the new system resources whenever the cgroups change.
// However, both are too much hassle with the current PeriodicPoller,
// and it's too much hassle to roll a custom poller.
if (!usablePhysicalResources_.monitor_) {
LOG(WARNING) << "CGroups set: "
<< debugString(*opts.cgroupOptions_ref());
usablePhysicalResources_.monitor_ =
std::make_unique<UsablePhysicalResourceMonitor>(
CGroupPaths(*opts.cgroupOptions_ref(), folly::none),
FLAGS_physical_resources_subprocess_timeout_ms,
std::chrono::seconds(
FLAGS_refresh_usable_physical_resources_sec));
usablePhysicalResources_.cgroupOpts_ = *opts.cgroupOptions_ref();
} else if (
*opts.cgroupOptions_ref() != usablePhysicalResources_.cgroupOpts_) {
LOG(WARNING) << "CGroups changed: "
<< debugString(*opts.cgroupOptions_ref());
usablePhysicalResources_.monitor_->updateCGroupPaths(
CGroupPaths(*opts.cgroupOptions_ref(), folly::none));
usablePhysicalResources_.cgroupOpts_ = *opts.cgroupOptions_ref();
}
}
LOG(INFO) << "Queueing healthcheck started at "
<< *rt.invocationID_ref()->startTime_ref();
} else {
LOG(INFO) << "Queueing task: " << debugString(rt);
}
SYNCHRONIZED_CONST(state_) { SYNCHRONIZED(runningTasks_) {
// Don't run tasks sent before the most recent notifyIfTasksNotRunning,
// except healthchecks, which don't use notifyIfTasksNotRunning anyway.
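// Illustrative race this guards against: (1) the scheduler sends
// runTask(T, seq=4), which is delayed in transit; (2) it then calls
// notifyIfTasksNotRunning (bumping our counter to 5) and concludes T
// is not running; (3) the stale runTask finally arrives. Since 4 < 5,
// we refuse to start a task the scheduler has already written off.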
if (
!isHealthcheck && notify_if_tasks_not_running_sequence_num <
state_.notifyIfTasksNotRunningSequenceNum_
) {
throw BistroWorkerException(
"Not running task ", debugString(rt), " since its "
"notifyIfTasksNotRunning sequence number is ",
notify_if_tasks_not_running_sequence_num, " < ",
state_.notifyIfTasksNotRunningSequenceNum_
);
}
// Mark the task as "running", if it was not already running.
auto it_and_success =
runningTasks_.emplace(TaskID{*rt.job_ref(), *rt.node_ref()}, rt);
if (!it_and_success.second) {
if (isHealthcheck) {
// The scheduler doesn't track healthchecks, so it might send duplicates.
throw BistroWorkerException("Another healthcheck is already running.");
}
CHECK(
*it_and_success.first->second.invocationID_ref() !=
*rt.invocationID_ref())
<< "RunningTask was runTask'd more than once: " << debugString(rt);
// This can happen if updateStatus succeeds on the scheduler, and
// fails on the worker. In this case, the scheduler might send the
// next invocation of the same task before the next updateStatus retry.
throw BistroWorkerException(
"Tried to runTask when another invocation of this task is running: ",
debugString(rt)
);
}
} }
timer.log("runTask setup was slow");
taskQueue_.runTask(
rt,
cmd.empty() ? vector<string>{workerCommand_} : cmd,
config, // Job config argument -- TODO: elide the extra copy?
jobsDir_ / *rt.job_ref(), // Working directory for the task
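// Both callbacks below are declared noexcept: they are invoked
// asynchronously by taskQueue_, where an escaping exception would
// terminate the worker process.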
[this](const cpp2::RunningTask& runningTask, TaskStatus&& status) noexcept {
// Log if queuing the status update takes over 100 ms (~10 updates/sec).
folly::AutoTimer<> updateQueueTimer(
"Task update queue was slow", std::chrono::milliseconds{100});
notifyFinishedQueue_.blockingWrite(std::make_unique<NotifyData>(
TaskID{*runningTask.job_ref(), *runningTask.node_ref()},
std::move(status)));
logStateTransitionFn_("completed_task", worker_, &runningTask);
},
[this](
const cpp2::RunningTask& runningTask, cpp2::TaskPhysicalResources&& res
) noexcept {
SYNCHRONIZED(runningTasks_) {
auto it = runningTasks_.find({*runningTask.job_ref(), *runningTask.node_ref()});
CHECK(it != runningTasks_.end()) << "Bad task: "
<< debugString(runningTask);
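// Writing through value_unchecked() and then marking the optional
// field set via ensure_isset_unsafe() moves `res` into place without
// an intermediate copy.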
it->second.physicalResources_ref().value_unchecked() = std::move(res);
apache::thrift::ensure_isset_unsafe(it->second.physicalResources_ref());
}
},
opts
);
logStateTransitionFn_("queued_task", worker_, &rt);
}
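
// A minimal caller sketch (hypothetical names; assumes the
// Thrift-generated client for this service and an already-populated
// cpp2::RunningTask):
//
//   auto client = getWorkerClient(workerAddr);  // hypothetical helper
//   client->sync_runTask(
//       rt,           // cpp2::RunningTask identifying job + node
//       configJson,   // serialized job config
//       {},           // empty cmd => worker falls back to workerCommand_
//       schedulerId,  // scheduler's cpp2::BistroInstanceID
//       workerId,     // this worker's cpp2::BistroInstanceID
//       seqNum,       // latest notifyIfTasksNotRunning sequence number
//       taskOpts);    // cpp2::TaskSubprocessOptions (incl. cgroup options)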