in src/mapper/mapper.cc [246:344]
void FFMapper::select_task_options(const MapperContext ctx,
const Task& task,
TaskOptions& output)
{
unsigned long long task_hash = compute_task_hash(task);
output.inline_task = false;
output.stealable = false;
output.map_locally = true;
if (task.task_id == STRATEGY_SEARCH_TASK_ID) {
output.initial_proc = all_gpus[0];
return;
}
if (task.task_id == NCCL_GETUNIQUEID_TASK_ID) {
output.initial_proc = all_gpus[0];
return;
}
if (task.task_id == UPDATE_METRICS_TASK_ID) {
output.initial_proc = all_cpus[0];
return;
}
if (task.task_id == TOP_LEVEL_TASK_ID) {
output.initial_proc = all_cpus[0];
// control replicate top level task
if (enable_control_replication) {
output.replicate = true;
}
return;
}
if (task.task_id == PYTHON_TOP_LEVEL_TASK_ID) {
output.initial_proc = local_pys[0];
// control replicate python top level task
if (enable_control_replication) {
output.replicate = true;
}
return;
}
if (is_parameter_server_update_task(task.task_id)
|| is_initializer_task(task.task_id)) {
// For Parameter Server Update, pick a processor from config
MappingTagID hash = task.tag;
ParallelConfig config;
if (strategies.find(hash) != strategies.end()) {
config = strategies[hash];
int num_parts = 1;
for (int i = 0; i < config.nDims; i++)
num_parts *= config.dim[i];
if (num_parts == 1) {
output.initial_proc = all_gpus[config.device_ids[0]];
// Current assert this sould be a local proc
assert(output.initial_proc.address_space() == node_id);
return;
} else {
output.initial_proc = all_gpus[config.device_ids[0]];
return;
}
}
if (cache_update_tasks.find(task_hash) != cache_update_tasks.end()) {
output.initial_proc = cache_update_tasks[task_hash];
assert(output.initial_proc.address_space() == node_id);
return;
}
// randomly select a local processor
output.initial_proc = local_gpus[task_hash % local_gpus.size()];
cache_update_tasks[task_hash] = output.initial_proc;
return;
}
if ((task.task_id >= CUSTOM_CPU_TASK_ID_FIRST)
&& (task.task_id <= CUSTOM_CPU_TASK_ID_LAST))
{
if (!task.is_index_space) {
output.initial_proc = all_cpus[0];
return;
}
}
if ((task.task_id == PY_DL_FLOAT_LOAD_ENTIRE_CPU_TASK_ID)
|| (task.task_id == PY_DL_INT_LOAD_ENTIRE_CPU_TASK_ID)
|| (task.task_id == PY_DL_FLOAT_INDEX_LOAD_ENTIRE_CPU_TASK_ID)
|| (task.task_id == PY_DL_INT_INDEX_LOAD_ENTIRE_CPU_TASK_ID))
{
if (!task.is_index_space) {
output.initial_proc = all_cpus[0];
return;
}
}
// Assert that all single tasks should be handled and returned before
// So task must be an indextask
if (!task.is_index_space) {
fprintf(stderr, "The following task is currently not captured by the "
"FlexFlow Mapper: %s\n"
"Report the issue to the FlexFlow developers",
task.get_task_name());
}
assert(task.is_index_space);
}