astra-sim-alibabacloud/astra-sim/workload/Layer.cc (1,456 lines of code) (raw):

/****************************************************************************** This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree. *******************************************************************************/ #include "Layer.hh" #include "astra-sim/system/DataSet.hh" #include "astra-sim/system/IntData.hh" #include "astra-sim/system/MockNcclLog.h" #include "astra-sim/system/AstraParamParse.hh" #ifdef NS3_MPI #include "ns3/mpi-interface.h" #include <mpi.h> using namespace ns3; #endif namespace AstraSim { Layer::Layer( std::string id, int layer_num, Sys* generator, Workload* workload, Tick fwd_pass_compute_time, ComType fwd_pass_comm_type, MockNccl::GroupType fwd_pass_group_type, uint64_t fwd_pass_comm_size, std::vector<bool> fwd_pass_comm_involved_dimensions, Tick input_grad_compute_time, ComType input_grad_comm_type, MockNccl::GroupType input_grad_group_type, uint64_t input_grad_comm_size, std::vector<bool> input_grad_comm_involved_dimensions, Tick weight_grad_compute_time, ComType weight_grad_comm_type, MockNccl::GroupType weight_grad_group_type, uint64_t weight_grad_comm_size, std::vector<bool> weight_grad_comm_involved_dimensions, Tick weight_grad_update_time, ParallelismPolicy specific_policy) { this->id = id; this->layer_num = layer_num; this->generator = generator; this->workload = workload; this->fwd_pass_compute_time = fwd_pass_compute_time; this->fwd_pass_comm_type = fwd_pass_comm_type; this->fwd_pass_group_type = fwd_pass_group_type; this->fwd_pass_comm_size = fwd_pass_comm_size; this->fwd_pass_comm_involved_dimensions = fwd_pass_comm_involved_dimensions; this->input_grad_compute_time = input_grad_compute_time; this->input_grad_comm_type = input_grad_comm_type; this->input_grad_group_type = input_grad_group_type; this->input_grad_comm_size = input_grad_comm_size; this->input_grad_comm_involved_dimensions = input_grad_comm_involved_dimensions; this->weight_grad_compute_time = weight_grad_compute_time; this->weight_grad_comm_type = weight_grad_comm_type; this->weight_grad_group_type = weight_grad_group_type; this->weight_grad_comm_size = weight_grad_comm_size; this->weight_grad_comm_involved_dimensions = weight_grad_comm_involved_dimensions; this->collective_counter = 0; this->weight_grad_update_time = weight_grad_update_time; this->fwd_update_time = weight_grad_update_time; this->input_grad_update_time = weight_grad_update_time; this->total_forward_pass_compute = 0; this->total_input_grad_compute = 0; this->total_weight_grad_compute = 0; this->total_weight_grad_comm = 0; this->total_input_grad_comm = 0; this->total_fwd_comm = 0; this->total_waiting_for_wg_comm = 0; this->total_waiting_for_ig_comm = 0; this->total_waiting_for_fwd_comm = 0; this->last_fwd_finished = 0; this->last_ig_finished = 0; this->last_wg_finished = 0; this->needs_fwd_in_bckwd_initiation = false; this->is_checkpoint = false; this->specific_parallellism = specific_policy; assert(generator != NULL); } void Layer::call(EventType event, CallData* mdata) { if (event == EventType::Wight_Grad_Comm_Finished) { last_wg_finished = Sys::boostedTick(); generator->register_event( this, EventType::Wight_Grad_Comm_Finished_After_Delay, mdata, weight_grad_update_time); return; } else if (event == EventType::Input_Grad_Comm_Finished) { last_ig_finished = Sys::boostedTick(); generator->register_event( this, EventType::Input_Grad_Comm_Finished_After_Delay, mdata, input_grad_update_time); return; } else if (event == EventType::Fwd_Comm_Finished) { last_fwd_finished = Sys::boostedTick(); generator->register_event( this, EventType::Fwd_Comm_Finished_After_Delay, mdata, fwd_update_time); return; } int data = ((IntData*)mdata)->data; IntData* intData = ((IntData*)mdata); if (event == EventType::Wight_Grad_Comm_Finished_After_Delay) { #ifndef PHY_MTP if (generator->id == 0) { std::cout << "***** info: weight gradient collective for layer: " << id << " is finished************" << std::endl; } weight_grad_datasets[data]->finish_tick += weight_grad_update_time; total_weight_grad_comm += weight_grad_datasets[data]->finish_tick - weight_grad_datasets[data]->creation_tick; if (weight_grad_datasets.size() == 1 && wg_barrier == CollectiveBarrier::Blocking) { total_waiting_for_wg_comm += weight_grad_datasets[data]->finish_tick - weight_grad_datasets[data]->creation_tick; update_stream_stats(weight_grad_datasets[data]); int dataset_streams = weight_grad_datasets[data]->total_streams; delete weight_grad_datasets[data]; weight_grad_datasets.erase(data); workload->call(EventType::General, NULL); generator->increase_finished_streams(dataset_streams); delete intData; return; } else if (started_waiting_for_weight_grad.size() > 0) { total_waiting_for_wg_comm += weight_grad_datasets[data]->finish_tick - started_waiting_for_weight_grad.front(); started_waiting_for_weight_grad.pop_front(); update_stream_stats(weight_grad_datasets[data]); int dataset_streams = weight_grad_datasets[data]->total_streams; delete weight_grad_datasets[data]; weight_grad_datasets.erase(data); workload->call(EventType::General, NULL); generator->increase_finished_streams(dataset_streams); delete intData; return; } update_stream_stats(weight_grad_datasets[data]); int dataset_streams = weight_grad_datasets[data]->total_streams; delete weight_grad_datasets[data]; weight_grad_datasets.erase(data); generator->increase_finished_streams(dataset_streams); delete intData; #else workload->call(EventType::General, NULL); generator->increase_finished_streams(1); #endif return; } else if (event == EventType::Input_Grad_Comm_Finished_After_Delay) { #ifndef PHY_MTP if (generator->id == 0) { std::cout << "***** info: input gradient collective for layer: " << id << " is finished************" << std::endl; } input_grad_datasets[data]->finish_tick += input_grad_update_time; total_input_grad_comm += input_grad_datasets[data]->finish_tick - input_grad_datasets[data]->creation_tick; if (input_grad_datasets.size() == 1 && ig_barrier == CollectiveBarrier::Blocking) { total_waiting_for_ig_comm += input_grad_datasets[data]->finish_tick - input_grad_datasets[data]->creation_tick; update_stream_stats(input_grad_datasets[data]); int dataset_streams = input_grad_datasets[data]->total_streams; delete input_grad_datasets[data]; input_grad_datasets.erase(data); workload->call(EventType::General, NULL); generator->increase_finished_streams(dataset_streams); delete intData; return; } else if (started_waiting_for_input_grad.size() > 0) { total_waiting_for_ig_comm += input_grad_datasets[data]->finish_tick - started_waiting_for_input_grad.front(); started_waiting_for_input_grad.pop_front(); update_stream_stats(input_grad_datasets[data]); int dataset_streams = input_grad_datasets[data]->total_streams; delete input_grad_datasets[data]; input_grad_datasets.erase(data); workload->call(EventType::General, NULL); generator->increase_finished_streams(dataset_streams); delete intData; return; } update_stream_stats(input_grad_datasets[data]); int dataset_streams = input_grad_datasets[data]->total_streams; delete input_grad_datasets[data]; input_grad_datasets.erase(data); generator->increase_finished_streams(dataset_streams); delete intData; #else workload->call(EventType::General, NULL); generator->increase_finished_streams(1); #endif return; } else if (event == EventType::Fwd_Comm_Finished_After_Delay) { #ifndef PHY_MTP if (generator->id == 0) { std::cout << "***** info: fwd pass comm collective for layer: " << id << " is finished************" << std::endl; } fwd_pass_datasets[data]->finish_tick += fwd_update_time; total_fwd_comm += fwd_pass_datasets[data]->finish_tick - fwd_pass_datasets[data]->creation_tick; if (fwd_pass_datasets.size() == 1 && fwd_barrier == CollectiveBarrier::Blocking) { total_waiting_for_fwd_comm += fwd_pass_datasets[data]->finish_tick - fwd_pass_datasets[data]->creation_tick; update_stream_stats(fwd_pass_datasets[data]); int dataset_streams = fwd_pass_datasets[data]->total_streams; delete fwd_pass_datasets[data]; fwd_pass_datasets.erase(data); workload->call(EventType::General, NULL); generator->increase_finished_streams(dataset_streams); delete intData; return; } else if (started_waiting_for_fwd_pass.size() > 0) { total_waiting_for_fwd_comm += fwd_pass_datasets[data]->finish_tick - started_waiting_for_fwd_pass.front(); started_waiting_for_fwd_pass.pop_front(); update_stream_stats(fwd_pass_datasets[data]); int dataset_streams = fwd_pass_datasets[data]->total_streams; delete fwd_pass_datasets[data]; fwd_pass_datasets.erase(data); workload->call(EventType::General, NULL); generator->increase_finished_streams(dataset_streams); delete intData; return; } update_stream_stats(fwd_pass_datasets[data]); int dataset_streams = fwd_pass_datasets[data]->total_streams; delete fwd_pass_datasets[data]; fwd_pass_datasets.erase(data); generator->increase_finished_streams(dataset_streams); delete intData; #else workload->call(EventType::General, NULL); generator->increase_finished_streams(1); #endif return; } } Tick Layer::get_fwd_pass_compute() { total_forward_pass_compute += fwd_pass_compute_time; return fwd_pass_compute_time; } Tick Layer::get_input_grad_compute() { total_input_grad_compute += input_grad_compute_time; return input_grad_compute_time; } Tick Layer::get_weight_grad_compute() { total_weight_grad_compute += weight_grad_compute_time; return weight_grad_compute_time; } void Layer::increment_waiting_for_wg() { total_waiting_for_wg_comm++; } void Layer::increment_waiting_for_ig() { total_waiting_for_ig_comm++; } void Layer::increment_waiting_for_fwd() { total_waiting_for_fwd_comm++; } bool Layer::is_fwd_pass_comm_finished() { if (fwd_pass_datasets.size() == 0) { return true; } return false; } bool Layer::is_fwd_pass_comm_finished_blocking() { if (fwd_pass_datasets.size() == 0) { return true; } if (started_waiting_for_fwd_pass.size() == 0) { started_waiting_for_fwd_pass.push_back(Sys::boostedTick()); } return false; } bool Layer::is_input_grad_comm_finished() { if (input_grad_datasets.size() == 0) { return true; } return false; } bool Layer::is_input_grad_comm_finished_blocking() { if (input_grad_datasets.size() == 0) { return true; } if (started_waiting_for_input_grad.size() == 0) { started_waiting_for_input_grad.push_back(Sys::boostedTick()); } return false; } bool Layer::is_weight_grad_comm_finished() { if (weight_grad_datasets.size() == 0) { return true; } return false; } bool Layer::is_weight_grad_comm_finished_blocking() { if (weight_grad_datasets.size() == 0) { return true; } if (started_waiting_for_weight_grad.size() == 0) { this->started_waiting_for_weight_grad.push_back(Sys::boostedTick()); } return false; } void Layer::print_involved_dimensions(std::vector<bool>& involved_dimensions) { std::cout << " involved dimensions: "; for (int i = 0; i < involved_dimensions.size(); i++) { if (involved_dimensions[i] == true) { std::cout << " 1,"; } else { std::cout << " 0,"; } } std::cout << std::endl; } LayerData Layer::report( std::string run_name, int layer_num, int total_rows, int stat_row, CSVWriter* detailed, CSVWriter* EndToEnd, double& total_compute, double& total_exposed, bool seprate_log, vector<double>& total_fwd_time, vector<double>& total_wg_time, vector<double>& total_ig_time, double& pre_bubble_time, double& DP_comm, double& DP_EP_comm, double& Expose_TP_comm, double& Expose_EP_comm) { LayerData layerData; take_stream_stats_average(); int TP_size = workload->model_parallel_npu_group; int PP_size = workload->pipeline_model_parallelism; int DP_size = workload->all_gpus / (TP_size * PP_size); int EP_size = workload->expert_parallel_npu_group; int vpp = workload->vpp; uint32_t pp_commsize = workload->pp_commsize; int GA = workload->GA; UserParam* param = UserParam::getInstance(); int input_grad_group_size = input_grad_group_type == MockNccl::GroupType::EP ? EP_size : TP_size; int fwd_pass_group_size = fwd_pass_group_type == MockNccl::GroupType::EP ? EP_size : TP_size; int weight_grad_group_size = weight_grad_group_type == MockNccl::GroupType::DP_EP ? DP_size / EP_size : DP_size; if (id != "embedding_layer"){ pre_bubble_time += ((total_waiting_for_fwd_comm + total_forward_pass_compute + total_weight_grad_compute + total_input_grad_compute + total_waiting_for_ig_comm) / FREQ); } if(weight_grad_group_type == MockNccl::GroupType::DP_EP){ DP_EP_comm += (total_waiting_for_wg_comm / FREQ); } else{ DP_comm += (total_waiting_for_wg_comm / FREQ); } if(fwd_pass_group_type == MockNccl::GroupType::EP){ Expose_EP_comm += ((total_waiting_for_fwd_comm + total_waiting_for_ig_comm) / FREQ); } else{ Expose_TP_comm += ((total_waiting_for_fwd_comm + total_waiting_for_ig_comm) / FREQ); } total_compute += (total_forward_pass_compute / FREQ); total_compute += (total_weight_grad_compute / FREQ); total_compute += (total_input_grad_compute / FREQ); total_exposed += (total_waiting_for_fwd_comm / FREQ); total_exposed += (total_waiting_for_wg_comm / FREQ); total_exposed += (total_waiting_for_ig_comm / FREQ); layerData.layer_name = id; layerData.total_forward_pass_compute = total_forward_pass_compute / FREQ; layerData.total_weight_grad_compute = total_weight_grad_compute / FREQ; layerData.total_input_grad_compute = total_input_grad_compute / FREQ; layerData.total_waiting_for_fwd_comm = total_waiting_for_fwd_comm / FREQ; layerData.total_waiting_for_wg_comm = total_waiting_for_wg_comm / FREQ; layerData.total_waiting_for_ig_comm = total_waiting_for_ig_comm / FREQ; layerData.total_fwd_comm = total_fwd_comm / FREQ; layerData.total_weight_grad_comm = total_weight_grad_comm / FREQ; layerData.total_input_grad_comm = total_input_grad_comm / FREQ; total_fwd_time[0] +=total_forward_pass_compute / FREQ; total_fwd_time[1] +=total_waiting_for_fwd_comm / FREQ; total_fwd_time[2] +=total_fwd_comm / FREQ; total_wg_time[0] +=total_weight_grad_compute / FREQ; total_wg_time[1] +=total_waiting_for_wg_comm / FREQ; total_wg_time[2] +=total_weight_grad_comm / FREQ; total_ig_time[0] +=total_input_grad_compute / FREQ; total_ig_time[1] +=total_waiting_for_ig_comm / FREQ; total_ig_time[2] +=total_input_grad_comm / FREQ; int i = 0; for (auto& qd : queuing_delay) { layerData.avg_queuing_delay.push_back(std::make_pair(i, qd / FREQ)); } i = 1; for (auto& ml : net_message_latency) { layerData.avg_network_message_dealy.push_back(std::make_pair(i, ml / FREQ)); } if (seprate_log) { std::string data; std::pair<float, float> total_bw; std::cout << "*******************" << std::endl; std::cout << "Layer id: " << id << std::endl; std::cout << "Total collectives issued for this layer: " << collective_counter << std::endl; std::cout << "************************* Workload stats " "************************* " << id << std::endl; if(stat_row == 0 && layer_num == 0) { data = "layer_name,"+run_name+",fwd compute,wg compute,ig compute,fwd exposed comm,wg exposed comm,ig exposed comm,fwd total comm,algbw,busbw,wg total comm,algbw,busbw,ig total comm,algbw,busbw,workload finished at"; EndToEnd->write_line(data); } data = ""; if(stat_row == 0){ data += id; } data = data + "," + run_name; std::cout << "id: " << id << " ,Total cycles spent on fwd pass compute: " << total_forward_pass_compute << std::endl; data = data + "," + std::to_string(total_forward_pass_compute/FREQ); std::cout << "id: " << id << " ,Total cycles spent on weight grad compute: " << total_weight_grad_compute << std::endl; data = data + "," + to_string(total_weight_grad_compute/FREQ); std::cout << "id: " << id << " ,Total cycles spent on input grad compute: " << total_input_grad_compute << std::endl; data = data + "," + to_string(total_input_grad_compute/FREQ); std::cout << "id: " << id << " ,Total cycles spent idle waiting for fwd finish: " << total_waiting_for_fwd_comm << std::endl; data = data + "," + to_string(total_waiting_for_fwd_comm/FREQ); std::cout << "id: " << id << " ,Total cycles spent idle waiting for weight grad finish: " << total_waiting_for_wg_comm << std::endl; data = data + "," + to_string(total_waiting_for_wg_comm / FREQ); std::cout << "id: " << id << " ,Total cycles spent idle waiting for input grad finish: " << total_waiting_for_ig_comm << std::endl; data = data + "," + to_string(total_waiting_for_ig_comm / FREQ); std::cout << "id: " << id << " ,Total cycles spent on fwd pass comm: " << total_fwd_comm << std::endl; total_bw = compute_busbw(fwd_pass_comm_type, fwd_pass_group_size, fwd_pass_comm_size, total_fwd_comm); data = data + "," + to_string(total_fwd_comm / FREQ); data = data + "," + to_string(total_bw.first); data = data + "," + to_string(total_bw.second); std::cout << "id: " << id << " ,Total cycles spent on weight grad comm: " << total_weight_grad_comm << std::endl; total_bw = compute_busbw(weight_grad_comm_type,weight_grad_group_size,weight_grad_comm_size,total_weight_grad_comm); data = data + "," + to_string(total_weight_grad_comm / FREQ); data = data + "," + to_string(total_bw.first); data = data + "," + to_string(total_bw.second); std::cout << "id: " << id << " ,Total cycles spent on input grad comm: " << total_input_grad_comm << std::endl; total_bw = compute_busbw(input_grad_comm_type,input_grad_group_size,input_grad_comm_size,total_input_grad_comm); data = data + "," + to_string(total_input_grad_comm / FREQ); data = data + "," + to_string(total_bw.first); data = data + "," + to_string(total_bw.second); data = data + "," + to_string(((double)Sys::boostedTick()) / FREQ); EndToEnd->write_line(data); data = "layer_name,"+run_name+",fwd compute,wg compute,ig compute,fwd exposed comm,wg exposed comm,ig exposed comm,fwd total comm,algbw,busbw,wg total comm,algbw,busbw,ig total comm,algbw,busbw,workload finished at"; if (layer_num == workload->SIZE - 1) { total_exposed = (((double)Sys::boostedTick()) / FREQ) - total_compute; data = "SUM," + run_name + "," + to_string(total_fwd_time[0]) + "," + to_string(total_wg_time[0]) + "," + to_string(total_ig_time[0]) + "," + to_string(total_fwd_time[1]) + "," + to_string(total_wg_time[1]) + "," + to_string(total_ig_time[1]) + "," + to_string(total_fwd_time[2]) + ",NONE,NONE," + to_string(total_wg_time[2]) + ",NONE,NONE," + to_string(total_ig_time[2]) + ",NONE,NONE"; EndToEnd->write_line(data); double total_time = total_compute + total_exposed; data = "total exposed comm," + to_string(total_exposed) + ",total comp," + to_string(total_compute) + ",total time," + to_string(total_time); EndToEnd->write_line(data); Tick Expose_PP_time = (2 * vpp * GA * (pp_commsize * GBps / (param->net_work_param.pp) * 1e9) / FREQ ); Expose_PP_time *= (1-param->net_work_param.pp_overlap_ratio) ; //pp bubble time pre_bubble_time *= static_cast<double>(PP_size - 1) / (GA * vpp); auto format_value = [](double value) { std::ostringstream stream; if (std::isfinite(value)) { stream << std::fixed << std::setprecision(0) << value; } else { stream << "NaN or Inf"; } return stream.str(); }; auto format_percentage = [&](double value) { double percentage = (value / total_time) * 100; std::ostringstream stream; stream << std::fixed << std::setprecision(2) << percentage; return stream.str() + "%"; }; std::string keys = "File name, Expose DP comm, Expose DP_EP comm, Expose TP comm, Expose_EP_comm, Expose_PP_comm, bubble time, total comp, total exposed comm, Total time"; std::string values = run_name + ", " + format_value(DP_comm) + " (" + format_percentage(DP_comm) + "), " + format_value(DP_EP_comm) + " (" + format_percentage(DP_EP_comm) + "), " + format_value(Expose_TP_comm) + " (" + format_percentage(Expose_TP_comm) + "), " + format_value(Expose_EP_comm) + " (" + format_percentage(Expose_EP_comm) + "), " + format_value(Expose_PP_time) + " (" + format_percentage(Expose_PP_time) + "), " + format_value(pre_bubble_time) + " (" + format_percentage(pre_bubble_time) + "), " + format_value(total_compute) + " (" + format_percentage(total_compute) + "), " + format_value(total_exposed) + " (" + format_percentage(total_exposed) + "), " + format_value(total_time); data = keys + "\n" + values; EndToEnd->write_res(data); } } return layerData; } std::string getFileName(const std::string& path) { size_t pos = path.find_last_of("/"); if (pos != std::string::npos) { return path.substr(pos + 1, path.length() - pos - 1); } return path; } LayerData Layer::report( std::string run_name, int layer_num, int total_rows, int stat_row, CSVWriter* detailed, CSVWriter* EndToEnd, double& total_compute, double& total_exposed, double& pre_bubble_time, double& DP_comm, double& DP_EP_comm, double& Expose_TP_comm, double& Expose_EP_comm, bool seprate_log) { LayerData layerData; take_stream_stats_average(); int TP_size = workload->model_parallel_npu_group; int PP_size = workload->pipeline_model_parallelism; int vpp = workload->vpp; uint32_t pp_commsize = workload->pp_commsize; int DP_size = generator->all_gpus[0] / (TP_size * PP_size); int GA = workload->GA; int EP_size = workload->expert_parallel_npu_group; int fwd_pass_group_size ; int weight_grad_group_size ; int input_grad_group_size ; UserParam* param = UserParam::getInstance(); input_grad_group_size = input_grad_group_type == MockNccl::GroupType::EP ? EP_size : TP_size; fwd_pass_group_size = fwd_pass_group_type == MockNccl::GroupType::EP ? EP_size : TP_size; weight_grad_group_size = weight_grad_group_type == MockNccl::GroupType::DP_EP ? DP_size / EP_size : DP_size; if(param->mode == ModeType::ANALYTICAL){ total_fwd_comm = compute_time(fwd_pass_comm_type,TP_size,fwd_pass_group_size,fwd_pass_comm_size,fwd_pass_group_type,generator->all_gpus[0],EP_size); total_weight_grad_comm = compute_time(weight_grad_comm_type,TP_size,weight_grad_group_size,weight_grad_comm_size,weight_grad_group_type,generator->all_gpus[0],EP_size); total_input_grad_comm = compute_time(input_grad_comm_type,TP_size,input_grad_group_size,input_grad_comm_size,input_grad_group_type,generator->all_gpus[0],EP_size); total_waiting_for_fwd_comm = total_fwd_comm; //tp forward total_waiting_for_ig_comm = total_input_grad_comm; //tp backward total_waiting_for_wg_comm = total_weight_grad_comm; } if (id != "embedding_layer"){ pre_bubble_time += ((total_waiting_for_fwd_comm + total_forward_pass_compute + total_weight_grad_compute + total_input_grad_compute + total_waiting_for_ig_comm) / FREQ); } if(weight_grad_group_type == MockNccl::GroupType::DP_EP){ total_waiting_for_wg_comm *= (1-param->net_work_param.dp_overlap_ratio); DP_EP_comm += (total_waiting_for_wg_comm / FREQ); } else{ total_waiting_for_wg_comm *= (1-param->net_work_param.dp_overlap_ratio); DP_comm += (total_waiting_for_wg_comm / FREQ); } if(fwd_pass_group_type == MockNccl::GroupType::EP){ total_waiting_for_fwd_comm *= (1-param->net_work_param.ep_overlap_ratio); total_waiting_for_ig_comm *= (1-param->net_work_param.ep_overlap_ratio); Expose_EP_comm += ((total_waiting_for_fwd_comm + total_waiting_for_ig_comm) / FREQ); } else{ total_waiting_for_fwd_comm *= (1-param->net_work_param.tp_overlap_ratio); total_waiting_for_ig_comm *= (1-param->net_work_param.tp_overlap_ratio); Expose_TP_comm += ((total_waiting_for_fwd_comm + total_waiting_for_ig_comm) / FREQ); } total_compute += (total_forward_pass_compute / FREQ); total_compute += (total_weight_grad_compute / FREQ); total_compute += (total_input_grad_compute / FREQ); total_exposed += (total_waiting_for_fwd_comm / FREQ); total_exposed += (total_waiting_for_wg_comm / FREQ); total_exposed += (total_waiting_for_ig_comm / FREQ); layerData.layer_name = id; layerData.total_forward_pass_compute = total_forward_pass_compute / FREQ; layerData.total_weight_grad_compute = total_weight_grad_compute / FREQ; layerData.total_input_grad_compute = total_input_grad_compute / FREQ; layerData.total_waiting_for_fwd_comm = total_waiting_for_fwd_comm / FREQ; layerData.total_waiting_for_wg_comm = total_waiting_for_wg_comm / FREQ; layerData.total_waiting_for_ig_comm = total_waiting_for_ig_comm / FREQ; layerData.total_fwd_comm = total_fwd_comm / FREQ; layerData.total_weight_grad_comm = total_weight_grad_comm / FREQ; layerData.total_input_grad_comm = total_input_grad_comm / FREQ; int i = 0; for (auto& qd : queuing_delay) { layerData.avg_queuing_delay.push_back(std::make_pair(i, qd / FREQ)); } i = 1; for (auto& ml : net_message_latency) { layerData.avg_network_message_dealy.push_back(std::make_pair(i, ml / FREQ)); } #ifdef NS3_MPI if (seprate_log) #else if (seprate_log) #endif { std::string data; std::pair<float, float> total_bw; std::cout << "*******************" << std::endl; std::cout << "Layer id: " << id << std::endl; std::cout << "Total collectives issued for this layer: " << collective_counter << std::endl; std::cout << "************************* Workload stats ************************* " << id << std::endl; if (stat_row == 0 && layer_num == 0) { data = "layer_name," + run_name + ",fwd compute,wg compute,ig compute,fwd exposed comm,wg exposed comm,ig exposed comm,fwd total comm,algbw,busbw,wg total comm,algbw,busbw,ig total comm,algbw,busbw"; EndToEnd->write_line(data); } data = ""; if (stat_row == 0) { data += id; } data = data + "," + run_name; auto format_value = [](double value) { std::ostringstream stream; if (std::isfinite(value)) { stream << std::fixed << std::setprecision(0) << value; } else { stream << "NaN or Inf"; } return stream.str(); }; auto format_value_bs = [](double value) { std::ostringstream stream; stream << std::fixed << std::setprecision(2) << value; return stream.str(); }; std::cout << "id: " << id << " ,Total cycles spent on fwd pass compute: " << format_value(total_forward_pass_compute / FREQ ) << std::endl; data = data + "," + format_value(total_forward_pass_compute / FREQ ); std::cout << "id: " << id << " ,Total cycles spent on weight grad compute: " << format_value(total_weight_grad_compute / FREQ ) << std::endl; data = data + "," + format_value(total_weight_grad_compute / FREQ ); std::cout << "id: " << id << " ,Total cycles spent on input grad compute: " << format_value(total_input_grad_compute / FREQ ) << std::endl; data = data + "," + format_value(total_input_grad_compute / FREQ ); std::cout << "id: " << id << " ,Total cycles spent idle waiting for fwd finish: " << format_value(total_waiting_for_fwd_comm / FREQ ) << std::endl; data = data + "," + format_value(total_waiting_for_fwd_comm / FREQ ); std::cout << "id: " << id << " ,Total cycles spent idle waiting for weight grad finish: " << format_value(total_waiting_for_wg_comm / FREQ ) << std::endl; data = data + "," + format_value(total_waiting_for_wg_comm / FREQ ); std::cout << "id: " << id << " ,Total cycles spent idle waiting for input grad finish: " << format_value(total_waiting_for_ig_comm / FREQ ) << std::endl; data = data + "," + format_value(total_waiting_for_ig_comm / FREQ ); std::cout << "id: " << id << " ,Total cycles spent on fwd pass comm: " << format_value(total_fwd_comm / FREQ ) << std::endl; total_bw = compute_busbw(fwd_pass_comm_type, fwd_pass_group_size, fwd_pass_comm_size, total_fwd_comm); data = data + "," + format_value(total_fwd_comm / FREQ ); data = data + "," + format_value_bs(total_bw.first); data = data + "," + format_value_bs(total_bw.second); std::cout << "id: " << id << " ,Total cycles spent on weight grad comm: " << format_value(total_weight_grad_comm / FREQ ) << std::endl; total_bw = compute_busbw(weight_grad_comm_type, weight_grad_group_size, weight_grad_comm_size, total_weight_grad_comm); data = data + "," + format_value(total_weight_grad_comm / FREQ ); data = data + "," + format_value_bs(total_bw.first); data = data + "," + format_value_bs(total_bw.second); std::cout << "id: " << id << " ,Total cycles spent on input grad comm: " << format_value(total_input_grad_comm / FREQ ) << std::endl; total_bw = compute_busbw(input_grad_comm_type, input_grad_group_size, input_grad_comm_size, total_input_grad_comm); data = data + "," + format_value(total_input_grad_comm / FREQ ); data = data + "," + format_value_bs(total_bw.first); data = data + "," + format_value_bs(total_bw.second); // data = data + "," + format_value(((double)Sys::boostedTick()) / FREQ ); EndToEnd->write_line(data); if (layer_num == workload->SIZE - 1) { if (param->mode != ModeType::ANALYTICAL) { total_exposed = (((double)Sys::boostedTick()) / FREQ ) - total_compute; } //pp commtime Tick Expose_PP_time = (2 * vpp * GA * (pp_commsize * GBps / (param->net_work_param.pp) * 1e9) / FREQ ); Expose_PP_time *= (1-param->net_work_param.pp_overlap_ratio) ; //pp bubble time pre_bubble_time *= static_cast<double>(PP_size - 1) / (GA * vpp); //total time double total_time = total_compute + total_exposed + pre_bubble_time + Expose_PP_time; auto format_percentage = [&](double value) { double percentage = (value / total_time) * 100; std::ostringstream stream; stream << std::fixed << std::setprecision(2) << percentage; return stream.str() + "%"; }; std::string file_name = param->res; size_t last_slash_pos = param->res.find_last_of('/'); std::string result; if (last_slash_pos != std::string::npos) { file_name = param->res.substr(last_slash_pos + 1); // 取 '/' 后面的部分 } std::string keys = "File name, Expose DP comm, Expose DP_EP comm, Expose TP comm, Expose_EP_comm, Expose_PP_comm, bubble time, total comp, total exposed comm, Total time"; std::string values = file_name + ", " + format_value(DP_comm) + " (" + format_percentage(DP_comm) + "), " + format_value(DP_EP_comm) + " (" + format_percentage(DP_EP_comm) + "), " + format_value(Expose_TP_comm) + " (" + format_percentage(Expose_TP_comm) + "), " + format_value(Expose_EP_comm) + " (" + format_percentage(Expose_EP_comm) + "), " + format_value(Expose_PP_time) + " (" + format_percentage(Expose_PP_time) + "), " + format_value(pre_bubble_time) + " (" + format_percentage(pre_bubble_time) + "), " + format_value(total_compute) + " (" + format_percentage(total_compute) + "), " + format_value(total_exposed) + " (" + format_percentage(total_exposed) + "), " + format_value(total_time); data = keys + "\n" + values; EndToEnd->write_res(data); if(param->net_work_param.visual){ std::string chart_path = EndToEnd->path; std::ofstream htmlFile(chart_path + "chart.html"); std::string file_name = getFileName(chart_path); htmlFile << "<!DOCTYPE html>\n"; htmlFile << "<html>\n<head>\n"; htmlFile << "<script src=\"https://cdn.jsdelivr.net/npm/chart.js\"></script>\n"; htmlFile << "<style>\n"; htmlFile << "body { display: flex; flex-direction: column; justify-content: center; align-items: center; height: 50vh; margin: 0; padding-top: 10%; }\n"; htmlFile << "canvas { width: 50%; max-width: 400px; height: auto; }\n"; htmlFile << "h2 { margin: 5px 0; }\n"; htmlFile << "</style>\n"; htmlFile << "</head>\n<body>\n"; htmlFile << "<canvas id=\"myPieChart\"></canvas>\n"; htmlFile << "<h2>Total Time: " << to_string(total_time) << " ns</h2>\n"; htmlFile << "<h2>model: " << file_name << " </h2>\n"; htmlFile << "<script>\n"; htmlFile << "var ctx = document.getElementById('myPieChart').getContext('2d');\n"; htmlFile << "var myPieChart = new Chart(ctx, {\n"; htmlFile << " type: 'pie',\n"; htmlFile << " data: {\n"; htmlFile << " labels: ['Expose DP comm', 'Expose DP_EP comm','Expose TP comm', 'Expose_EP_comm','Total compute', 'PP Bubble time', 'Expose PP comm'],\n"; htmlFile << " datasets: [{\n"; htmlFile << " data: [" << DP_comm << ", " << DP_EP_comm << ", " << Expose_TP_comm << ", " << Expose_EP_comm << ", " << total_compute << ", " << pre_bubble_time << ", " << Expose_PP_time << "],\n"; htmlFile << " backgroundColor: ['#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF', '#FF9F40','#FF5733'],\n"; htmlFile << " }]\n"; htmlFile << " },\n"; htmlFile << " options: {\n"; htmlFile << " responsive: true,\n"; htmlFile << " maintainAspectRatio: true,\n"; htmlFile << " plugins: {\n"; htmlFile << " tooltip: {\n"; htmlFile << " callbacks: {\n"; htmlFile << " label: function(context) {\n"; htmlFile << " var label = context.label || '';\n"; htmlFile << " if (label) {\n"; htmlFile << " label += ': ';\n"; htmlFile << " }\n"; htmlFile << " if (context.parsed !== null) {\n"; htmlFile << " label += context.parsed + ' ns';\n"; htmlFile << " }\n"; htmlFile << " return label;\n"; htmlFile << " }\n"; htmlFile << " }\n"; htmlFile << " }\n"; htmlFile << " }\n"; htmlFile << " }\n"; htmlFile << "});\n"; htmlFile << "</script>\n"; htmlFile << "</body>\n</html>"; htmlFile.close(); std::cout << "HTML file created" << std::endl; } } } return layerData; } static std::pair<int, int> binarySearch(const std::vector<long>& arr, long target) { int low = 0; int high = arr.size() - 1; int leftIndex = -1, rightIndex = -1; while (low <= high) { int mid = low + (high - low) / 2; if (arr[mid] < target) { low = mid + 1; leftIndex = mid; } else if (arr[mid] > target) { high = mid - 1; rightIndex = mid; } else { leftIndex = mid; rightIndex = mid; break; } } return std::make_pair(leftIndex, rightIndex); } Tick Layer::compute_time( ComType comtype, int tp_size, int nranks, uint64_t data_size, MockNccl::GroupType group_type, int all_gpus, int ep_size) { UserParam* param = UserParam::getInstance(); Tick comp_time = 0; if (comtype == ComType::None) { return 0; } bool DP_comm_inside = false; bool TP_comm_inside = false; bool EP_comm_inside = false; int n_ranks; int nnics; uint32_t gpus_per_server = param->net_work_param.gpus_per_server; GPUType gpu_type = param->net_work_param.gpu_type; float tp_ar = param->net_work_param.tp_ar; float tp_ag = param->net_work_param.tp_ag; float tp_ata = param->net_work_param.tp_ata; float ep_ata = param->net_work_param.ep_ata; float dp_ag = param->net_work_param.dp_ag; float ep_ag = param->net_work_param.ep_ag; float dp_ar = param->net_work_param.dp_ar; float ep_ar = param->net_work_param.ep_ar; if (group_type == MockNccl::GroupType::TP || group_type == MockNccl::GroupType::EP) { n_ranks = tp_size; if (n_ranks <= gpus_per_server) TP_comm_inside = true; } else if ( group_type == MockNccl::GroupType::DP || group_type == MockNccl::GroupType::EP || group_type == MockNccl::GroupType::DP_EP) { n_ranks = nranks; nnics = gpus_per_server / tp_size; if (all_gpus == gpus_per_server && tp_size <= gpus_per_server) DP_comm_inside = true; } if (TP_comm_inside || DP_comm_inside) { if (comtype == ComType::All_Reduce) { comp_time = data_size * GBps / tp_ar * 1e9 * 2 * //tp2 ep8 164.8 tp16 218 (nranks - 1) / (nranks / 1.0); } else if (group_type == MockNccl::GroupType::TP && ( comtype == ComType::All_Gather || comtype == ComType::Reduce_Scatter )) { comp_time = data_size * GBps / tp_ag * 1e9 * (nranks - 1) / (nranks / 1.0); } else if (group_type == MockNccl::GroupType::TP && ( comtype == ComType::All_to_All)) { comp_time = data_size * GBps / tp_ata * 1e9 * (nranks - 1) / (nranks / 1.0); }else if (group_type == MockNccl::GroupType::EP && comtype == ComType::All_to_All) { comp_time = data_size * GBps / ep_ata * 1e9 * (nranks - 1) / (nranks / 1.0); }else { comp_time = 0; } } else if (!TP_comm_inside && group_type == MockNccl::GroupType::TP) { if (comtype == ComType::All_Reduce) { comp_time = data_size * GBps / tp_ar * 1e9 * 2 * (nranks - 1) / (nranks / 1.0); } else if ( comtype == ComType::All_Gather || comtype == ComType::Reduce_Scatter) { comp_time = data_size * GBps / tp_ag * 1e9 * (nranks - 1) / (nranks / 1.0); } else if ( comtype == ComType::All_to_All) { comp_time = data_size * GBps / tp_ata * 1e9 * (nranks - 1) / (nranks / 1.0); } else { comp_time = 0; } } else if ( !DP_comm_inside && (group_type == MockNccl::GroupType::DP)) { if (comtype == ComType::All_Reduce) { comp_time = data_size * GBps / dp_ar * 1e9 * 2 * (nranks - 1) / (nranks / 1.0); } else if ( comtype == ComType::All_Gather || comtype == ComType::Reduce_Scatter || comtype == ComType::All_to_All) { comp_time = data_size * GBps / dp_ag * 1e9 * //tp2 ep8 48.5 (nranks - 1) / (nranks / 1.0); } else { comp_time = 0; } }else if ( !DP_comm_inside && ( group_type == MockNccl::GroupType::DP_EP)) { if (comtype == ComType::All_Reduce) { comp_time = data_size * GBps / ep_ar* 1e9 * 2 * (nranks - 1) / (nranks / 1.0); } else if ( comtype == ComType::All_Gather || comtype == ComType::Reduce_Scatter || comtype == ComType::All_to_All) { comp_time = data_size * GBps / ep_ag * 1e9 * //tp2 ep8 48.5 (nranks - 1) / (nranks / 1.0); } else { comp_time = 0; } } return comp_time; } std::pair<float,float> Layer::compute_busbw(ComType comtype, int nranks, uint64_t data_size,Tick total_comm){ float algbw = data_size / (total_comm / FREQ) * 1000000 * GBps; float busbw = 0.0; if (comtype == ComType::All_Reduce) { busbw = algbw * 2 * (nranks - 1) / (nranks / 1.0); } else if ( comtype == ComType::All_Gather || comtype == ComType::Reduce_Scatter || comtype == ComType::All_to_All) { busbw = algbw * (nranks - 1) / (nranks / 1.0); } else { busbw = 0.0; } return std::make_pair(algbw,busbw); } void Layer::issue_forward_pass_comm( SchedulingPolicy pref_scheduling, CollectiveBarrier barrier) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); #ifdef ANALYTI fwd_barrier = barrier; if (generator->id == 0){ NcclLog->writeLog( NcclLogLevel::DEBUG, "forward pass for layer %s is analytical ", id.c_str()); NcclLog->writeLog( NcclLogLevel::DEBUG, "forward pass for layer-id %d is analytical ", layer_num); } if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; #endif DataSet* fp = NULL; fwd_barrier = barrier; collective_counter++; if (fwd_pass_comm_type == ComType::All_Reduce) { #ifdef PHY_MTP fp = generator->generate_all_reduce( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Fwd_Comm_Finished, this); #else fp = generator->generate_all_reduce( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!fp->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no forward pass collective for layer: " << id << std::endl; } collective_counter--; delete fp; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-reduce forward pass collective issued for layer: " << id << ","; print_involved_dimensions(fwd_pass_comm_involved_dimensions); } } else if (fwd_pass_comm_type == ComType::All_to_All) { #ifdef PHY_MTP fp = generator->generate_all_to_all( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Fwd_Comm_Finished, this); #else fp = generator->generate_all_to_all( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!fp->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no forward pass collective for layer: " << id << std::endl; } collective_counter--; delete fp; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-to-all forward pass collective issued for layer: " << id << ","; print_involved_dimensions(fwd_pass_comm_involved_dimensions); } } else if (fwd_pass_comm_type == ComType::All_Gather) { #ifdef PHY_MTP fp = generator->generate_all_gather( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Fwd_Comm_Finished, this); #else fp = generator->generate_all_gather( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!fp->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no forward pass collective for layer: " << id << std::endl; } collective_counter--; delete fp; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-gather forward pass collective issued for layer: " << id << ","; print_involved_dimensions(fwd_pass_comm_involved_dimensions); } } else if (fwd_pass_comm_type == ComType::Reduce_Scatter) { #ifdef PHY_MTP fp = generator->generate_reduce_scatter( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Fwd_Comm_Finished, this); #else fp = generator->generate_reduce_scatter( fwd_pass_comm_size, fwd_pass_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!fp->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no forward pass collective for layer: " << id << std::endl; } collective_counter--; delete fp; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: reduce-scatter forward pass collective issued for layer: " << id << ","; print_involved_dimensions(fwd_pass_comm_involved_dimensions); } } else if (fwd_pass_comm_type == ComType::None) { collective_counter--; if (generator->id == 0) { std::cout << "info: no forward pass collective for layer: " << id << std::endl; } if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } else { Sys::sys_panic("no known collective operation! "); } #ifndef PHY_MTP fwd_pass_datasets[fp->my_id] = fp; fp->set_notifier(this, EventType::Fwd_Comm_Finished); #endif NcclLog->writeLog(NcclLogLevel::DEBUG,"Fwd_Comm_Finished set_notifier success"); } void Layer::issue_input_grad_comm( SchedulingPolicy pref_scheduling, CollectiveBarrier barrier) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); #ifdef ANALYTI ig_barrier = barrier; if (generator->id == 0){ NcclLog->writeLog( NcclLogLevel::DEBUG, "input grad collective for layer %s is analytical ", id.c_str()); NcclLog->writeLog( NcclLogLevel::DEBUG, "input grad collective for layer-id %d is analytical ", layer_num); } if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; #endif DataSet* ig = NULL; ig_barrier = barrier; collective_counter++; if (input_grad_comm_type == ComType::All_Reduce) { #ifdef PHY_MTP ig = generator->generate_all_reduce( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Input_Grad_Comm_Finished, this); #else ig = generator->generate_all_reduce( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!ig->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no input grad collective for layer: " << id << std::endl; } collective_counter--; delete ig; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-reduce input grad collective issued for layer: " << id << ","; print_involved_dimensions(input_grad_comm_involved_dimensions); } } else if (input_grad_comm_type == ComType::All_to_All) { #ifdef PHY_MTP ig = generator->generate_all_to_all( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Input_Grad_Comm_Finished, this); #else ig = generator->generate_all_to_all( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!ig->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no input grad collective for layer: " << id << std::endl; } collective_counter--; delete ig; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-to-all input grad collective issued for layer: " << id << ","; print_involved_dimensions(input_grad_comm_involved_dimensions); } } else if (input_grad_comm_type == ComType::All_Gather) { #ifdef PHY_MTP ig = generator->generate_all_gather( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Input_Grad_Comm_Finished, this); #else ig = generator->generate_all_gather( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!ig->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no input grad collective for layer: " << id << std::endl; } collective_counter--; delete ig; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-gather input grad collective issued for layer: " << id << ","; print_involved_dimensions(input_grad_comm_involved_dimensions); } } else if (input_grad_comm_type == ComType::Reduce_Scatter) { #ifdef PHY_MTP ig = generator->generate_reduce_scatter( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Input_Grad_Comm_Finished, this); #else ig = generator->generate_reduce_scatter( input_grad_comm_size, input_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!ig->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no input grad collective for layer: " << id << std::endl; } collective_counter--; delete ig; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: reduce-scatter input grad collective issued for layer: " << id << ","; print_involved_dimensions(input_grad_comm_involved_dimensions); } } else if (input_grad_comm_type == ComType::None) { collective_counter--; if (generator->id == 0) { std::cout << "info: no input grad collective for layer: " << id << std::endl; } if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } else { std::cout << "no known collective operation! for layer: " << id << std::endl; Sys::sys_panic("no known collective operation! "); } #ifndef PHY_MTP input_grad_datasets[ig->my_id] = ig; ig->set_notifier(this, EventType::Input_Grad_Comm_Finished); #endif } void Layer::issue_weight_grad_comm( SchedulingPolicy pref_scheduling, CollectiveBarrier barrier) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); #ifdef ANALYTI wg_barrier = barrier; if (generator->id == 0){ NcclLog->writeLog( NcclLogLevel::DEBUG, "weight grad collective for layer %s is analytical ", id.c_str()); NcclLog->writeLog( NcclLogLevel::DEBUG, "weight grad collective for layer-id %d is analytical ", layer_num); } if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; #endif DataSet* wg = NULL; wg_barrier = barrier; collective_counter++; if (weight_grad_comm_type == ComType::All_Reduce) { #ifdef PHY_MTP wg = generator->generate_all_reduce( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Wight_Grad_Comm_Finished, this); #else wg = generator->generate_all_reduce( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!wg->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no weight grad collective for layer: " << id << std::endl; } collective_counter--; delete wg; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-reduce weight grad collective issued for layer: " << id << " with size: " << weight_grad_comm_size << ","; print_involved_dimensions(weight_grad_comm_involved_dimensions); } } else if (weight_grad_comm_type == ComType::All_to_All) { #ifdef PHY_MTP wg = generator->generate_all_to_all( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Wight_Grad_Comm_Finished, this); #else wg = generator->generate_all_to_all( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!wg->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no weight grad collective for layer: " << id << std::endl; } collective_counter--; delete wg; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-to-all weight grad collective issued for layer: " << id << " with size: " << weight_grad_comm_size << ","; print_involved_dimensions(weight_grad_comm_involved_dimensions); } } else if (weight_grad_comm_type == ComType::All_Gather) { if(generator->id == 0) std::cout << "Layer issue wg all gather at tick: " << Sys::boostedTick() << std::endl; #ifdef PHY_MTP wg = generator->generate_all_gather( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Wight_Grad_Comm_Finished, this); #else wg = generator->generate_all_gather( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!wg->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no weight grad collective for layer: " << id << std::endl; } collective_counter--; delete wg; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: all-gather weight grad collective issued for layer: " << id << ","; print_involved_dimensions(weight_grad_comm_involved_dimensions); } } else if (weight_grad_comm_type == ComType::Reduce_Scatter) { #ifdef PHY_MTP wg = generator->generate_reduce_scatter( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num, EventType::Wight_Grad_Comm_Finished, this); #else wg = generator->generate_reduce_scatter( weight_grad_comm_size, weight_grad_comm_involved_dimensions, pref_scheduling, layer_num); #endif if (!wg->active) { if (generator->id == 0) { std::cout << "info: all dims disabled, no weight grad collective for layer: " << id << std::endl; } collective_counter--; delete wg; if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } if (generator->id == 0) { std::cout << "info: reduce-scatter weight grad collective issued for layer: " << id << ","; print_involved_dimensions(weight_grad_comm_involved_dimensions); } } else if (weight_grad_comm_type == ComType::None) { collective_counter--; if (generator->id == 0) { std::cout << "info: no weight grad collective for layer: " << id << std::endl; } if (barrier == CollectiveBarrier::Blocking) { workload->call(EventType::General, NULL); } return; } else { Sys::sys_panic("no known collective operation! "); } #ifndef PHY_MTP weight_grad_datasets[wg->my_id] = wg; wg->set_notifier(this, EventType::Wight_Grad_Comm_Finished); #endif } } // namespace AstraSim