astra-sim-alibabacloud/astra-sim/system/collective/NcclTreeFlowModel.hh (103 lines of code) (raw):

/* *Copyright (c) 2024, Alibaba Group; *Licensed under the Apache License, Version 2.0 (the "License"); *you may not use this file except in compliance with the License. *You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 *Unless required by applicable law or agreed to in writing, software *distributed under the License is distributed on an "AS IS" BASIS, *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *See the License for the specific language governing permissions and *limitations under the License. */ #ifndef __NCCL_TREE_FLOW_MODEL_HH__ #define __NCCL_TREE_FLOW_MODEL_HH__ #include <assert.h> #include <math.h> #include<set> #include <algorithm> #include <chrono> #include <cstdint> #include <ctime> #include <fstream> #include <list> #include <map> #include <sstream> #include <tuple> #include <vector> #include<condition_variable> #include "Algorithm.hh" #include "astra-sim/system/Common.hh" #include "astra-sim/system/MemBus.hh" #include "astra-sim/system/MyPacket.hh" #include "astra-sim/system/topology/RingTopology.hh" #include "astra-sim/system/MockNcclQps.h" namespace AstraSim { class NcclTreeFlowModel : public Algorithm { public: std::chrono::time_point<std::chrono::high_resolution_clock> start_time; std::chrono::time_point<std::chrono::high_resolution_clock> end_time; MemBus::Transmition transmition; int id; int nodes_in_ring; std::map<int, int> _stream_count; std::atomic<int> send_packets; std::atomic<int> recv_packets; int parallel_reduce; std::map<std::pair<int, int>, std::list<MyPacket>> packets; bool toggle; std::map<std::pair<int,int>, int> free_packets; bool processed; bool send_back; bool NPU_to_MA; std::map<int, int> indegree_mapping; std::map<int, int> inprocessing_indegree; std::map<int, int>* zero_latency_packets; std::map<int, int>* non_zero_latency_packets; MockNccl::FlowModels _flow_models; uint32_t m_channels; uint32_t len_channel; MockNccl::NcclQps* pQps; std::condition_variable judge_exit_cv; std::mutex judge_exit_mutex; std::mutex judge_mutex; std::atomic<bool> judge_exit_flag; NcclTreeFlowModel(){}; ~NcclTreeFlowModel(){}; NcclTreeFlowModel( ComType type, int id, int layer_num, RingTopology* ring_topology, uint64_t data_size, RingTopology::Direction direction, InjectionPolicy injection_policy, bool boost_mode, std::shared_ptr<MockNccl::FlowModels> ptr_flow_models, int treechannels); virtual void run(EventType event, CallData* data); void process_stream_count(int channel_id); void release_packets(int channel_id, int flow_id, uint64_t message_size); void reduce(int channel_id, int flow_id); bool iteratable(int channel_id); virtual int get_non_zero_latency_packets(); void insert_packets(int channel_id, int flow_id); void init_indegree_mapping(); bool ready(int channel_id, int flow_id); bool recv_ready(int channel_id, int flow_id); bool init_recv_ready(); void exit(); #ifdef PHY_MTP bool phy_iteratable(int channel_id); bool phy_ready(int channel_id,int flow_id); void waiting_to_exit(); #endif class FlowCriticalSection { public: inline FlowCriticalSection () { while (g_flow_inCriticalSection.exchange (true, std::memory_order_acquire)) ; } inline void ExitSection() { g_flow_inCriticalSection.store (false, std::memory_order_release); } inline ~FlowCriticalSection () { g_flow_inCriticalSection.store (false, std::memory_order_release); } }; static std::atomic<bool> g_flow_inCriticalSection; }; } // namespace AstraSim #endif