astra-sim-alibabacloud/astra-sim/system/Sys.hh:

/*******************************************************************************
This source code is licensed under the MIT license found in the LICENSE file
in the root directory of this source tree.
*******************************************************************************/

#ifndef __SYSTEM_HH__
#define __SYSTEM_HH__

#include <assert.h>
#include <math.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <ctime>
#include <fstream>
#include <list>
#include <map>
#include <memory>  // std::shared_ptr (used by generate_flow_model below)
#include <sstream>
#include <tuple>
#include <vector>

#include "AstraMemoryAPI.hh"
#include "AstraNetworkAPI.hh"
#include "Callable.hh"
#include "CollectivePhase.hh"
#include "Common.hh"
#include "SendPacketEventHandlerData.hh"
#include "UsageTracker.hh"
#include "astra-sim/system/MockNcclChannel.h"
#include "astra-sim/system/topology/RingTopology.hh"
#include "astra-sim/workload/Workload.hh"

#ifdef NS3_MTP
#include "ns3/mtp-interface.h"
#endif

#include <atomic>

#include "astra-sim/system/MockNcclGroup.h"

namespace AstraSim {

// Forward declarations.
class MemBus;
class BaseStream;
class StreamBaseline;
class DataSet;
class SimSendCaller;
class SimRecvCaller;
class QueueLevels;
class Workload;
class LogicalTopology;
class BasicLogicalTopology;
class OfflineGreedy;

// Parallelization strategies (tensor, data, pipeline, and expert parallelism)
// used as keys for the mock NCCL communicators declared below.
enum ParallelStrategy { TP, DP, PP, EP, DP_EP, NONE };

class Sys : public Callable {
 public:
  // Tracks the streams running per queue/dimension and the latencies observed
  // on each dimension.
  class SchedulerUnit {
   public:
    Sys* sys;
    int ready_list_threshold;
    int queue_threshold;
    int max_running_streams;
    std::map<int, int> running_streams;
    std::map<int, std::list<BaseStream*>::iterator> stream_pointer;
    std::vector<Tick> latency_per_dimension;
    std::vector<double> total_chunks_per_dimension;
    std::vector<uint64_t> total_active_chunks_per_dimension;
    std::map<int, int> queue_id_to_dimension;
    std::vector<UsageTracker> usage;
    SchedulerUnit(
        Sys* sys,
        std::vector<int> queues,
        int max_running_streams,
        int ready_list_threshold,
        int queue_threshold);
    void notify_stream_removed(int vnet, Tick running_time);
    void notify_stream_added(int vnet);
    void notify_stream_added_into_ready_list();
    std::vector<double> get_average_latency_per_dimension();
  };

  SchedulerUnit* scheduler_unit;
  ~Sys();
  AstraNetworkAPI* NI;
  AstraMemoryAPI* MEM;
  int finished_workloads;
  int id;
  int npu_offset;
  int nvswitch_id;
  int num_gpus;
  std::vector<int> NVSwitchs;
  int ngpus_per_node;
  GPUType gpu_type;
  // Per-dimension collective implementations.
  std::vector<CollectiveImplementation*> all_reduce_implementation_per_dimension;
  std::vector<CollectiveImplementation*> reduce_scatter_implementation_per_dimension;
  std::vector<CollectiveImplementation*> all_gather_implementation_per_dimension;
  std::vector<CollectiveImplementation*> all_to_all_implementation_per_dimension;
  CollectiveOptimization collectiveOptimization;
  std::chrono::high_resolution_clock::time_point start_sim_time;
  std::chrono::high_resolution_clock::time_point end_sim_time;
  std::list<Callable*> registered_for_finished_stream_event;
  std::vector<int> physical_dims;
  std::vector<int> all_gpus;
  std::vector<int> queues_per_dim;
  int max_running;
  int concurrent_streams;
  int active_first_phase;
  int dim_to_break;
  std::vector<int> logical_broken_dims;
  int priority_counter;
  bool boost_mode;
  bool rendezvous_enabled;
  bool initialized;
  int processing_latency;
  int communication_delay;
  int preferred_dataset_splits;
  int num_channels;
  float compute_scale;
  float comm_scale;
  float injection_scale;
  int local_reduction_delay;
  uint64_t pending_events;
  std::string method;
  Workload* workload;
  MemBus* memBus;
  int all_queues;
  std::list<BaseStream*> ready_list;
#ifdef PHY_MTP
  std::list<StreamBaseline*> running_list;
#endif
  SchedulingPolicy scheduling_policy;
  int first_phase_streams;
  int total_running_streams;
  std::map<int, std::list<BaseStream*>> active_Streams;
  std::map<int, std::list<int>> stream_priorities;
  QueueLevels* vLevels;
  std::map<std::string, LogicalTopology*> logical_topologies;
  std::map<Tick, std::list<std::tuple<Callable*, EventType, CallData*>>>
      event_queue;
  int total_nodes;
  static Tick offset;
  static std::vector<Sys*> all_generators;
  static uint8_t* dummy_data;
  uint64_t streams_injected;
  uint64_t streams_finished;
  int stream_counter;
  bool enabled;
  // Raw configuration inputs (inp_*) consumed by parse_var()/post_process_inputs().
  std::string inp_scheduling_policy;
  std::string inp_all_reduce_implementation;
  std::string inp_reduce_scatter_implementation;
  std::string inp_all_gather_implementation;
  std::string inp_all_to_all_implementation;
  std::string inp_collective_optimization;
  float inp_L;
  float inp_o;
  float inp_g;
  float inp_G;
  int inp_model_shared_bus;
  int active_chunks_per_dimension;
  bool model_shared_bus;
  int inp_boost_mode;
  IntraDimensionScheduling intra_dimension_scheduling;
  InterDimensionScheduling inter_dimension_scheduling;
  int round_robin_inter_dimension_scheduler;
  OfflineGreedy* offline_greedy;
  Tick last_scheduled_collective;

  void register_for_finished_stream(Callable* callable);
  void increase_finished_streams(int amount);
  void zero_latecy_register_event(
      Callable* callable,
      EventType event,
      CallData* callData,
      int cycles);
  void register_event(
      Callable* callable,
      EventType event,
      CallData* callData,
      int cycles);
  void insert_into_ready_list(BaseStream* stream);
#ifdef PHY_MTP
  void insert_into_running_list(StreamBaseline* stream);
#endif
  void schedule(int num);
  void register_phases(
      BaseStream* stream,
      std::list<CollectivePhase> phases_to_go);
  void call(EventType type, CallData* data);
  void try_register_event(
      Callable* callable,
      EventType event,
      CallData* callData,
      Tick& cycles);
  void call_events();
  void workload_finished() {
    finished_workloads++;
  };
  static Tick boostedTick();
  static void exiting();
  int nextPowerOf2(int n);
  static void sys_panic(std::string msg);
  void exitSimLoop(std::string msg);
  bool seprate_log;
  std::map<std::pair<int, int>, std::list<SimSendCaller*>> pending_sends;
  std::map<std::pair<int, int>, bool> is_there_pending_sends;

  Sys(AstraNetworkAPI* NI,
      AstraMemoryAPI* MEM,
      int id,
      int npu_offset,
      int num_passes,
      std::vector<int> physical_dims,
      std::vector<int> queues_per_dim,
      std::string my_sys,
      std::string my_workload,
      float comm_scale,
      float compute_scale,
      float injection_scale,
      int total_stat_rows,
      int stat_row,
      std::string path,
      std::string run_name,
      bool seprate_log,
      bool rendezvous_enabled,
      GPUType _gpu_type,
      std::vector<int> _all_gpus,
      std::vector<int> _NVSwitchs,
      int _ngpus_per_node);

  void iterate();
  bool initialize_sys(std::string name);
  std::string trim(const std::string& str, const std::string& whitespace);
  bool parse_var(std::string var, std::string value);
  bool post_process_inputs();
  std::vector<CollectiveImplementation*>
      generate_collective_implementation_from_input(std::string input);
  int break_dimension(int model_parallel_npu_group);

  // Point-to-point send/recv entry points (front-end, direct, and rendezvous
  // variants); msg_handler is a C-style callback invoked with fun_arg.
  int front_end_sim_send(
      Tick delay,
      void* buffer,
      uint64_t count,
      int type,
      int dst,
      int tag,
      sim_request* request,
      void (*msg_handler)(void* fun_arg),
      void* fun_arg);
  int front_end_sim_recv(
      Tick delay,
      void* buffer,
      uint64_t count,
      int type,
      int src,
      int tag,
      sim_request* request,
      void (*msg_handler)(void* fun_arg),
      void* fun_arg);
  int sim_send(
      Tick delay,
      void* buffer,
      uint64_t count,
      int type,
      int dst,
      int tag,
      sim_request* request,
      void (*msg_handler)(void* fun_arg),
      void* fun_arg);
  int sim_recv(
      Tick delay,
      void* buffer,
      uint64_t count,
      int type,
      int src,
      int tag,
      sim_request* request,
      void (*msg_handler)(void* fun_arg),
      void* fun_arg);
  int rendezvous_sim_send(
      Tick delay,
      void* buffer,
      uint64_t count,
      int type,
      int dst,
      int tag,
      sim_request* request,
      void (*msg_handler)(void* fun_arg),
      void* fun_arg);
  int rendezvous_sim_recv(
      Tick delay,
      void* buffer,
      uint64_t count,
      int type,
      int src,
      int tag,
      sim_request* request,
      void (*msg_handler)(void* fun_arg),
      void* fun_arg);
  Tick mem_read(uint64_t bytes);
  Tick mem_write(uint64_t bytes);
  static int get_layer_numbers(std::string workload_input);
  std::vector<std::string> split_string(std::string str, std::string sep);

  // Collective generation entry points; each returns a DataSet handle for the
  // generated collective.
  DataSet* generate_all_reduce(
      uint64_t size,
      std::vector<bool> involved_dimensions,
      SchedulingPolicy pref_scheduling,
      int layer,
      EventType event = EventType::NONE,
      Callable* layer_ptr = nullptr);
  DataSet* generate_all_to_all(
      uint64_t size,
      std::vector<bool> involved_dimensions,
      SchedulingPolicy pref_scheduling,
      int layer,
      EventType event = EventType::NONE,
      Callable* layer_ptr = nullptr);
  DataSet* generate_all_gather(
      uint64_t size,
      std::vector<bool> involved_dimensions,
      SchedulingPolicy pref_scheduling,
      int layer,
      EventType event = EventType::NONE,
      Callable* layer_ptr = nullptr);
  DataSet* generate_reduce_scatter(
      uint64_t size,
      std::vector<bool> involved_dimensions,
      SchedulingPolicy pref_scheduling,
      int layer,
      EventType event = EventType::NONE,
      Callable* layer_ptr = nullptr);
  DataSet* generate_collective(
      uint64_t size,
      int layer_num,
      LogicalTopology* topology,
      std::vector<CollectiveImplementation*> implementation_per_dimension,
      std::vector<bool> dimensions_involved,
      ComType collective_type,
      SchedulingPolicy pref_scheduling,
      EventType event = EventType::NONE,
      Callable* layer_ptr = nullptr);
  CollectivePhase generate_collective_phase(
      ComType collective_type,
      int layer_num,
      BasicLogicalTopology* topology,
      uint64_t data_size,
      int queue_id,
      RingTopology::Direction direction,
      InjectionPolicy injection_policy,
      CollectiveImplementation* collective_implementation,
      bool boost_mode);
  void insert_stream(std::list<BaseStream*>* queue, BaseStream* baseStream);
  void proceed_to_next_vnet_baseline(StreamBaseline* stream);
  uint64_t determine_chunk_size(uint64_t size, ComType type);
  int get_priority(SchedulingPolicy pref_scheduling);
  static void handleEvent(void* arg);
  timespec_t generate_time(int cycles);

  // Spin-lock style critical section over g_sys_inCriticalSection. Note that
  // the destructor does not release the lock; callers must invoke
  // ExitSection() explicitly.
  class sysCriticalSection {
   public:
    inline sysCriticalSection() {
      while (g_sys_inCriticalSection.exchange(true, std::memory_order_acquire))
        ;
    }
    inline void ExitSection() {
      g_sys_inCriticalSection.store(false, std::memory_order_release);
    }
    inline ~sysCriticalSection() {}
  };
  static std::atomic<bool> g_sys_inCriticalSection;

  // Mock NCCL communicators and flow-model helpers, keyed by parallel strategy.
  std::map<ParallelStrategy, MockNccl::MockNcclComm*> mock_nccl_comms;
  std::map<std::pair<int, int>, MockNccl::SingleFlow> generate_net_test_flow_model(
      uint64_t data_size,
      int nums);
  std::map<std::pair<int, int>, MockNccl::SingleFlow> generate_nvl_test_flow_model(
      uint64_t data_size,
      int nums);
  std::shared_ptr<void> generate_flow_model(
      ParallelStrategy comm_ps,
      uint64_t data_size,
      ComType collective_type);
  struct MockNccl::ncclInfo* get_nccl_Info(
      ParallelStrategy comm_ps,
      uint64_t data_size,
      ComType collective_type);
  bool mock_nccl_comms_init();
  bool mock_nccl_grobal_group_init();
};
} // namespace AstraSim
#endif
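
Usage note: sysCriticalSection above is a bare spin lock over the static g_sys_inCriticalSection flag, and its destructor does not release the flag, so ExitSection() must be called explicitly before the guard goes out of scope. The following is a minimal, self-contained sketch of that pattern; the CriticalSection class name, the shared counter, and the thread driver are illustrative stand-ins, not part of Sys.hh.

// Standalone sketch of the spin-lock pattern used by Sys::sysCriticalSection.
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

static std::atomic<bool> in_critical_section(false);  // stand-in for g_sys_inCriticalSection

class CriticalSection {
 public:
  CriticalSection() {
    // Spin until we flip the flag from false to true (acquire pairs with the
    // release in ExitSection).
    while (in_critical_section.exchange(true, std::memory_order_acquire))
      ;
  }
  void ExitSection() {
    in_critical_section.store(false, std::memory_order_release);
  }
  // As in Sys.hh, the destructor does NOT release the lock.
  ~CriticalSection() {}
};

int main() {
  long counter = 0;
  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&counter] {
      for (int i = 0; i < 100000; ++i) {
        CriticalSection cs;  // acquire
        ++counter;           // protected update
        cs.ExitSection();    // explicit release; the destructor will not do it
      }
    });
  }
  for (auto& w : workers) w.join();
  std::cout << counter << std::endl;  // expect 400000
  return 0;
}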
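
The sim_send/sim_recv family above takes a C-style callback pair, void (*msg_handler)(void* fun_arg) plus an opaque fun_arg pointer, which is later handed back to the handler. The sketch below only illustrates that calling idiom; SendContext, on_send_finished, and fake_sim_send are hypothetical names, and the stub fires the handler immediately rather than modeling any simulated timing.

// Standalone sketch of the msg_handler/fun_arg callback idiom.
#include <cstdint>
#include <iostream>

struct SendContext {  // hypothetical per-request context passed as fun_arg
  int dst;
  uint64_t count;
};

static void on_send_finished(void* fun_arg) {
  // The handler receives back exactly the pointer that was given as fun_arg.
  SendContext* ctx = static_cast<SendContext*>(fun_arg);
  std::cout << "send of " << ctx->count << " bytes to rank " << ctx->dst
            << " handled\n";
  delete ctx;
}

// Stand-in for a sim_send-like call: it just invokes the handler right away.
static void fake_sim_send(uint64_t count, int dst,
                          void (*msg_handler)(void*), void* fun_arg) {
  (void)count;
  (void)dst;
  msg_handler(fun_arg);
}

int main() {
  SendContext* ctx = new SendContext{/*dst=*/3, /*count=*/1024};
  fake_sim_send(ctx->count, ctx->dst, on_send_finished, ctx);
  return 0;
}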