in source/neuropod/multiprocess/multiprocess_worker.cc [35:135]
// Main loop of the worker process.
//
// Opens the control channel named `control_queue_name` as WORKER_PROCESS and then
// handles messages one at a time until a SHUTDOWN message is received:
//
//   LOAD_NEUROPOD - (re)load a model and reply with LOAD_SUCCESS
//   ADD_INPUT     - wrap the received tensors and stage them for the next inference
//   INFER         - run inference on the staged inputs and reply with RETURN_OUTPUT
//   SHUTDOWN      - leave the loop and return
//
// Any exception thrown while handling a message is caught and reported to the
// other side of the channel as an EXCEPTION message; the loop then keeps running.
void multiprocess_worker_loop(const std::string &control_queue_name)
{
    // Open the control channels
    IPCControlChannel control_channel(control_queue_name, WORKER_PROCESS);

    // The currently loaded neuropod and its tensor allocator.
    // Both stay null until a LOAD_NEUROPOD message has been handled successfully.
    std::unique_ptr<Neuropod>                neuropod;
    std::shared_ptr<NeuropodTensorAllocator> allocator;

    // Inputs staged by ADD_INPUT messages for the next INFER
    NeuropodValueMap inputs;

    while (true)
    {
        // Get a message
        auto received = control_channel.recv_message();
        auto msg_type = received.get_payload_type();

        try
        {
            if (msg_type == LOAD_NEUROPOD)
            {
                ope_load_config config;
                received.get(config);

                // Override some options
                auto &opts                      = config.opts;
                opts.load_model_at_construction = true;
                opts.use_ope                    = false;

                // Load a neuropod. If construction throws, the previously loaded
                // neuropod and allocator (if any) are left in place.
                neuropod  = stdx::make_unique<Neuropod>(config.neuropod_path, config.default_backend_overrides, opts);
                allocator = neuropod->get_tensor_allocator();

                // Drop anything that was staged before this load
                inputs.clear();
                control_channel.send_message(LOAD_SUCCESS);
            }
            else if (msg_type == ADD_INPUT)
            {
                // The allocator only exists after a successful load. Dereferencing a
                // null pointer below would be undefined behavior, so report an error
                // through the normal exception path instead.
                if (!allocator)
                {
                    NEUROPOD_ERROR("OPE: Received an ADD_INPUT message before a neuropod was successfully loaded");
                }

                NeuropodValueMap tmp;
                received.get(tmp);

                for (auto &item : tmp)
                {
                    // Wrap in a tensor type that this neuropod expects
                    inputs[item.first] =
                        wrap_existing_tensor(*allocator, std::dynamic_pointer_cast<NeuropodTensor>(item.second));
                }
            }
            else if (msg_type == INFER)
            {
                // Same reasoning as for ADD_INPUT: never call through a null neuropod
                if (!neuropod)
                {
                    NEUROPOD_ERROR("OPE: Received an INFER message before a neuropod was successfully loaded");
                }

                // Get the requested tensor names
                std::vector<std::string> requested_outputs;
                received.get(requested_outputs);

                // Run inference
                auto outputs = neuropod->infer(inputs, requested_outputs);

                // Turn these "native" tensors into shm tensors
                NeuropodValueMap transformed_outputs;
                for (const auto &entry : *outputs)
                {
                    // Unfortunately, this requires a copy (done within SHMNeuropodTensor)
                    auto shm_tensor = wrap_existing_tensor<SHMNeuropodTensor>(
                        std::dynamic_pointer_cast<NeuropodTensor>(entry.second));

                    // This ensures that the tensor stays around long enough for the other process to load it
                    transformed_outputs[entry.first] = shm_tensor;
                }

                control_channel.send_message_move(RETURN_OUTPUT, std::move(transformed_outputs));

                // Clean up any unused shm tensors that haven't been reused
                // (`shm_allocator` is declared outside of this function)
                shm_allocator.free_unused_shm_blocks();

                // Empty the inputs set. This is done after sending outputs back to the main process
                // because this takes a nontrivial amount of time
                // NOTE(review): if anything above throws, this line is skipped and the staged
                // inputs remain in place for the next request - confirm that this is intended.
                inputs.clear();
            }
            else if (msg_type == SHUTDOWN)
            {
                break;
            }
            else
            {
                NEUROPOD_ERROR("OPE: Unhandled message type: {}", msg_type);
            }
        }
        catch (const std::exception &e)
        {
            // Send the exception info back to the main process
            std::string msg = e.what();
            control_channel.send_message(EXCEPTION, msg);
        }
        catch (...)
        {
            // Anything that is not a std::exception carries no message we can forward
            control_channel.send_message(EXCEPTION, "An unknown exception occurred during inference");
        }

        SPDLOG_TRACE("OPE: BOTTOM OF WORKER LOOP");
    }
}