in bistro/runners/RemoteWorkerRunner.cpp [574:769]
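// Picks a worker with free resources for this task, records the task as
// running while still holding the workerResources_ / workers_ locks, and
// then sends the actual runTask Thrift request to the worker asynchronously.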
TaskRunnerResponse RemoteWorkerRunner::runTaskImpl(
const std::shared_ptr<const Job>& job,
const Node& node,
cpp2::RunningTask& rt,
folly::dynamic& job_args,
function<void(const cpp2::RunningTask& rt, TaskStatus&& status)> cb
) noexcept {
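// During the initial wait, not all workers have necessarily reconnected,
// so the scheduler may not yet know about every still-running task;
// starting new tasks now could start duplicates.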
if (inInitialWait_.load(std::memory_order_relaxed)) {
return DoNotRunMoreTasks;
}
auto config = config_.copy();
CHECK(config) << "Cannot runTask before updateConfig";
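// The rest of this call works off this one snapshot. It may be slightly
// stale, but it is consistent with workerResources_ (see the selector
// comment below).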
// Make a copy so we don't have to lock workers_ while waiting for Thrift.
cpp2::BistroWorker worker;
// Unused initialization to hush a Clang warning:
int64_t did_not_run_sequence_num = 0;
// These will modify the task's cgroupOptions if we find a worker.
int16_t cgroup_cpu_shares = 0;
int64_t cgroup_memory_limit_in_bytes = 0;
// Always lock workerResources_ first, then workers_
SYNCHRONIZED(workerResources_) {
SYNCHRONIZED(workers_) {
CHECK(workerLevel_ != StringTable::NotFound)
<< "updateConfig must always be invoked before runTask";
// Whichever selector is currently in use is protected by the locks on
// WorkerResources and RemoteWorkers, so no extra mutex is needed.
auto response = RemoteWorkerSelector::getSingleton(
// Since we use a memoized config, the selector type might be a bit
// stale, but that's no big deal. We need the stale config anyway,
// since its worker level ID is the correct index into
// workerResources_.
config->remoteWorkerSelectorType
)->findWorker(
config.get(),
*job,
node,
workerLevel_, // Needed for checking worker-level job filters.
monitor_.get(),
&workerResources_,
&workers_,
&worker,
&did_not_run_sequence_num
);
if (response != RanTask) {
return response;
}
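// findWorker() chose a worker and already deducted this task's resources
// from workerResources_ on its behalf.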
*rt.workerShard_ref() = *worker.shard_ref();
job_args["worker_node"] = *rt.workerShard_ref();
job_args["worker_host"] = *worker.machineLock_ref()->hostname_ref();
// Add worker resources to rt **before** recording this task as running.
// Future: support this in LocalRunner mode as well.
auto& resources_by_node = job_args["resources_by_node"];
CHECK(
resources_by_node.find(*rt.workerShard_ref()) ==
resources_by_node.items().end())
<< "Cannot have both a node and a worker named: "
<< *rt.workerShard_ref()
<< " -- if you are running a worker and the central scheduler on the "
<< "same host, you should specify --instance_node_name global.";
if (const auto* nr = addNodeResourcesToRunningTask(
&rt,
&resources_by_node,
// This config is stale, but it's consistent with
// workerResources_.
*config,
*rt.workerShard_ref(),
workerLevel_,
job->resources())) {
for (const auto& r : *nr->resources_ref()) {
auto phys_it = config->logicalToPhysical.find(*r.name_ref());
if (phys_it != config->logicalToPhysical.end()) {
const auto& p = // at() is like CHECK, since this is `noexcept`
config->physicalResourceConfigs.at(phys_it->second);
if (*p.physical_ref() == cpp2::PhysicalResource::RAM_MBYTES &&
*p.enforcement_ref() ==
cpp2::PhysicalResourceEnforcement::HARD) {
logicalToIntPhysicalResource(
&cgroup_memory_limit_in_bytes, p, r
);
} else if (
*p.physical_ref() == cpp2::PhysicalResource::CPU_CORES &&
*p.enforcement_ref() ==
cpp2::PhysicalResourceEnforcement::SOFT) {
logicalToIntPhysicalResource(&cgroup_cpu_shares, p, r);
// Multiply by 2, since cgroup cpu.shares must be >= 2.
cgroup_cpu_shares *= 2;
}
}
}
}
// Mark the task 'running' before we unlock workerResources_, because
// otherwise an intervening updateConfig() could free the resources we
// just grabbed.
auto status = TaskStatus::running(
// Pass some additional metadata to TaskStatusObservers that log it.
make_shared<folly::dynamic>(
folly::dynamic::object("shard", *worker.shard_ref())(
"hostname", *worker.machineLock_ref()->hostname_ref())(
"port", *worker.machineLock_ref()->port_ref())));
workers_.mutableWorkerOrAbort(*worker.shard_ref())
->recordRunningTaskStatus(rt, status);
// IMPORTANT: Update TaskStatuses **inside** the workers_ lock,
// otherwise e.g. a worker could get lost before cb runs, failing the
// TaskStatusSnapshot::updateStatus() check for "when a task is not
// running, the received status is not overwritable".
cb(rt, std::move(status));
}
}
// Now we have a healthy worker with resources we already grabbed. It may
// get marked unhealthy or even lost, since workers_ is now unlocked, but
// we just have to try our luck.
eventBase_->runInEventBaseThread([
cb, this, job, rt, job_args, worker, did_not_run_sequence_num,
cgroup_cpu_shares, cgroup_memory_limit_in_bytes
]() noexcept {
try {
shared_ptr<cpp2::BistroWorkerAsyncClient> client =
workerClientFn_(eventBase_.get(), *worker.addr_ref());
auto task_subproc_opts = job->taskSubprocessOptions();
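// Apply the cgroup limits computed above; the worker enforces these when
// it launches the task's subprocess.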
*task_subproc_opts.cgroupOptions_ref()->cpuShares_ref() =
cgroup_cpu_shares;
*task_subproc_opts.cgroupOptions_ref()->memoryLimitInBytes_ref() =
cgroup_memory_limit_in_bytes;
client->runTask(
unique_ptr<RequestCallback>(new FunctionReplyCallback(
[ this, cb, client, rt, worker, did_not_run_sequence_num ](
ClientReceiveState && state) noexcept {
// TODO(#5025478): Convert this, and the other recv_* calls in
// this file to use recv_wrapped_*.
try {
client->recv_runTask(state);
} catch (const cpp2::BistroWorkerException& e) {
LOG(ERROR) << "Worker never started task " << debugString(rt)
<< ": " << *e.message_ref();
// Okay to mark the task "not running" since we know for sure
// that the worker received & processed our request, and
// decided not to run it.
SYNCHRONIZED(workers_) {
auto status = TaskStatus::neverStarted(*e.message_ref());
workers_.mutableWorkerOrAbort(*worker.shard_ref())
->recordFailedTask(rt, status);
// IMPORTANT: Update TaskStatuses **inside** the workers_
// lock
cb(rt, std::move(status));
}
} catch (const std::exception& e) {
// The task may or may not be running, so do NOT invoke the
// callback, or the scheduler might schedule duplicate tasks,
// exceed resource limits, etc.
LOG(ERROR) << "The runTask request hit an error on "
<< debugString(rt)
<< ", will have to poll to find if the "
"task is running: "
<< e.what();
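// The scheduler resolves "unsure if running" tasks later by polling the
// worker via notifyIfTasksNotRunning.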
SYNCHRONIZED(workers_) {
workers_.mutableWorkerOrAbort(*worker.shard_ref())
->addUnsureIfRunningTask(rt);
}
}
})),
rt,
folly::toJson(job_args),
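// An empty command vector makes the worker fall back to its own
// --worker_command default.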
job->command().empty()
? std::vector<std::string>{/*--worker_command*/}
: job->command(),
schedulerID_,
*worker.id_ref(),
// The real sequence number may have been incremented after we found
// the worker, but this is okay, the worker simply rejects the task.
did_not_run_sequence_num,
std::move(task_subproc_opts));
} catch (const exception& e) {
// We can get here if client creation failed (e.g. AsyncSocket could
// not resolve the hostname), or if the runTask request creation
// failed. The latter can __probably__ only happen if the connection
// is dead at the time of the request.
//
// The key part is that in both cases, it ought to be true that the
// request never reached the worker, and since the task is not
// running, we can invoke the callback.
LOG(ERROR) << "Error connecting to the worker: " << e.what();
SYNCHRONIZED(workers_) {
auto status = TaskStatus::neverStarted(e.what());
workers_.mutableWorkerOrAbort(*worker.shard_ref())
->recordFailedTask(rt, status);
// IMPORTANT: Update TaskStatuses **inside** the workers_ lock
cb(rt, std::move(status));
}
}
});
return RanTask;
}