in astra-sim-alibabacloud/astra-sim/system/Sys.cc [313:415]
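// Breaks one physical network dimension into two logical sub-dimensions so
// that the model-parallel NPU group ends exactly on a dimension boundary,
// letting the per-dimension collective topologies line up with the group.
// Returns the index of the dimension at which the group boundary lands, or
// -1 if no break is needed or possible.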
int Sys::break_dimension(int model_parallel_npu_group) {
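  // A model-parallel group of size 1 runs no collectives; nothing to break.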
if (model_parallel_npu_group == 1) {
return -1;
}
int dimension_to_break = 0;
int all_npus = 1;
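  // Scan the physical dimensions, accumulating in all_npus the product of
  // the dimension sizes covered so far, until the group boundary is reached.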
for (; dimension_to_break < physical_dims.size(); dimension_to_break++) {
if (all_npus * physical_dims[dimension_to_break] <
model_parallel_npu_group) {
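      // This whole dimension still fits inside the group; absorb it.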
all_npus *= physical_dims[dimension_to_break];
} else if (
all_npus * physical_dims[dimension_to_break] >
model_parallel_npu_group) {
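      // The group boundary falls strictly inside this dimension, so it must
      // be split in two. Tear down the logical topologies built for the old
      // dimension list before rebuilding.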
for (auto lt : logical_topologies) {
delete lt.second;
}
logical_topologies.clear();
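      // Give the new sub-dimension its own queue set by duplicating the
      // broken dimension's entry, then rebuild the scheduler and queue
      // levels over the extended queues_per_dim.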
delete scheduler_unit;
delete vLevels;
std::vector<int>::iterator levelIterator = queues_per_dim.begin();
std::advance(levelIterator, dimension_to_break);
queues_per_dim.insert(levelIterator, queues_per_dim[dimension_to_break]);
scheduler_unit = new SchedulerUnit(
this,
queues_per_dim,
max_running,
active_first_phase,
concurrent_streams);
vLevels = new QueueLevels(queues_per_dim, 0, NI->get_backend_type());
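      // first_subdim completes the model-parallel group; second_subdim is
      // what remains of the broken physical dimension. Integer division: the
      // split is only exact when the group size divides evenly.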
int first_subdim = model_parallel_npu_group / all_npus;
int second_subdim = physical_dims[dimension_to_break] / first_subdim;
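      // Copy the physical dimensions, replacing the broken one with its two
      // sub-dimensions.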
std::vector<int> logical_dims;
for (int dim = 0; dim < physical_dims.size(); dim++) {
if (dim != dimension_to_break) {
logical_dims.push_back(physical_dims[dim]);
} else {
logical_dims.push_back(first_subdim);
logical_dims.push_back(second_subdim);
}
}
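      // For each collective type, clone the implementation configured at the
      // split point so both sub-dimensions run the same algorithm; if fewer
      // implementations than dimensions were configured, fall back to the
      // last one (assumes at least one implementation per collective).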
std::vector<CollectiveImplementation*>::iterator it =
all_reduce_implementation_per_dimension.begin();
if (all_reduce_implementation_per_dimension.size() > dimension_to_break) {
std::advance(it, dimension_to_break);
} else {
        std::advance(it, all_reduce_implementation_per_dimension.size() - 1);
}
CollectiveImplementation* replicate =
(CollectiveImplementation*)(*it)->clone();
all_reduce_implementation_per_dimension.insert(it, replicate);
it = reduce_scatter_implementation_per_dimension.begin();
if (reduce_scatter_implementation_per_dimension.size() >
dimension_to_break) {
std::advance(it, dimension_to_break);
} else {
        std::advance(it, reduce_scatter_implementation_per_dimension.size() - 1);
}
replicate = (CollectiveImplementation*)(*it)->clone();
reduce_scatter_implementation_per_dimension.insert(it, replicate);
it = all_gather_implementation_per_dimension.begin();
if (all_gather_implementation_per_dimension.size() > dimension_to_break) {
std::advance(it, dimension_to_break);
} else {
        std::advance(it, all_gather_implementation_per_dimension.size() - 1);
}
replicate = (CollectiveImplementation*)(*it)->clone();
all_gather_implementation_per_dimension.insert(it, replicate);
it = all_to_all_implementation_per_dimension.begin();
if (all_to_all_implementation_per_dimension.size() > dimension_to_break) {
std::advance(it, dimension_to_break);
} else {
        std::advance(it, all_to_all_implementation_per_dimension.size() - 1);
}
replicate = (CollectiveImplementation*)(*it)->clone();
all_to_all_implementation_per_dimension.insert(it, replicate);
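      // Rebuild the logical topologies over the broken dimension list.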
logical_topologies["AllReduce"] = new GeneralComplexTopology(
id, logical_dims, all_reduce_implementation_per_dimension);
logical_topologies["ReduceScatter"] = new GeneralComplexTopology(
id, logical_dims, reduce_scatter_implementation_per_dimension);
logical_topologies["AllGather"] = new GeneralComplexTopology(
id, logical_dims, all_gather_implementation_per_dimension);
logical_topologies["AllToAll"] = new GeneralComplexTopology(
id, logical_dims, all_to_all_implementation_per_dimension);
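      // Record the broken layout and report which dimension was split.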
this->logical_broken_dims = logical_dims;
this->dim_to_break = dimension_to_break;
return dimension_to_break;
} else if (
all_npus * physical_dims[dimension_to_break] ==
model_parallel_npu_group) {
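      // The group boundary already coincides with this dimension boundary;
      // no split is needed.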
return dimension_to_break;
}
}
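  // Reached only when the group exceeds the product of all physical
  // dimensions; there is no boundary to break at.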
return -1;
}
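
// Worked example, as a minimal self-contained sketch of the boundary search
// above (illustrative only: sketch_break_dimension and its names are
// hypothetical, not part of Sys.cc, and the scheduler/topology rebuilding is
// deliberately omitted).
#include <cstdio>
#include <vector>

static int sketch_break_dimension(const std::vector<int>& dims, int group) {
  if (group == 1)
    return -1;
  int all_npus = 1;
  for (int d = 0; d < (int)dims.size(); d++) {
    if (all_npus * dims[d] < group) {
      all_npus *= dims[d]; // dimension fully inside the group; keep scanning
    } else if (all_npus * dims[d] > group) {
      int first = group / all_npus; // completes the group
      int second = dims[d] / first; // remainder of the physical dimension
      std::printf("break dim %d into %d x %d\n", d, first, second);
      return d;
    } else {
      return d; // boundary already aligned; no split needed
    }
  }
  return -1; // group larger than the whole machine
}

int main() {
  // {8, 4, 4} with a 16-NPU group: dim 0 is absorbed (all_npus = 8), the
  // boundary falls inside dim 1 (8 * 4 = 32 > 16), so dim 1 breaks into
  // 2 x 2 and the logical dims become {8, 2, 2, 4}.
  sketch_break_dimension({8, 4, 4}, 16);
  // {8, 4, 4} with a 32-NPU group: 8 * 4 == 32, aligned at dim 1, no split.
  std::printf("aligned at dim %d\n", sketch_break_dimension({8, 4, 4}, 32));
  return 0;
}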