in src/simulator/task_utils_parallel.cpp [73:198]
std::vector<::task::TaskSimulation> simulateTasksInParallel(
const std::vector<::task::Task>& tasks, const int num_workers,
const int num_steps, const int stride) {
if (num_workers <= 0) {
// Run single-process version.
std::vector<::task::TaskSimulation> simulations;
for (const ::task::Task& task : tasks) {
simulations.push_back(simulateTask(task, num_steps, stride));
}
return simulations;
}
std::vector<int> pids;
// We need to create a shared memory where simulation results will be
// written. The size of the buffer should be static. So we assume that the
// sizes of scenes do not change during simulation, we need space for all
// objects fields in TaskSimulation up to num_steps plus the actual number of
// steps that were simulated until solution.
// sharedBuffers layout looks likes the following:
// SerializedScene scenes[num_steps];
// bool solvedStates[num_steps];
// bool isSolution;
// int actualNumSteps;
// int stepsSimulated;
std::vector<uint8_t*> sharedBuffers;
std::vector<SerializedTaskSimulation> sharedBufferLayouts;
std::vector<size_t> sceneSizes;
std::vector<size_t> bufferSizes;
for (const auto& task : tasks) {
const size_t sceneSize = serialize(task.scene).size();
const size_t sz =
(sceneSize + sizeof(uint8_t)) * num_steps + sizeof(uint8_t) * 2;
sceneSizes.push_back(sceneSize);
bufferSizes.push_back(sz);
sharedBuffers.push_back(static_cast<uint8_t*>(sharedMalloc(sz)));
SerializedTaskSimulation layout;
layout.scenes = sharedBuffers.back();
layout.solvedStates = layout.scenes + num_steps * sceneSizes.back();
layout.isSolution = layout.solvedStates + num_steps * sizeof(bool);
layout.actualNumSteps =
reinterpret_cast<int*>(layout.isSolution + sizeof(bool));
layout.stepsSimulated =
reinterpret_cast<int*>(layout.actualNumSteps + sizeof(int));
sharedBufferLayouts.push_back(layout);
}
for (size_t workerId = 0; workerId < num_workers; ++workerId) {
const int pid = fork();
if (pid == 0) {
// Worker. Run simulations and save to the shared buffer.
for (size_t taskId = workerId; taskId < tasks.size();
taskId += num_workers) {
const ::task::TaskSimulation simulation =
simulateTask(tasks[taskId], num_steps, stride);
const int actualNumSteps = simulation.sceneList.size();
const SerializedTaskSimulation& layout = sharedBufferLayouts[taskId];
for (size_t step = 0; step < actualNumSteps; ++step) {
const ::scene::Scene& scene = simulation.sceneList[step];
const std::vector<uint8_t> serializedScene = serialize(scene);
if (serializedScene.size() != sceneSizes[taskId]) {
exit(3);
}
std::copy_n(serializedScene.data(), serializedScene.size(),
layout.scenes + sceneSizes[taskId] * step);
}
for (size_t step = 0; step < actualNumSteps; ++step) {
layout.solvedStates[step] =
static_cast<uint8_t>(simulation.solvedStateList[step]);
}
*layout.isSolution = static_cast<uint8_t>(simulation.isSolution);
*layout.actualNumSteps = actualNumSteps;
*layout.stepsSimulated = simulation.stepsSimulated;
}
exit(0);
} else if (pid < 0) {
// Error.
std::cout << "FATAL: Fork failed!" << std::endl;
exit(2);
} else {
// Parent. Save pid and carry on.
pids.push_back(pid);
}
}
for (const int pid : pids) {
int status;
if (waitpid(pid, &status, 0) != -1) {
if (WIFEXITED(status)) {
int returned = WEXITSTATUS(status);
if (returned != 0) {
std::cout << "FATAL: Worker exited with failure status: " << returned
<< std::endl;
exit(5);
}
} else {
std::cout << "FATAL: Worker died unexpectedly" << std::endl;
exit(5);
}
} else {
std::perror("FATAL: waitpid() failed");
exit(5);
}
}
std::vector<::task::TaskSimulation> simulationBatch(tasks.size());
for (size_t i = 0; i < simulationBatch.size(); ++i) {
const SerializedTaskSimulation& layout = sharedBufferLayouts[i];
const int actualNumSteps = *layout.actualNumSteps;
std::vector<::scene::Scene> scenes(actualNumSteps);
for (int step = 0; step < actualNumSteps; ++step) {
const uint8_t* start = layout.scenes + sceneSizes[i] * step;
scenes[step] =
deserialize(std::vector<uint8_t>(start, start + sceneSizes[i]));
}
const std::vector<bool> solvedStates(
reinterpret_cast<bool*>(layout.solvedStates),
reinterpret_cast<bool*>(layout.solvedStates + actualNumSteps));
const bool solved = *layout.isSolution;
simulationBatch[i].__set_sceneList(scenes);
simulationBatch[i].__set_solvedStateList(solvedStates);
simulationBatch[i].__set_isSolution(solved);
simulationBatch[i].__set_stepsSimulated(*layout.stepsSimulated);
sharedFree(sharedBuffers[i], bufferSizes[i]);
}
return simulationBatch;
}