astra-sim-alibabacloud/astra-sim/system/Sys.cc (2,036 lines of code) (raw):

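// ---------------------------------------------------------------------------
// File overview: Sys models a single simulated NPU/node in the system layer.
// It owns the per-node event queue (register_event/try_register_event/
// call_events/handleEvent), the send/recv front ends (front_end_sim_send and
// front_end_sim_recv, optionally wrapped in a rendezvous handshake), the
// collective generators (generate_all_reduce/all_gather/reduce_scatter/
// all_to_all -> generate_collective -> generate_collective_phase, including
// the mock-NCCL ring/tree/NVLS flow models), the per-queue SchedulerUnit, and
// the system-input parser (initialize_sys/parse_var/post_process_inputs).
//
// Illustrative sys-input accepted by initialize_sys()/parse_var(). The keys
// and tokens below are the ones recognized by parse_var(); the values are
// placeholders, not recommendations:
//
//   scheduling-policy: LIFO
//   endpoint-delay: 10
//   active-chunks-per-dimension: 1
//   preferred-dataset-splits: 4
//   boost-mode: 0
//   all-reduce-implementation: NcclFlowModel
//   all-gather-implementation: NcclFlowModel
//   reduce-scatter-implementation: NcclFlowModel
//   all-to-all-implementation: NcclFlowModel
//   collective-optimization: baseline
//   intra-dimension-scheduling: FIFO
//   inter-dimension-scheduling: ascending
//
// For multi-dimensional topologies, a collective implementation can be given
// per dimension, joined by '_' (e.g. "ring_doubleBinaryTree"), as parsed by
// generate_collective_implementation_from_input().
// ---------------------------------------------------------------------------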
/****************************************************************************** This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree. *******************************************************************************/ #include "Sys.hh" #include "BaseStream.hh" #include "DataSet.hh" #include "MemBus.hh" #include "QueueLevels.hh" #include "SimRecvCaller.hh" #include "SimSendCaller.hh" #include "StreamBaseline.hh" #include "Common.hh" #include "RendezvousRecvData.hh" #include "RendezvousSendData.hh" #include "astra-sim/system/collective/AllToAll.hh" #include "astra-sim/system/collective/DoubleBinaryTreeAllReduce.hh" #include "astra-sim/system/collective/HalvingDoubling.hh" #include "astra-sim/system/collective/Ring.hh" #include "astra-sim/system/collective/NcclTreeFlowModel.hh" #include "astra-sim/system/scheduling/OfflineGreedy.hh" #include "astra-sim/system/topology/BasicLogicalTopology.hh" #include "astra-sim/system/topology/DoubleBinaryTreeTopology.hh" #include "astra-sim/system/topology/GeneralComplexTopology.hh" #include "astra-sim/system/topology/LocalRingGlobalBinaryTree.hh" #include "astra-sim/system/topology/LocalRingNodeA2AGlobalDBT.hh" #include "astra-sim/system/topology/Torus3D.hh" #include "astra-sim/system/MockNcclLog.h" #include "astra-sim/workload/Layer.hh" #include <algorithm> #include <cmath> #include <numeric> MockNccl::MockNcclGroup* GlobalGroup = nullptr; namespace AstraSim { std::atomic<bool> Sys::g_sys_inCriticalSection(false); Tick Sys::offset = 0; uint8_t* Sys::dummy_data = new uint8_t[2]; std::vector<Sys*> Sys::all_generators; Sys::~Sys() { end_sim_time = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::minutes>( end_sim_time - start_sim_time); if (id == 0) { auto timenow = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); std::cout << "*****" << std::endl << "Time to exit: " << ctime(&timenow) << "all-reduce Collective implementation: " << inp_all_reduce_implementation << std::endl << "reduce-scatter Collective implementation: " << inp_reduce_scatter_implementation << std::endl << "all-gather Collective implementation: " << inp_all_gather_implementation << std::endl << "all-to-all Collective implementation: " << inp_all_to_all_implementation << std::endl << "Collective optimization: " << inp_collective_optimization << std::endl << "Total sim duration: " << duration.count() / 60 << ":" << duration.count() % 60 << " hours" << std::endl << "Total streams injected: " << streams_injected << std::endl << "Total streams finished: " << streams_finished << std::endl << "Percentage of finished streams: " << (((double)streams_finished) / streams_injected) * 100 << " %" << std::endl << "*****" << std::endl; } #ifndef PHY_MTP all_generators[id + npu_offset] = nullptr; for (auto lt : logical_topologies) { delete lt.second; } logical_topologies.clear(); for (auto ci : all_reduce_implementation_per_dimension) { delete ci; } for (auto ci : reduce_scatter_implementation_per_dimension) { delete ci; } for (auto ci : all_gather_implementation_per_dimension) { delete ci; } for (auto ci : all_to_all_implementation_per_dimension) { delete ci; } if (scheduler_unit != nullptr) delete scheduler_unit; if (vLevels != nullptr) delete vLevels; if (memBus != nullptr) delete memBus; if (workload != nullptr) delete workload; if (offline_greedy != nullptr) delete offline_greedy; bool shouldExit = true; for(int i = 0; i < num_gpus; ++ i) { auto& a = all_generators[i]; 
if (a != nullptr) { shouldExit = false; break; } } if (shouldExit) { exitSimLoop("Exiting"); } #else exitSimLoop("Exiting"); #endif } Sys::Sys( AstraNetworkAPI* NI, AstraMemoryAPI* MEM, int id, int npu_offset, int num_passes, std::vector<int> physical_dims, std::vector<int> queues_per_dim, std::string my_sys, std::string my_workload, float comm_scale, float compute_scale, float injection_scale, int total_stat_rows, int stat_row, std::string path, std::string run_name, bool seprate_log, bool rendezvous_enabled, GPUType _gpu_type, std::vector<int>_all_gpus, std::vector<int>_NVSwitchs, int _ngpus_per_node) { scheduler_unit = nullptr; vLevels = nullptr; memBus = nullptr; workload = nullptr; offline_greedy = nullptr; this->initialized = false; this->intra_dimension_scheduling = IntraDimensionScheduling::FIFO; this->inter_dimension_scheduling = InterDimensionScheduling::Ascending; round_robin_inter_dimension_scheduler = 0; this->last_scheduled_collective = 0; this->dim_to_break = -1; start_sim_time = std::chrono::high_resolution_clock::now(); this->NI = NI; this->MEM = MEM; this->id = id; this->npu_offset=npu_offset; this->method = "baseline"; this->finished_workloads = 0; this->streams_finished = 0; this->streams_injected = 0; this->first_phase_streams = 0; this->total_running_streams = 0; this->priority_counter = 0; this->comm_scale = comm_scale; this->compute_scale = compute_scale; this->injection_scale = injection_scale; this->inp_model_shared_bus = 0; this->inp_boost_mode = 0; this->num_channels = 1; this->processing_latency = 10; this->communication_delay = 10; this->local_reduction_delay = 1; this->active_chunks_per_dimension = 1; this->seprate_log = seprate_log; this->rendezvous_enabled = rendezvous_enabled; this->NVSwitchs = _NVSwitchs; this->all_gpus = _all_gpus; this->gpu_type = _gpu_type; this->ngpus_per_node = _ngpus_per_node; if ((id + npu_offset + 1) > all_generators.size()) { all_generators.resize(id + npu_offset + 1); } all_generators[id+npu_offset] = this; inp_scheduling_policy = "LIFO"; communication_delay = 10 * injection_scale; active_chunks_per_dimension = 1; preferred_dataset_splits = 1; inp_boost_mode = 0; inp_all_reduce_implementation = "NcclFlowModel"; inp_all_gather_implementation = "NcclFlowModel"; inp_reduce_scatter_implementation = "NcclFlowModel"; inp_all_to_all_implementation = "NcclFlowModel"; inp_collective_optimization = "baseline"; bool result = post_process_inputs(); if (result == false) { sys_panic( "Unable to initialize the system layer because the file cannot be opened"); } this->pending_events = 0; int total_disabled = 0; this->physical_dims = physical_dims; this->queues_per_dim = queues_per_dim; int element = 0; all_queues = 0; total_nodes = 1; for (int current_dim = 0; current_dim < queues_per_dim.size(); current_dim++) { all_queues += queues_per_dim[current_dim]; bool enabled = !boost_mode; if (id % total_nodes == 0 && id < total_nodes * physical_dims[current_dim]) { enabled = true; } if (!enabled) { total_disabled += queues_per_dim[current_dim]; } if (physical_dims[current_dim] >= 1) { total_nodes *= physical_dims[current_dim]; } for (int j = 0; j < queues_per_dim[current_dim]; j++) { std::list<BaseStream*> temp; active_Streams[element] = temp; std::list<int> pri; stream_priorities[element] = pri; element++; } } if (all_queues == total_disabled) { NI->enabled = false; std::cout << "Node " << id << " has been totally disabled" << std::endl; } concurrent_streams = (int)std::ceil(((double)active_chunks_per_dimension) / queues_per_dim[0]);
active_first_phase = 100000000; if (id == 0) { std::cout << "The final active chunks per dimension 1 after allocating to queues is: " << concurrent_streams * queues_per_dim[0] << std::endl; } max_running = 100000000; scheduler_unit = new SchedulerUnit( this, queues_per_dim, max_running, active_first_phase, concurrent_streams); vLevels = new QueueLevels(queues_per_dim, 0, NI->get_backend_type()); logical_topologies["AllReduce"] = new GeneralComplexTopology( id, physical_dims, all_reduce_implementation_per_dimension); logical_topologies["ReduceScatter"] = new GeneralComplexTopology( id, physical_dims, reduce_scatter_implementation_per_dimension); logical_topologies["AllGather"] = new GeneralComplexTopology( id, physical_dims, all_gather_implementation_per_dimension); logical_topologies["AllToAll"] = new GeneralComplexTopology( id, physical_dims, all_to_all_implementation_per_dimension); stream_counter = 0; if (id == 0) { std::atexit(exiting); std::cout << "total nodes: " << total_nodes << std::endl; } NI->sim_init(MEM); memBus = new MemBus( "NPU", "MA", this, inp_L, inp_o, inp_g, inp_G, model_shared_bus, communication_delay, true); workload = new Workload( run_name, this, my_workload, num_passes, total_stat_rows, stat_row, path, this->seprate_log); if (workload->initialized == false) { sys_panic( "Unable to initialize the workload layer because it cannot open the workload file"); return; } #if defined(NS3_MTP) || defined(NS3_MPI) || defined(PHY_MTP) result = mock_nccl_grobal_group_init(); if(result == false) { sys_panic( "Unable to initialize the system global group because the file cannot be opened"); } result = mock_nccl_comms_init(); if (result == false) { sys_panic( "Unable to initialize the system mockncclComm because the file cannot be opened"); } #endif if (inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedy || inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedyFlex) { offline_greedy = new OfflineGreedy(this); } this->initialized = true; } int Sys::break_dimension(int model_parallel_npu_group) { if (model_parallel_npu_group == 1) { return -1; } int dimension_to_break = 0; int all_npus = 1; for (; dimension_to_break < physical_dims.size(); dimension_to_break++) { if (all_npus * physical_dims[dimension_to_break] < model_parallel_npu_group) { all_npus *= physical_dims[dimension_to_break]; } else if ( all_npus * physical_dims[dimension_to_break] > model_parallel_npu_group) { for (auto lt : logical_topologies) { delete lt.second; } logical_topologies.clear(); delete scheduler_unit; delete vLevels; std::vector<int>::iterator levelIterator = queues_per_dim.begin(); std::advance(levelIterator, dimension_to_break); queues_per_dim.insert(levelIterator, queues_per_dim[dimension_to_break]); scheduler_unit = new SchedulerUnit( this, queues_per_dim, max_running, active_first_phase, concurrent_streams); vLevels = new QueueLevels(queues_per_dim, 0, NI->get_backend_type()); int first_subdim = model_parallel_npu_group / all_npus; int second_subdim = physical_dims[dimension_to_break] / first_subdim; std::vector<int> logical_dims; for (int dim = 0; dim < physical_dims.size(); dim++) { if (dim != dimension_to_break) { logical_dims.push_back(physical_dims[dim]); } else { logical_dims.push_back(first_subdim); logical_dims.push_back(second_subdim); } } std::vector<CollectiveImplementation*>::iterator it = all_reduce_implementation_per_dimension.begin(); if (all_reduce_implementation_per_dimension.size() > dimension_to_break) { std::advance(it, dimension_to_break); }
else { std::advance(it, all_reduce_implementation_per_dimension.size()); } CollectiveImplementation* replicate = (CollectiveImplementation*)(*it)->clone(); all_reduce_implementation_per_dimension.insert(it, replicate); it = reduce_scatter_implementation_per_dimension.begin(); if (reduce_scatter_implementation_per_dimension.size() > dimension_to_break) { std::advance(it, dimension_to_break); } else { std::advance(it, reduce_scatter_implementation_per_dimension.size()); } replicate = (CollectiveImplementation*)(*it)->clone(); reduce_scatter_implementation_per_dimension.insert(it, replicate); it = all_gather_implementation_per_dimension.begin(); if (all_gather_implementation_per_dimension.size() > dimension_to_break) { std::advance(it, dimension_to_break); } else { std::advance(it, all_gather_implementation_per_dimension.size()); } replicate = (CollectiveImplementation*)(*it)->clone(); all_gather_implementation_per_dimension.insert(it, replicate); it = all_to_all_implementation_per_dimension.begin(); if (all_to_all_implementation_per_dimension.size() > dimension_to_break) { std::advance(it, dimension_to_break); } else { std::advance(it, all_to_all_implementation_per_dimension.size()); } replicate = (CollectiveImplementation*)(*it)->clone(); all_to_all_implementation_per_dimension.insert(it, replicate); logical_topologies["AllReduce"] = new GeneralComplexTopology( id, logical_dims, all_reduce_implementation_per_dimension); logical_topologies["ReduceScatter"] = new GeneralComplexTopology( id, logical_dims, reduce_scatter_implementation_per_dimension); logical_topologies["AllGather"] = new GeneralComplexTopology( id, logical_dims, all_gather_implementation_per_dimension); logical_topologies["AllToAll"] = new GeneralComplexTopology( id, logical_dims, all_to_all_implementation_per_dimension); this->logical_broken_dims = logical_dims; this->dim_to_break = dimension_to_break; return dimension_to_break; } else if ( all_npus * physical_dims[dimension_to_break] == model_parallel_npu_group) { return dimension_to_break; } } return -1; } int Sys::get_layer_numbers(std::string workload_input) { return Workload::get_layer_numbers(workload_input); } int Sys::get_priority(SchedulingPolicy pref_scheduling) { if (pref_scheduling == SchedulingPolicy::None) { if (scheduling_policy == SchedulingPolicy::LIFO) { return priority_counter++; } else { return priority_counter--; } } else if (pref_scheduling == SchedulingPolicy::HIGHEST) { return 100000000; } else { if (scheduling_policy == SchedulingPolicy::LIFO) { return priority_counter++; } else { return priority_counter--; } } } int Sys::rendezvous_sim_send( Tick delay, void* buffer, uint64_t count, int type, int dst, int tag, sim_request* request, void (*msg_handler)(void* fun_arg), void* fun_arg) { RendezvousSendData* rsd = new RendezvousSendData( id, this, buffer, count, type, dst, tag, *request, msg_handler, fun_arg); sim_request newReq = *request; uint64_t rendevouz_size = 8192; newReq.dstRank = request->srcRank; newReq.srcRank = request->dstRank; newReq.reqCount = rendevouz_size; int newTag = tag + 500000000; newReq.tag = newTag; sim_recv( delay, buffer, rendevouz_size, type, dst, newTag, &newReq, &Sys::handleEvent, rsd); return 1; } int Sys::sim_send( Tick delay, void* buffer, uint64_t count, int type, int dst, int tag, sim_request* request, void (*msg_handler)(void* fun_arg), void* fun_arg) { if (delay == 0 && fun_arg == nullptr) { Sys::sysCriticalSection cs; SendPacketEventHandlerData* fun_arg_tmp = new SendPacketEventHandlerData(this, id+npu_offset, dst, 
tag); fun_arg = (void*)fun_arg_tmp; if (is_there_pending_sends.find(std::make_pair(dst, tag)) == is_there_pending_sends.end() || is_there_pending_sends[std::make_pair(dst, tag)] == false) { is_there_pending_sends[std::make_pair(dst, tag)] = true; cs.ExitSection(); } else { if (pending_sends.find(std::make_pair(dst, tag)) == pending_sends.end()) { std::list<SimSendCaller*> tmp; pending_sends[std::make_pair(dst, tag)] = tmp; } pending_sends[std::make_pair(dst, tag)].push_back( new SimSendCaller( this, buffer, count, type, dst, tag, *request, msg_handler, fun_arg)); cs.ExitSection(); return 1; } } if (delay == 0) { NI->sim_send(buffer, count, type, dst, tag, request, msg_handler, fun_arg); } else { try_register_event( new SimSendCaller( this, buffer, count, type, dst, tag, *request, msg_handler, fun_arg), EventType::General, nullptr, delay); } return 1; } int Sys::front_end_sim_send( Tick delay, void* buffer, uint64_t count, int type, int dst, int tag, sim_request* request, void (*msg_handler)(void* fun_arg), void* fun_arg) { if (rendezvous_enabled) { return rendezvous_sim_send( delay, buffer, count, type, dst, tag, request, msg_handler, fun_arg); } else { return sim_send( delay, buffer, count, type, dst, tag, request, msg_handler, fun_arg); } } int Sys::rendezvous_sim_recv( Tick delay, void* buffer, uint64_t count, int type, int src, int tag, sim_request* request, void (*msg_handler)(void* fun_arg), void* fun_arg) { RendezvousRecvData* rrd = new RendezvousRecvData( id, this, buffer, count, type, src, tag, *request, msg_handler, fun_arg); sim_request newReq = *request; uint64_t rendevouz_size = 8192; newReq.dstRank = request->srcRank; newReq.srcRank = request->dstRank; newReq.reqCount = rendevouz_size; int newTag = tag + 500000000; newReq.tag = newTag; sim_send( delay, buffer, rendevouz_size, type, src, newTag, &newReq, &Sys::handleEvent, rrd); return 1; } int Sys::sim_recv( Tick delay, void* buffer, uint64_t count, int type, int src, int tag, sim_request* request, void (*msg_handler)(void* fun_arg), void* fun_arg) { if (delay == 0) { NI->sim_recv(buffer, count, type, src, tag, request, msg_handler, fun_arg); } else { try_register_event( new SimRecvCaller( this, buffer, count, type, src, tag, *request, msg_handler, fun_arg), EventType::General, nullptr, delay); } return 1; } int Sys::front_end_sim_recv( Tick delay, void* buffer, uint64_t count, int type, int src, int tag, sim_request* request, void (*msg_handler)(void* fun_arg), void* fun_arg) { if (rendezvous_enabled) { return rendezvous_sim_recv( delay, buffer, count, type, src, tag, request, msg_handler, fun_arg); } else { return sim_recv( delay, buffer, count, type, src, tag, request, msg_handler, fun_arg); } } Tick Sys::mem_read(uint64_t bytes) { if (MEM == nullptr) { return 10; } uint64_t delay_ns = MEM->npu_mem_read(bytes); Tick delay_cycles = delay_ns / CLOCK_PERIOD; return delay_cycles; } Tick Sys::mem_write(uint64_t bytes) { if (MEM == nullptr) { return 10; } uint64_t delay_ns = MEM->npu_mem_write(bytes); Tick delay_cycles = delay_ns / CLOCK_PERIOD; return delay_cycles; } std::string Sys::trim( const std::string& str, const std::string& whitespace = " \t") { const auto strBegin = str.find_first_not_of(whitespace); if (strBegin == std::string::npos) return ""; const auto strEnd = str.find_last_not_of(whitespace); const auto strRange = strEnd - strBegin + 1; return str.substr(strBegin, strRange); } std::vector<CollectiveImplementation*> Sys:: generate_collective_implementation_from_input(std::string input) { std::vector<std::string> 
inputs_per_dimension = split_string(input, "_"); std::vector<CollectiveImplementation*> result; for (std::string dimension_input : inputs_per_dimension) { if (dimension_input == "ring") { result.push_back( new CollectiveImplementation(CollectiveImplementationType::Ring)); } else if (dimension_input == "oneRing") { result.push_back( new CollectiveImplementation(CollectiveImplementationType::OneRing)); } else if (dimension_input == "doubleBinaryTree") { result.push_back(new CollectiveImplementation( CollectiveImplementationType::DoubleBinaryTree)); } else if (dimension_input.rfind("direct", 0) == 0) { int window = -1; if (dimension_input != "direct") { window = std::stoi(dimension_input.substr(6, 5)); } result.push_back(new DirectCollectiveImplementation( CollectiveImplementationType::Direct, window)); } else if (dimension_input.rfind("oneDirect", 0) == 0) { int window = -1; if (dimension_input != "oneDirect") { window = std::stoi(dimension_input.substr(9, 5)); } result.push_back(new DirectCollectiveImplementation( CollectiveImplementationType::OneDirect, window)); } else if (dimension_input == "halvingDoubling") { result.push_back(new CollectiveImplementation( CollectiveImplementationType::HalvingDoubling)); } else if (dimension_input == "oneHalvingDoubling") { result.push_back(new CollectiveImplementation( CollectiveImplementationType::OneHalvingDoubling)); } else if(dimension_input == "NcclFlowModel") { result.push_back(new CollectiveImplementation( CollectiveImplementationType::NcclFlowModel)); } else if(dimension_input == "ncclRingTreeModel") { result.push_back(new CollectiveImplementation( CollectiveImplementationType::NcclTreeFlowModel)); } else { sys_panic( "Cannot interpret collective implementations. Please check the collective implementations in the sys input file"); } } return result; } bool Sys::parse_var(std::string var, std::string value) { var = trim(var); value = trim(value); if (id == 0) { std::cout << "Var is: " << var << " ,val is: " << value << std::endl; } if (var == "scheduling-policy:") { inp_scheduling_policy = value; } else if (var == "all-reduce-implementation:") { std::stringstream mval(value); mval >> inp_all_reduce_implementation; } else if (var == "reduce-scatter-implementation:") { std::stringstream mval(value); mval >> inp_reduce_scatter_implementation; } else if (var == "all-gather-implementation:") { std::stringstream mval(value); mval >> inp_all_gather_implementation; } else if (var == "all-to-all-implementation:") { std::stringstream mval(value); mval >> inp_all_to_all_implementation; } else if (var == "collective-optimization:") { std::stringstream mval(value); mval >> inp_collective_optimization; } else if (var == "endpoint-delay:") { std::stringstream mval(value); mval >> communication_delay; communication_delay = communication_delay * injection_scale; } else if (var == "local-reduction-delay:") { std::stringstream mval(value); mval >> local_reduction_delay; } else if (var == "active-chunks-per-dimension:") { std::stringstream mval(value); mval >> active_chunks_per_dimension; } else if (var == "L:") { std::stringstream mval(value); mval >> inp_L; } else if (var == "o:") { std::stringstream mval(value); mval >> inp_o; } else if (var == "g:") { std::stringstream mval(value); mval >> inp_g; } else if (var == "G:") { std::stringstream mval(value); mval >> inp_G; } else if (var == "model-shared-bus:") { std::stringstream mval(value); mval >> inp_model_shared_bus; } else if (var == "preferred-dataset-splits:") { std::stringstream mval(value); mval >>
preferred_dataset_splits; } else if (var == "boost-mode:") { std::stringstream mval(value); mval >> inp_boost_mode; } else if (var == "intra-dimension-scheduling:") { std::stringstream mval(value); std::string tmp; mval >> tmp; if (tmp == "FIFO") { intra_dimension_scheduling = IntraDimensionScheduling::FIFO; } else if (tmp == "RG") { intra_dimension_scheduling = IntraDimensionScheduling::RG; } else if (tmp == "smallestFirst") { intra_dimension_scheduling = IntraDimensionScheduling::SmallestFirst; } else if (tmp == "lessRemainingPhaseFirst") { intra_dimension_scheduling = IntraDimensionScheduling::LessRemainingPhaseFirst; } else { sys_panic( "unknown value for intra-dimension-scheduling in sys input file"); } } else if (var == "inter-dimension-scheduling:") { std::stringstream mval(value); std::string tmp; mval >> tmp; if (tmp == "ascending") { inter_dimension_scheduling = InterDimensionScheduling::Ascending; } else if (tmp == "offlineGreedy") { inter_dimension_scheduling = InterDimensionScheduling::OfflineGreedy; } else if (tmp == "offlineGreedyFlex") { inter_dimension_scheduling = InterDimensionScheduling::OfflineGreedyFlex; } else if (tmp == "roundRobin") { inter_dimension_scheduling = InterDimensionScheduling::RoundRobin; } else { sys_panic( "unknown value for inter-dimension-scheduling in sys input file"); } } else if (var == "seprate-log:") { std::stringstream mval(value); int int_to_bool; mval >> int_to_bool; if (int_to_bool == 0) { this->seprate_log = false; } else { this->seprate_log = true; } } else if (var != "") { std::cerr << "######### Exiting because " << var << " is an unknown variable. Check your system input file. #########" << std::endl; exit(1); } return true; } bool Sys::post_process_inputs() { all_reduce_implementation_per_dimension = generate_collective_implementation_from_input( inp_all_reduce_implementation); if (all_reduce_implementation_per_dimension.size() == 0) { sys_panic("unknown value for all-reduce-implementation in sys input file"); } reduce_scatter_implementation_per_dimension = generate_collective_implementation_from_input( inp_reduce_scatter_implementation); if (reduce_scatter_implementation_per_dimension.size() == 0) { sys_panic( "unknown value for reduce-scatter-implementation in sys input file"); } all_gather_implementation_per_dimension = generate_collective_implementation_from_input( inp_all_gather_implementation); if (all_gather_implementation_per_dimension.size() == 0) { sys_panic("unknown value for all-gather-implementation in sys input file"); } all_to_all_implementation_per_dimension = generate_collective_implementation_from_input( inp_all_to_all_implementation); if (all_to_all_implementation_per_dimension.size() == 0) { sys_panic("unknown value for all-to-all-implementation in sys input file"); } if (inp_collective_optimization == "baseline") { collectiveOptimization = CollectiveOptimization::Baseline; } else if (inp_collective_optimization == "localBWAware") { collectiveOptimization = CollectiveOptimization::LocalBWAware; } else { sys_panic("unknown value for collective optimization in sys input file"); } if (inp_boost_mode == 1) { boost_mode = true; } else { boost_mode = false; } if (inp_scheduling_policy == "LIFO") { this->scheduling_policy = SchedulingPolicy::LIFO; } else if (inp_scheduling_policy == "FIFO") { this->scheduling_policy = SchedulingPolicy::FIFO; } else { sys_panic("unknown value for scheduling policy in sys input file"); } if (inp_model_shared_bus == 1) { model_shared_bus = true; } else { model_shared_bus = false; } return 
true; } bool Sys::initialize_sys(std::string name) { std::ifstream inFile; inFile.open(name); if (!inFile) { if (id == 0) { std::cerr << "Unable to open file: " << name << std::endl; std::cerr << "############ Exiting because unable to open the system " "input file ############" << std::endl; std::cerr << "This error is fatal. Please check your path and filename." << std::endl; } exit(1); } else { if (id == 0) { std::cout << "Success in opening system file" << std::endl; } } std::string var; std::string value; while (inFile.peek() != EOF) { var = ""; inFile >> var; if (inFile.peek() != EOF) { inFile >> value; } bool result = parse_var(var, value); if (result == false) { inFile.close(); return result; } } inFile.close(); return post_process_inputs(); } Sys::SchedulerUnit::SchedulerUnit( Sys* sys, std::vector<int> queues, int max_running_streams, int ready_list_threshold, int queue_threshold) { this->sys = sys; this->ready_list_threshold = ready_list_threshold; this->queue_threshold = queue_threshold; this->max_running_streams = max_running_streams; this->latency_per_dimension.resize(queues.size(), 0); this->total_chunks_per_dimension.resize(queues.size(), 0); this->total_active_chunks_per_dimension.resize(queues.size(), 0); int base = 0; int dimension = 0; for (auto q : queues) { for (int i = 0; i < q; i++) { this->running_streams[base] = 0; std::list<BaseStream*>::iterator it; this->stream_pointer[base] = it; this->queue_id_to_dimension[base] = dimension; base++; } dimension++; UsageTracker u(2); usage.push_back(u); } } void Sys::SchedulerUnit::notify_stream_added_into_ready_list() { if (this->sys->first_phase_streams < ready_list_threshold && this->sys->total_running_streams < max_running_streams) { int max = ready_list_threshold - sys->first_phase_streams; if (max > max_running_streams - this->sys->total_running_streams) { max = max_running_streams - this->sys->total_running_streams; } sys->schedule(max); } return; } void Sys::SchedulerUnit::notify_stream_added(int vnet) { if (sys->id == 0 && ++total_active_chunks_per_dimension[queue_id_to_dimension[vnet]] == 1) { usage[queue_id_to_dimension[vnet]].increase_usage(); } stream_pointer[vnet] = sys->active_Streams[vnet].begin(); std::advance(stream_pointer[vnet], running_streams[vnet]); while (stream_pointer[vnet] != sys->active_Streams[vnet].end() && running_streams[vnet] < queue_threshold) { (*stream_pointer[vnet])->init(); running_streams[vnet]++; std::advance(stream_pointer[vnet], 1); } MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG,"Sys::SchedulerUnit::notify_stream_added finished"); } void Sys::SchedulerUnit::notify_stream_removed(int vnet, Tick running_time) { if (sys->id == 0 && --total_active_chunks_per_dimension[queue_id_to_dimension[vnet]] == 0) { usage[queue_id_to_dimension[vnet]].decrease_usage(); } running_streams[vnet]--; int dimension = this->queue_id_to_dimension[vnet]; latency_per_dimension[dimension] += running_time; total_chunks_per_dimension[dimension]++; if (this->sys->first_phase_streams < ready_list_threshold && this->sys->total_running_streams < max_running_streams) { int max = ready_list_threshold - sys->first_phase_streams; if (max > max_running_streams - this->sys->total_running_streams) { max = max_running_streams - this->sys->total_running_streams; } sys->schedule(max); } stream_pointer[vnet] = sys->active_Streams[vnet].begin(); std::advance(stream_pointer[vnet], running_streams[vnet]); while (stream_pointer[vnet] != sys->active_Streams[vnet].end() && 
running_streams[vnet] < queue_threshold) { (*stream_pointer[vnet])->init(); running_streams[vnet]++; std::advance(stream_pointer[vnet], 1); } } std::vector<double> Sys::SchedulerUnit::get_average_latency_per_dimension() { std::vector<double> result; result.resize(latency_per_dimension.size(), -1); for (int i = 0; i < result.size(); i++) { result[i] = latency_per_dimension[i] / total_chunks_per_dimension[i]; } return result; } int Sys::nextPowerOf2(int n) { int count = 0; if (n && !(n & (n - 1))) return n; while (n != 0) { n >>= 1; count += 1; } return 1 << count; } void Sys::sys_panic(std::string msg) { std::cerr << msg << std::endl; exit(1); } void Sys::iterate() { call_events(); } std::vector<std::string> Sys::split_string(std::string str, std::string sep) { char* cstr = const_cast<char*>(str.c_str()); char* current; std::vector<std::string> arr; current = strtok(cstr, sep.c_str()); while (current != nullptr) { arr.push_back(current); current = strtok(nullptr, sep.c_str()); } return arr; } uint64_t Sys::determine_chunk_size(uint64_t size, ComType type) { uint64_t chunk_size = size / preferred_dataset_splits; return chunk_size; } DataSet* Sys::generate_all_reduce( uint64_t size, std::vector<bool> involved_dimensions, SchedulingPolicy pref_scheduling, int layer, EventType event, Callable* layer_ptr) { return generate_collective( size, layer, logical_topologies["AllReduce"], all_reduce_implementation_per_dimension, involved_dimensions, ComType::All_Reduce, pref_scheduling, event, layer_ptr); } DataSet* Sys::generate_all_gather( uint64_t size, std::vector<bool> involved_dimensions, SchedulingPolicy pref_scheduling, int layer, EventType event, Callable* layer_ptr) { return generate_collective( size, layer, logical_topologies["AllGather"], all_gather_implementation_per_dimension, involved_dimensions, ComType::All_Gather, pref_scheduling, event, layer_ptr); } DataSet* Sys::generate_reduce_scatter( uint64_t size, std::vector<bool> involved_dimensions, SchedulingPolicy pref_scheduling, int layer, EventType event, Callable* layer_ptr) { return generate_collective( size, layer, logical_topologies["ReduceScatter"], reduce_scatter_implementation_per_dimension, involved_dimensions, ComType::Reduce_Scatter, pref_scheduling, event, layer_ptr); } DataSet* Sys::generate_all_to_all( uint64_t size, std::vector<bool> involved_dimensions, SchedulingPolicy pref_scheduling, int layer, EventType event, Callable* layer_ptr) { return generate_collective( size, layer, logical_topologies["AllToAll"], all_to_all_implementation_per_dimension, involved_dimensions, ComType::All_to_All, pref_scheduling, event, layer_ptr); } CollectivePhase Sys::generate_collective_phase( ComType collective_type, int layer_num, BasicLogicalTopology* topology, uint64_t data_size, int queue_id, RingTopology::Direction direction, InjectionPolicy injection_policy, CollectiveImplementation* collective_implementation, bool boost_mode) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); if (collective_implementation->type == CollectiveImplementationType::Ring || collective_implementation->type == CollectiveImplementationType::OneRing) { CollectivePhase vn( this, queue_id, new Ring( collective_type, id, layer_num, (RingTopology*)topology, data_size, direction, injection_policy, boost_mode)); return vn; } else if ( collective_implementation->type == CollectiveImplementationType::Direct || collective_implementation->type == CollectiveImplementationType::OneDirect) { CollectivePhase vn( this, queue_id, new AllToAll( collective_type, 
((DirectCollectiveImplementation*)collective_implementation) ->direct_collective_window, id, layer_num, (RingTopology*)topology, data_size, direction, InjectionPolicy::Normal, boost_mode)); return vn; } else if ( collective_implementation->type == CollectiveImplementationType::DoubleBinaryTree) { CollectivePhase vn( this, queue_id, new DoubleBinaryTreeAllReduce( id, layer_num, (BinaryTree*)topology, data_size, boost_mode)); return vn; } else if ( collective_implementation->type == CollectiveImplementationType::HalvingDoubling || collective_implementation->type == CollectiveImplementationType::OneHalvingDoubling) { CollectivePhase vn( this, queue_id, new HalvingDoubling( collective_type, id, layer_num, (RingTopology*)topology, data_size, boost_mode)); return vn; } else if(collective_implementation->type == CollectiveImplementationType::NcclFlowModel) { ParallelStrategy comm_ps; if (workload->current_state == Workload::LoopState::Forward_Pass){ comm_ps = static_cast<ParallelStrategy> (workload->layers[workload->index]->fwd_pass_group_type); } else if(workload->current_state == Workload::LoopState::Input_Gradient){ comm_ps = static_cast<ParallelStrategy> (workload->layers[workload->index]->input_grad_group_type); } else if(workload->current_state == Workload::LoopState::Weight_Gradient){ comm_ps = static_cast<ParallelStrategy> (workload->layers[workload->index]->weight_grad_group_type); } MockNccl::ncclInfo *nccl_info; std::shared_ptr<void> ptr_FlowModels; { Sys::sysCriticalSection cs; nccl_info = get_nccl_Info(comm_ps,data_size,collective_type); ptr_FlowModels = generate_flow_model(comm_ps, data_size, collective_type); cs.ExitSection(); } if(nccl_info->algorithm == NCCL_ALGO_RING) { std::shared_ptr<MockNccl::FlowModels> RingFlowModels = std::static_pointer_cast<MockNccl::FlowModels>(ptr_FlowModels); std::map<int,std::map<int,std::vector<int>>> channels; { Sys::sysCriticalSection cs; channels = mock_nccl_comms[comm_ps]->get_rings(); cs.ExitSection(); } NcclLog->writeLog(NcclLogLevel::DEBUG,"rank %d generate FlowModels",id); if(RingFlowModels != nullptr){ NcclLog->writeLog(NcclLogLevel::DEBUG,"rank %d NcclMock generate %d channel and flow model count: %d",id,channels.size(),RingFlowModels->size()); for (auto flow : *RingFlowModels) { int prev; int parent_flow_id; int child_flow_id; if (flow.second.prev.size() == 0) { prev = -1; } else { prev = flow.second.prev[0]; } if (flow.second.child_flow_id.size() == 0) { child_flow_id = -1; } else { child_flow_id = flow.second.child_flow_id[0]; } if (flow.second.parent_flow_id.size() == 0) { parent_flow_id = -1; } else { parent_flow_id = flow.second.parent_flow_id[0]; } NcclLog->writeLog(NcclLogLevel::DEBUG," %d, %d, %d to %d current_flow_id %d prev rank: %d parent_flow_id: %d child_flow_id: %d chunk_id: %d flow_size: %lu chunk_count: %d ",flow.first.first,flow.first.second,flow.second.src,flow.second.dest,flow.second.flow_id,prev,parent_flow_id,child_flow_id,flow.second.chunk_id,flow.second.flow_size,flow.second.chunk_count); } } CollectivePhase vn( this, queue_id, new NcclTreeFlowModel( collective_type, id, layer_num, (RingTopology*)topology, data_size, direction, injection_policy, boost_mode, RingFlowModels, channels.size())); return vn; } else if(nccl_info->algorithm == NCCL_ALGO_TREE) { std::shared_ptr<MockNccl::FlowModels> TreeFlowModels; MockNccl::TreeChannels treechannels; { Sys::sysCriticalSection cs; TreeFlowModels = std::static_pointer_cast<MockNccl::FlowModels>(ptr_FlowModels); treechannels = mock_nccl_comms[comm_ps]->get_treechannels(); 
cs.ExitSection(); } CollectivePhase vn( this, queue_id, new NcclTreeFlowModel( collective_type, id, layer_num, (RingTopology*)topology, data_size, direction, injection_policy, boost_mode, TreeFlowModels, treechannels.size())); return vn; } else if(nccl_info->algorithm == NCCL_ALGO_NVLS) { collective_type = ComType::All_Reduce_NVLS; std::shared_ptr<MockNccl::FlowModels> RingFlowModels = std::static_pointer_cast<MockNccl::FlowModels>(ptr_FlowModels); MockNccl::TreeChannels treechannels; { Sys::sysCriticalSection cs; treechannels = mock_nccl_comms[comm_ps]->get_treechannels(); cs.ExitSection(); } NcclLog->writeLog(NcclLogLevel::DEBUG,"rank %d generate FlowModels",id); if(RingFlowModels != nullptr){ NcclLog->writeLog(NcclLogLevel::DEBUG,"rank %d NcclMock generate %d channel and flow model count: %d",id,treechannels.size(),RingFlowModels->size()); for (auto flow : *RingFlowModels) { int prev; int parent_flow_id; int child_flow_id; if (flow.second.prev.size() == 0) { prev = -1; } else { prev = flow.second.prev[0]; } if (flow.second.child_flow_id.size() == 0) { child_flow_id = -1; } else { child_flow_id = flow.second.child_flow_id[0]; } if (flow.second.parent_flow_id.size() == 0) { parent_flow_id = -1; } else { parent_flow_id = flow.second.parent_flow_id[0]; } NcclLog->writeLog(NcclLogLevel::DEBUG," %d, %d, %d to %d current_flow_id %d prev rank: %d parent_flow_id: %d child_flow_id: %d chunk_id: %d flow_size: %lu chunk_count: %d ",flow.first.first,flow.first.second,flow.second.src,flow.second.dest,flow.second.flow_id,prev,parent_flow_id,child_flow_id,flow.second.chunk_id,flow.second.flow_size,flow.second.chunk_count); } } CollectivePhase vn( this, queue_id, new NcclTreeFlowModel( collective_type, id, layer_num, (RingTopology*)topology, data_size, direction, injection_policy, boost_mode, RingFlowModels, treechannels.size())); return vn; } } else { std::cerr << "Error: No known collective implementation for collective phase" << std::endl; exit(1); } } std::map<std::pair<int,int>, MockNccl::SingleFlow> Sys:: generate_net_test_flow_model(uint64_t data_size, int nums) { std::map<std::pair<int,int>, MockNccl::SingleFlow> result; MockNccl::SingleFlow tmp; for (int i = 0; i < nums; i++) { tmp.flow_id = i; tmp.src = 0; tmp.dest = 1; tmp.flow_size = data_size; tmp.parent_flow_id = {}; tmp.child_flow_id = {}; tmp.channel_id = 0; result[make_pair(0, i)] = tmp; } return result; } std::map<std::pair<int,int>, MockNccl::SingleFlow> Sys::generate_nvl_test_flow_model(uint64_t data_size, int nums) { std::map<std::pair<int,int>, MockNccl::SingleFlow> result; MockNccl::SingleFlow tmp; for (int i = 0; i < nums; i++) { tmp.flow_id = i; tmp.src = 0; tmp.dest = 1; tmp.flow_size = data_size; tmp.parent_flow_id = {}; tmp.child_flow_id = {}; tmp.channel_id = 0; result[make_pair(0, i)] = tmp; } return result; } bool Sys::mock_nccl_grobal_group_init(){ if (GlobalGroup != nullptr) return true; else { int total_nodes = this->total_nodes; int TP_size = workload->model_parallel_npu_group == 0 ? total_nodes : workload->model_parallel_npu_group; int PP_size = 1; int DP_size = all_gpus[0] / (TP_size * PP_size); int EP_size = workload->expert_parallel_npu_group; int DP_EP_size = DP_size / EP_size; GlobalGroup = new MockNccl::MockNcclGroup(all_gpus[0],ngpus_per_node,TP_size,DP_size,PP_size,EP_size,DP_EP_size,NVSwitchs,gpu_type); return true; } } bool Sys::mock_nccl_comms_init(){ int TP_size = workload->model_parallel_npu_group == 0 ? 
total_nodes : workload->model_parallel_npu_group; int PP_size = 1; int DP_size = total_nodes / (TP_size * PP_size); int EP_size = workload->expert_parallel_npu_group; int DP_EP_size = DP_size / EP_size; MockNccl::MockNcclComm* pComm; if (TP_size > 1) { pComm = new MockNccl::MockNcclComm(id,MockNccl::GroupType::TP,GlobalGroup); mock_nccl_comms[TP] = pComm; } if(DP_size > 1) { pComm = new MockNccl::MockNcclComm(id,MockNccl::GroupType::DP,GlobalGroup); mock_nccl_comms[DP] = pComm; } if(EP_size > 1 ){ pComm = new MockNccl::MockNcclComm(id,MockNccl::GroupType::EP,GlobalGroup); mock_nccl_comms[EP] = pComm; } if(DP_EP_size > 1){ pComm = new MockNccl::MockNcclComm(id,MockNccl::GroupType::DP_EP,GlobalGroup); mock_nccl_comms[DP_EP] = pComm; } return true; } struct MockNccl::ncclInfo* Sys::get_nccl_Info(ParallelStrategy comm_ps, uint64_t data_size, ComType collective_type) { return mock_nccl_comms[comm_ps]->get_algo_proto_info(data_size, collective_type ); } std::shared_ptr<void> Sys::generate_flow_model(ParallelStrategy comm_ps, uint64_t data_size, ComType collective_type) { MockNccl::MockNcclComm* pComm = mock_nccl_comms[comm_ps]; MockNccl::State current_state; switch (this->workload->current_state) { case Workload::LoopState::Forward_Pass: current_state = MockNccl::State::Forward_Pass; break; case Workload::LoopState::Input_Gradient: current_state = MockNccl::State::Input_Gradient; break; case Workload::LoopState::Weight_Gradient: current_state = MockNccl::State::Weight_Gradient; break; } return pComm->get_flow_model(data_size,collective_type,this->workload->index,current_state); } DataSet* Sys::generate_collective( uint64_t size, int layer_num, LogicalTopology* topology, std::vector<CollectiveImplementation*> implementation_per_dimension, std::vector<bool> dimensions_involved, ComType collective_type, SchedulingPolicy pref_scheduling, EventType event, Callable* layer_ptr ) { uint64_t chunk_size = determine_chunk_size(size, collective_type); if(id == 0) std::cout << "chunk size is: " << chunk_size << " , size is: " << size << " , layer_num is: " << layer_num << " , node: " << id << std::endl; uint64_t recommended_chunk_size = chunk_size; int streams = ceil(((double)size) / chunk_size); int64_t tmp; DataSet* dataset = new DataSet(streams); #ifdef PHY_MTP if (event != EventType::NONE && layer_ptr != nullptr) { dataset->set_notifier(layer_ptr,event); } #endif int pri = get_priority(pref_scheduling); int count = 0; if (id == 0 && (inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedy || inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedyFlex)) { if (last_scheduled_collective != Sys::boostedTick()) { offline_greedy->reset_loads(); last_scheduled_collective = Sys::boostedTick(); } } while (size > 0) { count++; chunk_size=std::min(chunk_size,size); std::vector<int> dim_mapper(topology->get_num_of_dimensions()); std::iota(std::begin(dim_mapper), std::end(dim_mapper), 0); if (collective_type == ComType::All_Gather) { std::reverse(dim_mapper.begin(), dim_mapper.end()); } if (inter_dimension_scheduling == InterDimensionScheduling::RoundRobin) { std::rotate( dim_mapper.begin(), dim_mapper.begin() + round_robin_inter_dimension_scheduler, dim_mapper.end()); round_robin_inter_dimension_scheduler++; if (round_robin_inter_dimension_scheduler == topology->get_num_of_dimensions()) { round_robin_inter_dimension_scheduler = 0; } } else if ( collective_type != ComType::All_to_All && (inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedy || inter_dimension_scheduling == 
InterDimensionScheduling::OfflineGreedyFlex)) { uint64_t prev_size = size; dim_mapper = offline_greedy->get_chunk_scheduling( stream_counter, size, recommended_chunk_size, dimensions_involved, inter_dimension_scheduling, collective_type); chunk_size = prev_size - size; } if (collective_type == ComType::All_to_All || (inter_dimension_scheduling != InterDimensionScheduling::OfflineGreedy && inter_dimension_scheduling != InterDimensionScheduling::OfflineGreedyFlex)) { size -= chunk_size; } tmp = chunk_size; std::list<CollectivePhase> vect; CollectivePhase phase; if (collective_type != ComType::All_Reduce || collectiveOptimization == CollectiveOptimization::Baseline) { for (int dim = 0; dim < topology->get_num_of_dimensions(); dim++) { if (topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) == 1 || !dimensions_involved[dim_mapper[dim]]) { continue; } std::pair<int, RingTopology::Direction> queue = vLevels->get_next_queue_at_level(dim_mapper[dim]); phase = generate_collective_phase( collective_type, layer_num, topology->get_basic_topology_at_dimension( dim_mapper[dim], collective_type), tmp, queue.first, queue.second, InjectionPolicy::Normal, implementation_per_dimension[dim_mapper[dim]], boost_mode); vect.push_back(phase); tmp = phase.final_data_size; } } else if ( inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedy || inter_dimension_scheduling == InterDimensionScheduling::OfflineGreedyFlex || inter_dimension_scheduling == InterDimensionScheduling::OnlineGreedy) { int dim = 0; for (dim = 0; dim < topology->get_num_of_dimensions(); dim++) { if (topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) == 1 || !dimensions_involved[dim_mapper[dim]]) { continue; } std::pair<int, RingTopology::Direction> queue = vLevels->get_next_queue_at_level(dim_mapper[dim]); phase = generate_collective_phase( ComType::Reduce_Scatter, layer_num, topology->get_basic_topology_at_dimension( dim_mapper[dim], ComType::Reduce_Scatter), tmp, queue.first, queue.second, InjectionPolicy::Normal, implementation_per_dimension[dim_mapper[dim]], boost_mode); vect.push_back(phase); tmp = phase.final_data_size; } dim--; for (; dim >= 0; dim--) { if (topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) == 1 || !dimensions_involved[dim_mapper[dim]]) { continue; } std::pair<int, RingTopology::Direction> queue = vLevels->get_next_queue_at_level(dim_mapper[dim]); phase = generate_collective_phase( ComType::All_Gather, layer_num, topology->get_basic_topology_at_dimension( dim_mapper[dim], ComType::All_Gather), tmp, queue.first, queue.second, InjectionPolicy::Normal, implementation_per_dimension[dim_mapper[dim]], boost_mode); vect.push_back(phase); tmp = phase.final_data_size; } } else { int dim = 0; int last_active_dim = 0; for (dim = 0; dim < topology->get_num_of_dimensions(); dim++) { if (topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) != 1 && dimensions_involved[dim_mapper[dim]]) { last_active_dim = dim; } } for (dim = 0; dim < last_active_dim; dim++) { if (topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) == 1 || !dimensions_involved[dim_mapper[dim]]) { continue; } std::pair<int, RingTopology::Direction> queue = vLevels->get_next_queue_at_level(dim_mapper[dim]); phase = generate_collective_phase( ComType::Reduce_Scatter, layer_num, topology->get_basic_topology_at_dimension( dim_mapper[dim], ComType::Reduce_Scatter), tmp, queue.first, queue.second, InjectionPolicy::Normal, implementation_per_dimension[dim_mapper[dim]], boost_mode); vect.push_back(phase); tmp = phase.final_data_size; } 
while (dim > 0 && (dimensions_involved[dim_mapper[dim]] == false || topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) == 1)) { dim--; } if (dimensions_involved[dim_mapper[dim]] && topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) > 1) { std::pair<int, RingTopology::Direction> queue = vLevels->get_next_queue_at_level(dim_mapper[dim]); phase = generate_collective_phase( ComType::All_Reduce, layer_num, topology->get_basic_topology_at_dimension( dim_mapper[dim], ComType::All_Reduce), tmp, queue.first, queue.second, InjectionPolicy::Normal, implementation_per_dimension[dim_mapper[dim]], boost_mode); vect.push_back(phase); tmp = phase.final_data_size; } dim--; for (; dim >= 0; dim--) { if (topology->get_num_of_nodes_in_dimension(dim_mapper[dim]) == 1 || !dimensions_involved[dim_mapper[dim]]) { continue; } std::pair<int, RingTopology::Direction> queue = vLevels->get_next_queue_at_level(dim_mapper[dim]); phase = generate_collective_phase( ComType::All_Gather, layer_num, topology->get_basic_topology_at_dimension( dim_mapper[dim], ComType::All_Gather), tmp, queue.first, queue.second, InjectionPolicy::Normal, implementation_per_dimension[dim_mapper[dim]], boost_mode); vect.push_back(phase); tmp = phase.final_data_size; } } if (vect.size() > 0) { StreamBaseline* newStream = new StreamBaseline(this, dataset, stream_counter++, vect, pri); newStream->current_queue_id = -1; #ifdef PHY_MTP insert_into_running_list(newStream); #endif insert_into_ready_list(newStream); MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG,"Sys::generate_collective finished"); } else { dataset->active = false; break; } } if (dataset->active) { streams_injected += count; dataset->total_streams = count; } return dataset; } void Sys::call_events() { if(event_queue.find(Sys::boostedTick())==event_queue.end()){ goto FINISH_CHECK; } for (auto& callable : event_queue[Sys::boostedTick()]) { try { pending_events--; (std::get<0>(callable)) ->call(std::get<1>(callable), std::get<2>(callable)); } catch (...) { std::cerr << "warning! 
a callable is removed before call" << std::endl; } } { Sys::sysCriticalSection cs; if (event_queue[Sys::boostedTick()].size() > 0) { event_queue[Sys::boostedTick()].clear(); } event_queue.erase(Sys::boostedTick()); cs.ExitSection(); } FINISH_CHECK: if ((finished_workloads == 1 && event_queue.size() == 0 && pending_sends.size() == 0) || initialized == false) { delete this; } } void Sys::exitSimLoop(std::string msg) { if(id == 0 ){ std::cout << msg << std::endl; } NI->sim_finish(); return; } Tick Sys::boostedTick() { Sys* ts = all_generators[0]; if (ts == nullptr) { for (int i = 1; i < all_generators.size(); i++) { if (all_generators[i] != nullptr) { ts = all_generators[i]; break; } } } timespec_t tmp = ts->NI->sim_get_time(); Tick tick = tmp.time_val / CLOCK_PERIOD; return tick + offset; } void Sys::proceed_to_next_vnet_baseline(StreamBaseline* stream) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase1, stream->current_queue_id %d stream->phases_to_go.size %d",stream->current_queue_id,stream->phases_to_go.size()); int previous_vnet = stream->current_queue_id; if (stream->steps_finished == 1) { first_phase_streams--; } if (stream->steps_finished != 0) { stream->net_message_latency.back() /= stream->net_message_counter; } if (stream->my_current_phase.algorithm != nullptr) { delete stream->my_current_phase.algorithm; } if (stream->phases_to_go.size() == 0) { stream->take_bus_stats_average(); stream->dataset->notify_stream_finished((StreamStat*)stream); } NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2"); if (stream->current_queue_id >= 0 && stream->my_current_phase.enabled) { std::list<BaseStream*>& target = active_Streams.at(stream->my_current_phase.queue_id); for (std::list<BaseStream*>::iterator it = target.begin(); it != target.end(); ++it) { if (((StreamBaseline*)(*it))->stream_num == stream->stream_num) { target.erase(it); break; } } } NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2-1"); if (stream->phases_to_go.size() == 0) { total_running_streams--; if (previous_vnet >= 0) { NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase2-1"); scheduler_unit->notify_stream_removed( previous_vnet, Sys::boostedTick() - stream->last_init); } #ifdef PHY_MTP running_list.pop_front(); #endif NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: delete stream"); delete stream; return; } NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase3"); stream->steps_finished++; stream->current_queue_id = stream->phases_to_go.front().queue_id; stream->current_com_type = stream->phases_to_go.front().comm_type; CollectivePhase vi = stream->phases_to_go.front(); stream->my_current_phase = vi; stream->phases_to_go.pop_front(); stream->test = 0; stream->test2 = 0; stream->initialized = false; stream->last_phase_change = Sys::boostedTick(); stream->total_packets_sent = 0; stream->net_message_latency.push_back(0); stream->net_message_counter = 0; NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase1, stream->current_queue_id %d stream->phases_to_go.size %d",stream->current_queue_id,stream->phases_to_go.size()); if (stream->my_current_phase.enabled) { insert_stream(&active_Streams[stream->current_queue_id], stream); } NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: phase4"); stream->state = StreamState::Ready; if (previous_vnet >= 0) { 
scheduler_unit->notify_stream_removed( previous_vnet, Sys::boostedTick() - stream->last_init); } #ifdef PHY_MTP ready_list.pop_front(); first_phase_streams++; total_running_streams++; #endif scheduler_unit->notify_stream_added(stream->current_queue_id); NcclLog->writeLog(NcclLogLevel::DEBUG,"proceed_to_next_vnet_baseline :: exit"); } void Sys::exiting() {} void Sys::insert_stream(std::list<BaseStream*>* queue, BaseStream* baseStream) { std::list<BaseStream*>::iterator it = queue->begin(); if (intra_dimension_scheduling == IntraDimensionScheduling::FIFO || baseStream->current_queue_id < 0 || baseStream->current_com_type == ComType::All_to_All || baseStream->current_com_type == ComType::All_Reduce) { while (it != queue->end()) { if ((*it)->initialized == true) { std::advance(it, 1); continue; } else if ((*it)->priority >= baseStream->priority) { std::advance(it, 1); continue; } else { break; } } } else if (intra_dimension_scheduling == IntraDimensionScheduling::RG) { ComType one_to_last = ComType::None; ComType last = ComType::None; while (it != queue->end()) { one_to_last = last; last = (*it)->current_com_type; if ((*it)->initialized == true) { std::advance(it, 1); if (it != queue->end() && (*it)->initialized == false) { one_to_last = last; last = (*it)->current_com_type; std::advance(it, 1); } continue; } else if ((*it)->priority > baseStream->priority) { std::advance(it, 1); continue; } else if ( (last == ComType::Reduce_Scatter && one_to_last == ComType::All_Gather) || (last == ComType::All_Gather && one_to_last == ComType::Reduce_Scatter)) { std::advance(it, 1); continue; } else { break; } } } else if ( intra_dimension_scheduling == IntraDimensionScheduling::SmallestFirst) { while (it != queue->end()) { if ((*it)->initialized == true) { std::advance(it, 1); continue; } else if ( (*it)->my_current_phase.initial_data_size < baseStream->my_current_phase.initial_data_size) { std::advance(it, 1); continue; } else { break; } } } else if ( intra_dimension_scheduling == IntraDimensionScheduling::LessRemainingPhaseFirst) { while (it != queue->end()) { if ((*it)->initialized == true) { std::advance(it, 1); continue; } else if ((*it)->phases_to_go.size() < baseStream->phases_to_go.size()) { std::advance(it, 1); continue; } else { break; } } } queue->insert(it, baseStream); } void Sys::register_for_finished_stream(Callable* callable) { registered_for_finished_stream_event.push_back(callable); } void Sys::increase_finished_streams(int amount) { streams_finished += amount; for (auto c : registered_for_finished_stream_event) { c->call(EventType::StreamsFinishedIncrease, nullptr); } } void Sys::register_phases( BaseStream* stream, std::list<CollectivePhase> phases_to_go) { for (auto& vnet : phases_to_go) { stream_priorities[vnet.queue_id].push_back(stream->stream_num); } } void Sys::zero_latecy_register_event( Callable* callable, EventType event, CallData* callData, int cycles){ Tick mycycles = 0; bool should_schedule = false; { #ifdef NS3_MTP Sys::sysCriticalSection cs; #endif #ifdef PHY_MTP Sys::sysCriticalSection cs; #endif if (event_queue.find(Sys::boostedTick() + mycycles) == event_queue.end()) { std::list<std::tuple<Callable*, EventType, CallData*>> tmp; event_queue[Sys::boostedTick() + mycycles] = tmp; should_schedule = true; } event_queue[Sys::boostedTick() + mycycles].push_back( std::make_tuple(callable, event, callData)); #ifdef NS3_MTP cs.ExitSection(); #endif #ifdef PHY_MTP cs.ExitSection(); #endif } pending_events++; if (should_schedule) { timespec_t tmp = generate_time(mycycles); 
BasicEventHandlerData* data = new BasicEventHandlerData(this, EventType::CallEvents); this->handleEvent(data); } } void Sys::register_event( Callable* callable, EventType event, CallData* callData, int cycles) { Tick mycycles = cycles; try_register_event(callable, event, callData, mycycles); return; } void Sys::call(EventType type, CallData* data) { if (id == 0 && type == EventType::General) { increase_finished_streams(1); } } void Sys::try_register_event( Callable* callable, EventType event, CallData* callData, Tick& cycles) { bool should_schedule = false; { MockNcclLog* NcclLog = MockNcclLog::getInstance(); NcclLog->writeLog( NcclLogLevel::DEBUG, "try_register_event EventType %d ", event); #ifdef NS3_MTP Sys::sysCriticalSection cs; #endif if (event_queue.find(Sys::boostedTick() + cycles) == event_queue.end()) { std::list<std::tuple<Callable*, EventType, CallData*>> tmp; event_queue[Sys::boostedTick() + cycles] = tmp; should_schedule = true; } event_queue[Sys::boostedTick() + cycles].push_back( std::make_tuple(callable, event, callData)); #ifdef NS3_MTP cs.ExitSection(); #endif } if (should_schedule) { timespec_t tmp = generate_time(cycles); BasicEventHandlerData* data = new BasicEventHandlerData(this, EventType::CallEvents); NI->sim_schedule(tmp, &Sys::handleEvent, data); } cycles = 0; pending_events++; return; } #ifdef PHY_MTP void Sys::insert_into_running_list(StreamBaseline* stream) { running_list.push_back(stream); } #endif void Sys::insert_into_ready_list(BaseStream* stream) { insert_stream(&ready_list, stream); scheduler_unit->notify_stream_added_into_ready_list(); } void Sys::schedule(int num) { MockNcclLog* NcclLog = MockNcclLog::getInstance(); int ready_list_size = ready_list.size(); int counter = std::min(num, ready_list_size); NcclLog->writeLog(NcclLogLevel::DEBUG,"Sys.cc::schedule num %d ready_list_size %d",num,ready_list_size); while (counter > 0) { int top_vn = ready_list.front()->phases_to_go.front().queue_id; int total_waiting_streams = ready_list.size(); int total_phases = ready_list.front()->phases_to_go.size(); proceed_to_next_vnet_baseline((StreamBaseline*)ready_list.front()); #ifndef PHY_MTP if (ready_list.front()->current_queue_id == -1) { Sys::sys_panic( "should not happen! 
" ); } ready_list.pop_front(); first_phase_streams++; total_running_streams++; #endif counter--; } NcclLog->writeLog(NcclLogLevel::DEBUG,"Sys::shedule finished"); } void Sys::handleEvent(void* arg) { if (arg == nullptr) { return; } BasicEventHandlerData* ehd = (BasicEventHandlerData*)arg; Sys* node = ehd->node; EventType event = ehd->event; MockNcclLog* NcclLog = MockNcclLog::getInstance(); if (event == EventType::CallEvents) { NcclLog->writeLog(NcclLogLevel::DEBUG," Sys::handleEvent EventType::CallEvents"); node->iterate(); delete ehd; } else if (event == EventType::RendezvousSend) { RendezvousSendData* rsd = (RendezvousSendData*)ehd; rsd->send->call(EventType::General, nullptr); delete rsd; } else if (event == EventType::RendezvousRecv) { RendezvousRecvData* rrd = (RendezvousRecvData*)ehd; rrd->recv->call(EventType::General, nullptr); delete rrd; } else if (event == EventType::PacketReceived) { RecvPacketEventHadndlerData* rcehd = (RecvPacketEventHadndlerData*)ehd; StreamBaseline* owner = static_cast<StreamBaseline*>(rcehd->owner); owner->consume(rcehd); delete rcehd; } else if (event == EventType::PacketSent) { SendPacketEventHandlerData* sendhd = (SendPacketEventHandlerData*)ehd; NcclLog->writeLog(NcclLogLevel::DEBUG,"packet sent, sender id: %d, node id: %d",sendhd->senderNodeId,node->id); #ifdef NS3_MTP Sys::sysCriticalSection cs; #endif #ifdef PHY_MTP Sys::sysCriticalSection cs; #endif if(all_generators[sendhd->senderNodeId]== nullptr){ #ifdef NS3_MTP cs.ExitSection(); #endif #ifdef PHY_MTP cs.ExitSection(); #endif goto SEND_HANDLER_END; } if (node->pending_sends.find( std::make_pair(sendhd->receiverNodeId, sendhd->tag)) == node->pending_sends.end() || node->pending_sends[std::make_pair(sendhd->receiverNodeId, sendhd->tag)] .size() == 0) { node->is_there_pending_sends[std::make_pair( sendhd->receiverNodeId, sendhd->tag)] = false; if(node->event_queue.find(Sys::boostedTick())==node->event_queue.end()) if ((node->finished_workloads == 1 && node->event_queue.size() == 0 && node->pending_sends.size() == 0) || node->initialized == false) { delete node; } #ifdef NS3_MTP cs.ExitSection(); #endif #ifdef PHY_MTP cs.ExitSection(); #endif } else { SimSendCaller* simSendCaller = node->pending_sends[std::make_pair(sendhd->receiverNodeId, sendhd->tag)] .front(); node->pending_sends[std::make_pair(sendhd->receiverNodeId, sendhd->tag)] .pop_front(); if(node->pending_sends[std::make_pair(sendhd->receiverNodeId, sendhd->tag)].size() == 0) node->pending_sends.erase(std::make_pair(sendhd->receiverNodeId, sendhd->tag)); #ifdef NS3_MTP cs.ExitSection(); #endif #ifdef PHY_MTP cs.ExitSection(); #endif simSendCaller->call(EventType::General, nullptr); } SEND_HANDLER_END: delete sendhd; }else if(event==EventType::PacketSentFinshed){ AstraSim::SendPacketEventHandlerData* ehd = (AstraSim::SendPacketEventHandlerData*) arg; if(ehd->owner!=nullptr) ehd->owner->sendcallback(ehd); } } AstraSim::timespec_t Sys::generate_time(int cycles) { timespec_t tmp = NI->sim_get_time(); double addition = cycles * ((double)CLOCK_PERIOD); tmp.time_val = addition; return tmp; } } // namespace AstraSim