in src/runtime/simulator.cc [328:631]
float Simulator::simulate_runtime(const FFModel* model,
const std::map<Op*, ParallelConfig>& global,
CompMode comp_mode,
std::string const &export_file_name)
{
// printf("%s\n", machine->to_string().c_str());
task_manager->reset();
// Step 1: register forward and backward tasks
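// Each partition of an operator becomes one forward task (and, in training
// mode, one backward task chained after it), placed on the GPU given by the
// operator's ParallelConfig and assigned the forward/backward time measured
// for that configuration.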
for (size_t l = 0; l < model->layers.size(); l++) {
Op* op = model->layers[l];
ParallelConfig config = global.find(op)->second;
CostMetrics cost_metrics = measure_operator_cost(op, config);
float forward_time = cost_metrics.forward_time;
float backward_time = cost_metrics.backward_time;
for (int j = 0; j < config.num_parts(); j++) {
SimTask* task1 = task_manager->new_forward_task(op, j);
task1->device = machine->get_gpu(config.device_ids[j]);
task1->mem = machine->get_gpu_fb_mem(config.device_ids[j]);
task1->run_time = forward_time;
if (comp_mode == COMP_MODE_TRAINING) {
SimTask* task2 = task_manager->new_backward_task(op, j);
task2->device = machine->get_gpu(config.device_ids[j]);
task2->mem = machine->get_gpu_fb_mem(config.device_ids[j]);
task2->run_time = backward_time;
task1->add_next_task(task2);
}
}
}
// Step 2: insert dependencies and comm. tasks before compute tasks
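// For every consumer partition, any producer partition whose output region
// overlaps the consumer's input region contributes a dependency; the transfer
// size is the overlap volume times the element size. Backward dependencies
// mirror the forward ones with the direction reversed.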
for (size_t l = 0; l < model->layers.size(); l++) {
Op* op = model->layers[l];
ParallelConfig config = global.find(op)->second;
for (int j = 0; j < op->numInputs; j++) {
Tensor t = op->inputs[j];
Op* pre_op = t.owner_op;
if (pre_op == NULL)
continue;
ParallelConfig pre_config = global.find(pre_op)->second;
size_t element_size = data_type_size(t.data_type);
for (int dstId = 0; dstId < config.num_parts(); dstId ++) {
Domain dstR = op->get_input_tensor_shape(config, j, dstId);
for (int srcId = 0; srcId < pre_config.num_parts(); srcId ++) {
Domain srcR = pre_op->get_output_tensor_shape(pre_config, t.owner_idx, srcId);
if (dstR.intersection(srcR).get_volume() > 0) {
// Forward dependency
{
SimTask* dstT = task_manager->get_forward_task(op, dstId);
SimTask* srcT = task_manager->get_forward_task(pre_op, srcId);
add_task_dependencies_with_xfer(srcT, dstT, dstR.intersection(srcR).get_volume() * element_size);
}
// Backward dependency
if (comp_mode == COMP_MODE_TRAINING) {
SimTask* dstT = task_manager->get_backward_task(op, dstId);
SimTask* srcT = task_manager->get_backward_task(pre_op, srcId);
add_task_dependencies_with_xfer(dstT, srcT, dstR.intersection(srcR).get_volume() * element_size);
}
}
}
}
}
}
#ifdef FF_USE_NCCL
// Do nothing since we will calculate NCCL cost at the end
#else
// Step 2.5: add final tasks for each compute device to capture the returning comm. tasks
// from parameter servers
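// These barrier tasks have zero run time and act as per-GPU sinks, so the
// communication that returns updated parameters to each device is included in
// the simulated makespan.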
std::vector<SimTask*> finals;
for (int d = 0; d < machine->get_num_gpus(); d++) {
SimTask* t = task_manager->new_barrier_task();
t->device = machine->get_gpu(d);
t->mem = machine->get_gpu_fb_mem(d);
t->run_time = 0;
finals.push_back(t);
}
if (model->config.search_overlap_backward_update && comp_mode == COMP_MODE_TRAINING) {
// Step 3a: assume backpropagation and weight update are overlapped
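// For each weight, a zero-cost update task is placed on the device of the first
// partition holding a given weight region; every other partition holding the
// same region sends its gradient to that task and receives the updated weight
// back at its device's final task, so parameter synchronization can overlap
// with the remaining backward computation.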
for (int l = model->layers.size()-1; l >= 0; l--) {
Op* op = model->layers[l];
size_t element_size = data_type_size(DT_FLOAT); // assume all weights have float elements
ParallelConfig pc = global.find(op)->second;
for (int j = 0; j < op->numWeights; j++) {
std::set<int> synched;
for (int firstId = 0; firstId < pc.num_parts(); firstId++)
if (synched.find(firstId) == synched.end()) {
synched.insert(firstId);
Domain firstR = op->get_weight_tensor_shape(pc, j, firstId);
// Add a compute task for parameter update
SimTask* updateT = task_manager->new_update_task();
updateT->device = machine->get_gpu(pc.device_ids[firstId]);
updateT->mem = machine->get_gpu_fb_mem(pc.device_ids[firstId]);
// TODO add parameter synchronization time
updateT->run_time = 0.0f; // Assume update task takes no time
for (int nextId = firstId+1; nextId < pc.num_parts(); nextId++) {
Domain nextR = op->get_weight_tensor_shape(pc, j, nextId);
if (firstR.intersection(nextR).get_volume() > 0) {
// Assert all or nothing:
// The two weight regions must either fully overlap or not overlap at all
assert(firstR == nextR);
assert(synched.find(nextId) == synched.end());
synched.insert(nextId);
// Add comm. tasks from backT to updateT
SimTask* backT = task_manager->get_backward_task(op, nextId);
add_task_dependencies_with_xfer(backT, updateT, firstR.get_volume() * element_size);
// Add comm. tasks from updateT to finalT
SimTask* finalT = finals[backT->device->device_id];
add_task_dependencies_with_xfer(updateT, finalT, firstR.get_volume() * element_size);
}
}
}
}
}
} else if (comp_mode == COMP_MODE_TRAINING) {
// Step 3b: Bulk Synchronous Model
// Add a per-device barrier before weight update
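// Every backward task feeds a zero-cost barrier on its own device, and all
// update tasks on a device are released only after that barrier completes,
// modeling a bulk-synchronous weight update.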
std::vector<SimTask*> barriers;
for (int d = 0; d < machine->get_num_gpus(); d++) {
SimTask* t = task_manager->new_barrier_task();
t->device = machine->get_gpu(d);
t->mem = machine->get_gpu_fb_mem(d);
t->run_time = 0;
barriers.push_back(t);
}
for (size_t l = 0; l < model->layers.size(); l++) {
Op* op = model->layers[l];
ParallelConfig pc = global.find(op)->second;
for (int j = 0; j < pc.num_parts(); j++) {
SimTask* backT = task_manager->get_backward_task(op, j);
backT->add_next_task(barriers[backT->device->device_id]);
}
}
for (size_t l = 0; l < model->layers.size(); l++) {
Op* op = model->layers[l];
ParallelConfig pc = global.find(op)->second;
size_t element_size = data_type_size(DT_FLOAT); // assume all weights have float elements
for (int j = 0; j < op->numWeights; j++) {
std::set<int> synched;
for (int firstId = 0; firstId < pc.num_parts(); firstId++)
if (synched.find(firstId) == synched.end()) {
synched.insert(firstId);
Domain firstR = op->get_weight_tensor_shape(pc, j, firstId);
// Add a compute task for parameter update
SimTask* updateT = task_manager->new_update_task();
updateT->device = machine->get_gpu(pc.device_ids[firstId]);
updateT->mem = machine->get_gpu_fb_mem(pc.device_ids[firstId]);
updateT->run_time = 0.0f; // Assume update task takes no time
barriers[updateT->device->device_id]->add_next_task(updateT);
for (int nextId = firstId+1; nextId < pc.num_parts(); nextId++) {
Domain nextR = op->get_weight_tensor_shape(pc, j, nextId);
if (firstR.intersection(nextR).get_volume() > 0) {
// Assert all or nothing:
// The two weight regions must either fully overlap or not overlap at all
assert(firstR == nextR);
assert(synched.find(nextId) == synched.end());
synched.insert(nextId);
SimTask* backT = task_manager->get_backward_task(op, nextId);
assert(backT->device->device_id == pc.device_ids[nextId]);
SimTask* barrierT = barriers[backT->device->device_id];
// Add comm. tasks from barrierT to updateT
add_task_dependencies_with_xfer(barrierT, updateT, firstR.get_volume() * element_size);
// Add comm. tasks from updateT to finalT
SimTask* finalT = finals[backT->device->device_id];
add_task_dependencies_with_xfer(updateT, finalT, firstR.get_volume() * element_size);
}
}
}
}
}
} else {
assert(comp_mode == COMP_MODE_INFERENCE);
}
#endif
// Step 4: add ready tasks into ready_queue
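// A task is ready when it has no unfinished predecessors (counter == 0).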
std::priority_queue<SimTask*, std::vector<SimTask*>, SimTaskCompare> ready_queue;
for (size_t i = 0; i < task_manager->global_task_id; i++)
if (task_manager->tasks[i]->counter == 0)
ready_queue.push(task_manager->tasks[i]);
// Step 5: perform simulation
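// Event-driven list scheduling: each task starts at the later of its ready time
// and the time its device becomes free; the simulated runtime is the largest
// end time observed across all tasks.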
float sim_time = 0.0f;
std::map<Device*, float> device_times;
size_t idx = 0;
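// If an export file name is given, the scheduled task graph is also written out
// as a Graphviz DOT file with one record-shaped node per executed task.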
DotFile<SimTask *> taskGraph;
bool export_taskgraph = (export_file_name != "");
if (export_taskgraph) {
taskGraph.set_filename(export_file_name);
}
while (!ready_queue.empty()) {
// Dequeue the task with the earliest ready time
SimTask* cur_task = ready_queue.top();
ready_queue.pop();
float ready_time = 0;
if (device_times.find(cur_task->device) != device_times.end()) {
ready_time = device_times[cur_task->device];
}
float start_time = std::max(ready_time, cur_task->ready_time);
float end_time = start_time + cur_task->run_time;
device_times[cur_task->device] = end_time;
if (export_taskgraph) {
std::map<std::string, std::string> nodeAttrs;
std::ostringstream label;
label << "\"{ ";
if (!(cur_task->name).empty()) {
label << cur_task->name << " | ";
}
label << cur_task->get_type_str() << " | ";
label << "{ " << start_time << " | " << end_time << " }";
label << " }\"";
nodeAttrs["label"] = label.str();
nodeAttrs["shape"] = "record";
taskGraph.add_node(cur_task, nodeAttrs);
}
// printf("task[%lu] type(%d) run_time(%.4lf) ready_time(%.4lf) start_time(%.4lf) device(%s)\n",
// idx, cur_task->type, cur_task->run_time, ready_time, start_time, (cur_task->device->name).c_str());
if (end_time > sim_time)
sim_time = end_time;
for (size_t i = 0; i < cur_task->next_tasks.size(); i++) {
SimTask* next = cur_task->next_tasks[i];
if (export_taskgraph) {
taskGraph.add_edge(cur_task, next);
}
next->ready_time = std::max(next->ready_time, end_time);
next->counter --;
if (next->counter == 0) {
ready_queue.push(next);
}
}
idx++;
}
if (export_taskgraph) {
taskGraph.close();
}
// Assert all tasks were processed
assert(idx == task_manager->global_task_id);
#ifdef FF_USE_NCCL
if (comp_mode == COMP_MODE_TRAINING) {
for (size_t l = 0; l < model->layers.size(); l++) {
Op* op = model->layers[l];
size_t element_size = data_type_size(DT_FLOAT); // assume all weights have float elements
ParallelConfig pc = global.find(op)->second;
// Since all NCCL calls are blocking, we can add the NCCL cost
// sequentially
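// For each replicated weight, the synchronization time is approximated by the
// slowest pairwise transfer: the weight size divided by the intra-node or
// inter-node GPU bandwidth, whichever connects the two replicas.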
for (int j = 0; j < op->numWeights; j++) {
std::set<int> synched;
for (int firstId = 0; firstId < pc.num_parts(); firstId++)
if (synched.find(firstId) == synched.end()) {
synched.insert(firstId);
Domain firstR = op->get_weight_tensor_shape(pc, j, firstId);
Device* firstDevice = machine->get_gpu(pc.device_ids[firstId]);
float nccl_time = 0.0f;
for (int nextId = firstId+1; nextId < pc.num_parts(); nextId++) {
Domain nextR = op->get_weight_tensor_shape(pc, j, nextId);
if (firstR.intersection(nextR).get_volume() > 0) {
// Assert all or nothing:
// The two weight regions must either fully overlap or not overlap at all
assert(firstR == nextR);
assert(synched.find(nextId) == synched.end());
synched.insert(nextId);
Device* nextDevice = machine->get_gpu(pc.device_ids[nextId]);
// Compute the bandwidth between firstDevice and nextDevice
float bandwidth = 0.0f;
if (firstDevice->node_id == nextDevice->node_id) {
bandwidth = machine->get_intra_node_gpu_bandwidth();
} else {
bandwidth = machine->get_inter_node_gpu_bandwidth();
}
nccl_time = std::max(nccl_time, (float)firstR.get_volume() * element_size / bandwidth);
}
}
// Add nccl_time to sim_time since NCCL calls are blocking
sim_time += nccl_time;
}
}
}
} else {
assert(comp_mode == COMP_MODE_INFERENCE);
}
#endif
// Step 6: add penalty to strategies that exceed the memory limits on devices
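// Each partition charges its operator's measured memory requirement to the GPU
// it is placed on; usage above a device's framebuffer capacity is converted
// into a runtime penalty of 1 ms per MB over budget.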
std::vector<size_t> gpu_mem_usage(machine->get_num_gpus(), 0);
float memory_penalty = 0.0f;
for (size_t l = 0; l < model->layers.size(); l++) {
Op* op = model->layers[l];
ParallelConfig config = global.find(op)->second;
CostMetrics cost_metrics = measure_operator_cost(op, config);
size_t memory_requirement = cost_metrics.memory_requirement;
for (int j = 0; j < config.num_parts(); j++) {
gpu_mem_usage[config.device_ids[j]] += memory_requirement;
}
}
if (export_file_name != "") {
for (int i = 0; i < machine->get_num_gpus(); i++) {
printf("Before penalty, dev id %d, usage %zu \n", i, gpu_mem_usage[i]);
}
}
// Penalize the total runtime by 1 ms for every 1 MB over the memory budget
for (int i = 0; i < machine->get_num_gpus(); i++) {
MemDevice* gpu_fb_mem = machine->get_gpu_fb_mem(i);
if (gpu_mem_usage[i] > gpu_fb_mem->capacity and gpu_fb_mem->capacity >= 0)
memory_penalty += (gpu_mem_usage[i] - gpu_fb_mem->capacity) * 1e-6;
}
//if (memory_penalty > 0.0f)
// printf("Memory penalty = %.4lf ms\n", memory_penalty);
return sim_time + memory_penalty;
}