void FFMapper::select_task_options()

in src/mapper/mapper.cc [246:344]
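
Selects the initial processor and mapping options for each FlexFlow task,
dispatching on its task ID: well-known runtime and top-level tasks are pinned
to fixed processors, parameter-server updates follow the strategy config, and
any remaining single task must be handled before the final index-space check.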


void FFMapper::select_task_options(const MapperContext ctx,
                                   const Task& task,
                                   TaskOptions& output)
{
  unsigned long long task_hash = compute_task_hash(task);
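  // Defaults: do not inline into the parent task, disallow stealing,
  // and perform the mapping on the node where the task was launched.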
  output.inline_task = false;
  output.stealable = false;
  output.map_locally = true;

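  // Pin well-known runtime and top-level tasks to fixed processors.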
  if (task.task_id == STRATEGY_SEARCH_TASK_ID) {
    output.initial_proc = all_gpus[0];
    return;
  }
  if (task.task_id == NCCL_GETUNIQUEID_TASK_ID) {
    output.initial_proc = all_gpus[0];
    return;
  }
  if (task.task_id == UPDATE_METRICS_TASK_ID) {
    output.initial_proc = all_cpus[0];
    return;
  }
  if (task.task_id == TOP_LEVEL_TASK_ID) {
    output.initial_proc = all_cpus[0];
    // control replicate top level task
    if (enable_control_replication) {
      output.replicate = true;
    }
    return;
  }
  if (task.task_id == PYTHON_TOP_LEVEL_TASK_ID) {
    output.initial_proc = local_pys[0];
    // control replicate python top level task
    if (enable_control_replication) {
      output.replicate = true;
    }
    return;
  }

  if (is_parameter_server_update_task(task.task_id) ||
      is_initializer_task(task.task_id)) {
    // For parameter-server updates and initializers, pick a processor
    // from the strategy config if one exists for this mapping tag
    MappingTagID hash = task.tag;
    ParallelConfig config;
    if (strategies.find(hash) != strategies.end()) {
      config = strategies[hash];
      int num_parts = 1;
      for (int i = 0; i < config.nDims; i++)
        num_parts *= config.dim[i];
      output.initial_proc = all_gpus[config.device_ids[0]];
      // For a single-device strategy, assert this should be a local proc
      if (num_parts == 1) {
        assert(output.initial_proc.address_space() == node_id);
      }
      return;
    }
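    // Fall back to the processor cached for this task hash, if any.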
    if (cache_update_tasks.find(task_hash) != cache_update_tasks.end()) {
      output.initial_proc = cache_update_tasks[task_hash];
      assert(output.initial_proc.address_space() == node_id);
      return;
    }
    // otherwise pick a local GPU deterministically from the task hash
    output.initial_proc = local_gpus[task_hash % local_gpus.size()];
    cache_update_tasks[task_hash] = output.initial_proc;
    return;
  }

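  // Single custom CPU tasks run on the first CPU; index launches fall
  // through and keep the default options.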
  if ((task.task_id >= CUSTOM_CPU_TASK_ID_FIRST)
     && (task.task_id <= CUSTOM_CPU_TASK_ID_LAST))
  {
    if (!task.is_index_space) {
      output.initial_proc = all_cpus[0];
      return;
    }
  }

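  // Single-task Python dataloader launches likewise run on the first CPU.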
  if ((task.task_id == PY_DL_FLOAT_LOAD_ENTIRE_CPU_TASK_ID)
    || (task.task_id == PY_DL_INT_LOAD_ENTIRE_CPU_TASK_ID)
    || (task.task_id == PY_DL_FLOAT_INDEX_LOAD_ENTIRE_CPU_TASK_ID)
    || (task.task_id == PY_DL_INT_INDEX_LOAD_ENTIRE_CPU_TASK_ID))
  {
    if (!task.is_index_space) {
      output.initial_proc = all_cpus[0];
      return;
    }
  }

  // All single (non-index-space) tasks should have been handled and
  // returned above, so any task reaching this point must be an index launch.
  if (!task.is_index_space) {
    fprintf(stderr, "The following task is currently not captured by the "
            "FlexFlow Mapper: %s\n"
            "Please report this issue to the FlexFlow developers.\n",
            task.get_task_name());
  }
  assert(task.is_index_space);
}
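
A minimal sketch (an assumption-laden illustration, not FlexFlow's actual
code) of the cached, hash-based processor selection used for the
parameter-server and initializer branch above; Proc and select_cached_proc
are hypothetical stand-ins for Legion::Processor and the cache_update_tasks
bookkeeping:

#include <cstdint>
#include <unordered_map>
#include <vector>

using Proc = int;  // hypothetical stand-in for Legion::Processor

Proc select_cached_proc(uint64_t task_hash,
                        const std::vector<Proc> &local_gpus,
                        std::unordered_map<uint64_t, Proc> &cache) {
  auto it = cache.find(task_hash);
  if (it != cache.end())
    return it->second;  // same hash -> same processor on every call
  // first time: pick a local GPU deterministically from the hash
  Proc proc = local_gpus[task_hash % local_gpus.size()];
  cache[task_hash] = proc;  // remember the choice for later launches
  return proc;
}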