astra-sim-alibabacloud/astra-sim/workload/Workload.cc
/******************************************************************************
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
*******************************************************************************/
#include "Workload.hh"
#include "CSVWriter.hh"
#include "Layer.hh"
#include "astra-sim/system/MockNcclLog.h"
namespace AstraSim {
Workload::~Workload() {
if (end_to_end != nullptr) {
delete end_to_end;
}
if (detailed != nullptr) {
delete detailed;
}
if (dimension_utilization != nullptr) {
delete dimension_utilization;
}
if (layers != nullptr) {
for (int i = 0; i < SIZE; i++) {
delete layers[i];
}
delete[] layers;
}
}
Workload::Workload(
std::string run_name,
Sys* generator,
std::string name,
int TOTAL_PASS,
int total_rows,
int stat_row,
std::string path,
bool seprate_log) {
this->initialized = false;
this->layers = nullptr;
this->SIZE = 0;
this->counter = 0;
this->delay_loaded = false;
this->checkpoint_initiated = false;
this->collective_issued = false;
this->current_state = LoopState::Forward_Pass;
this->generator = generator;
this->TOTAL_PASS = TOTAL_PASS;
this->pass_counter = 0;
this->index = 0;
this->waiting_for_comm = 0;
end_to_end = nullptr;
detailed = nullptr;
dimension_utilization = nullptr;
this->path = path;
this->stat_row = stat_row;
this->seprate_log = seprate_log;
this->initialized = initialize_workload(name);
if (this->initialized == false) {
return;
}
this->total_rows = total_rows;
this->run_name = run_name;
this->registered_for_finished_streams = false;
#ifndef PHY_MTP
if (generator->id == 0 && seprate_log) {
std::cout << "stat path: " << path << " ,total rows: " << total_rows
<< " ,stat row: " << stat_row << std::endl;
detailed = new CSVWriter(path, "detailed_"+std::to_string(generator->total_nodes)+".csv");
end_to_end = new CSVWriter(path, "EndToEnd.csv");
dimension_utilization =
new CSVWriter(path, run_name + "_dimension_utilization_"+std::to_string(generator->npu_offset)+".csv");
if (stat_row == 0) {
initialize_stat_files();
}
}
#endif
}
void Workload::initialize_stat_files() {
#ifdef NS3_MPI
detailed->initialize_csv(SIZE * total_rows + 20, 50);
#endif
#ifdef NS3_MTP
detailed->initialize_csv(SIZE * total_rows + 20, 50);
#endif
end_to_end->initialize_csv(SIZE * total_rows + 20, 50);
}
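// Top-level event handler: while a compute delay (counter) is still pending it
// re-arms a Workload_Wait event, otherwise it dispatches to the iterate_*
// routine that matches the configured parallelism policy.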
void Workload::call(EventType event, CallData* data) {
if (counter > 0) {
if(generator->id == 0) std::cout << "counter > 0" << std::endl;
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (parallelismPolicy == ParallelismPolicy::Data) {
iterate_data_parallel();
} else if (parallelismPolicy == ParallelismPolicy::Transformer) {
iterate_hybrid_parallel_Transformer();
} else if (
parallelismPolicy == ParallelismPolicy::DLRM ||
parallelismPolicy == ParallelismPolicy::DLRMEnhanced) {
iterate_hybrid_parallel_DLRM();
} else if (parallelismPolicy == ParallelismPolicy::MicroBenchmark) {
iterate_micro_benchmark();
} else if (parallelismPolicy == ParallelismPolicy::Model) {
iterate_model_parallel();
} else if (parallelismPolicy == ParallelismPolicy::HybridDataModel) {
iterate_hybrid_parallel_data_model();
} else if (parallelismPolicy == ParallelismPolicy::HybridModelData) {
iterate_hybrid_parallel_model_data();
} else if (parallelismPolicy == ParallelismPolicy::DistributedInference) {
iterate_distributed_inference();
} else if (parallelismPolicy == ParallelismPolicy::TransformerFwdInBckwd) {
iterate_hybrid_parallel_Transformer_fwd_in_bckwd();
} else if (parallelismPolicy == ParallelismPolicy::HybridCustomized) {
iterate_hybrid_parallel_customized();
} else {
Sys::sys_panic("No known parallelism!");
}
}
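// Collects per-layer statistics into an AstraSimDataAPI record, prints the
// end-to-end summary, and hands the result to the network frontend; with
// separate logging enabled it also dumps per-dimension utilization CSVs.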
void Workload::report() {
double total_compute = 0;
double total_exposed = 0;
// #ifdef ANALYTI
double pre_bubble_time = 0;
double DP_comm = 0;
double DP_EP_comm = 0;
double Expose_TP_comm = 0;
double Expose_EP_comm = 0;
// #endif
std::vector<double> total_fwd_time = {0, 0, 0};
std::vector<double> total_wg_time = {0, 0, 0};
std::vector<double> total_ig_time = {0, 0, 0};
AstraSimDataAPI astraSimDataAPI;
astraSimDataAPI.run_name = run_name;
astraSimDataAPI.workload_finished_time = ((double)Sys::boostedTick()) / FREQ;
std::cout<<"workload stats for the job scheduled at NPU offset: "
<<generator->npu_offset<<std::endl;
for (int i = 0; i < SIZE; i++) {
#ifdef ANALYTI
astraSimDataAPI.layers_stats.push_back(layers[i]->report(
run_name,
i,
total_rows,
stat_row,
detailed,
end_to_end,
total_compute,
total_exposed,
pre_bubble_time,
DP_comm,
DP_EP_comm,
Expose_TP_comm,
Expose_EP_comm,
this->seprate_log));
#else
astraSimDataAPI.layers_stats.push_back(layers[i]->report(
run_name,
i,
total_rows,
stat_row,
detailed,
end_to_end,
total_compute,
total_exposed,
this->seprate_log,
total_fwd_time,
total_wg_time,
total_ig_time,
pre_bubble_time,
DP_comm,
DP_EP_comm,
Expose_TP_comm,
Expose_EP_comm));
#endif
}
astraSimDataAPI.total_compute = total_compute;
astraSimDataAPI.total_exposed_comm = total_exposed;
astraSimDataAPI.avg_chunk_latency_per_logical_dimension =
generator->scheduler_unit->get_average_latency_per_dimension();
for (auto& latency :
astraSimDataAPI.avg_chunk_latency_per_logical_dimension) {
latency /= FREQ;
}
std::cout << "*************************" << std::endl;
std::cout << "all passes finished at time: " << Sys::boostedTick()
<< ", id of first layer: " << layers[0]->id << std::endl;
generator->NI->pass_front_end_report(astraSimDataAPI);
#ifdef NS3_MTP
if (this->seprate_log) {
std::list<std::list<std::pair<uint64_t, double>>> dims;
for (int i = 0; i < generator->scheduler_unit->usage.size(); i++) {
dims.push_back(
generator->scheduler_unit->usage[i].report_percentage(10000));
}
dimension_utilization->finalize_csv(dims);
}
#endif
#ifdef NS3_MPI
if (this->seprate_log) {
std::list<std::list<std::pair<uint64_t, double>>> dims;
for (int i = 0; i < generator->scheduler_unit->usage.size(); i++) {
dims.push_back(
generator->scheduler_unit->usage[i].report_percentage(10000));
}
dimension_utilization->finalize_csv(dims);
}
#endif
}
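// Once all passes are done, wait for every injected stream to drain; node 0
// then reports the statistics and the workload is marked finished.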
void Workload::check_for_sim_end() {
if (pass_counter == TOTAL_PASS) {
current_state = LoopState::Wait_For_Sim_Finish;
if (generator->streams_finished != generator->streams_injected &&
registered_for_finished_streams == false) {
generator->register_for_finished_stream(this);
registered_for_finished_streams = true;
layers[0]->is_weight_grad_comm_finished_blocking();
return;
}
if (generator->streams_finished == generator->streams_injected) {
#ifndef PHY_MTP
if (generator->id == 0) {
report();
}
#endif
generator->workload_finished();
return;
}
}
return;
}
void Workload::iterate_micro_benchmark() {
assert(index >= 0);
assert(index < SIZE);
if (current_state != LoopState::Wait_For_Sim_Finish) {
for (pass_counter = 0; pass_counter < TOTAL_PASS; pass_counter++) {
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::None, CollectiveBarrier::Non_Blocking);
}
}
check_for_sim_end();
}
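// The iterate_* routines below implement a per-layer state machine: the
// Forward_Pass phase walks the layers front to back, then the backward sweep
// alternates between weight-gradient and input-gradient phases while walking
// back to layer 0, after which the next training pass starts. Compute delays
// are modeled through counter, and collectives are issued through the Layer
// object at the current index.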
void Workload::iterate_data_parallel() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
index++;
delay_loaded = false;
if (index >= SIZE) {
current_state = LoopState::Weight_Gradient;
index--;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
delay_loaded = false;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::None, CollectiveBarrier::Non_Blocking);
if (index == 0) {
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
delay_loaded = false;
index--;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
void Workload::iterate_hybrid_parallel_customized() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Input_Gradient;
index--;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::FIFO, CollectiveBarrier::Non_Blocking);
}
if (!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
collective_issued = false;
delay_loaded = false;
if (index >= 0) {
index--;
}
if (index == -1) {
index = 0;
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued && index > 0) {
collective_issued = true;
layers[index]->issue_input_grad_comm(
SchedulingPolicy::LIFO, CollectiveBarrier::Non_Blocking);
}
collective_issued = false;
delay_loaded = false;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
void Workload::iterate_hybrid_parallel_data_model() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Input_Gradient;
index--;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::FIFO, CollectiveBarrier::Non_Blocking);
}
if (!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
collective_issued = false;
delay_loaded = false;
if (index >= 0) {
index--;
}
if (index == -1) {
index = 0;
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued && index > 0) {
collective_issued = true;
layers[index]->issue_input_grad_comm(
SchedulingPolicy::LIFO, CollectiveBarrier::Non_Blocking);
}
collective_issued = false;
delay_loaded = false;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
void Workload::iterate_hybrid_parallel_model_data() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Input_Gradient;
index--;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::FIFO, CollectiveBarrier::Non_Blocking);
}
if (!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
collective_issued = false;
delay_loaded = false;
if (index >= 0) {
index--;
}
if (index == -1) {
index = 0;
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued && index > 0) {
collective_issued = true;
layers[index]->issue_input_grad_comm(
SchedulingPolicy::LIFO, CollectiveBarrier::Non_Blocking);
}
collective_issued = false;
delay_loaded = false;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
void Workload::iterate_distributed_inference() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
index = 0;
pass_counter++;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
void Workload::iterate_model_parallel() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
std::vector<bool> involved_dimensions{true, true, true};
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Input_Gradient;
index--;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
collective_issued = false;
delay_loaded = false;
if (index >= 0) {
index--;
}
if (index == -1) {
index = 0;
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued && index > 0) {
collective_issued = true;
std::vector<bool> involved_dimensions{true, true, true};
layers[index]->issue_input_grad_comm(
SchedulingPolicy::LIFO, CollectiveBarrier::Non_Blocking);
}
collective_issued = false;
delay_loaded = false;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
void Workload::iterate_hybrid_parallel_Transformer() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Input_Gradient;
index--;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::FIFO, CollectiveBarrier::Non_Blocking);
}
if (!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
collective_issued = false;
delay_loaded = false;
if (index >= 0) {
index--;
}
if (index == -1) {
index = 0;
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_input_grad_comm(
SchedulingPolicy::LIFO, CollectiveBarrier::Blocking);
return;
}
collective_issued = false;
delay_loaded = false;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
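// Same state machine as iterate_hybrid_parallel_Transformer, plus activation
// recomputation: layers flagged with needs_fwd_in_bckwd_initiation first
// replay the forward pass from the nearest checkpoint layer
// (Forward_In_BackPass) before their input gradient is taken.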
void Workload::iterate_hybrid_parallel_Transformer_fwd_in_bckwd() {
MockNcclLog* NcclLog = MockNcclLog::getInstance();
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
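// Forward-pass comm sizes below 4096 are raised to 4096 before the
// collective is issued.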
if (layers[index]->fwd_pass_comm_size < 4096 && layers[index]->fwd_pass_comm_size > 0) {
layers[index]->fwd_pass_comm_size = 4096;
}
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Input_Gradient;
index--;
}
NcclLog->writeLog(NcclLogLevel::DEBUG,"workload::call fwd_pass register_event EventType::General ");
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::FIFO, CollectiveBarrier::Non_Blocking);
}
if (!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
collective_issued = false;
delay_loaded = false;
if (index >= 0) {
index--;
}
if (index == -1) {
index = 0;
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Input_Gradient) {
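// If this layer requires recomputation, rewind index to the closest
// checkpoint layer below it and switch to Forward_In_BackPass so the forward
// activations are regenerated before the input-gradient phase resumes here.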
if (layers[index]->needs_fwd_in_bckwd_initiation && !checkpoint_initiated) {
int tmp = index;
while (!layers[index--]->is_checkpoint)
;
index++;
current_state = LoopState::Forward_In_BackPass;
checkpoint_initiated = true;
generator->register_event(this, EventType::General, NULL, 1);
if (generator->id == 0) {
std::cout << "***** info, initiating fwd_in_bkwd starting from layer:"
<< index << " to layer: " << tmp
<< " ,at time: " << Sys::boostedTick() << std::endl;
}
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_input_grad_comm(
SchedulingPolicy::LIFO, CollectiveBarrier::Blocking);
return;
}
checkpoint_initiated = false;
collective_issued = false;
delay_loaded = false;
current_state = LoopState::Weight_Gradient;
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Forward_In_BackPass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::None, CollectiveBarrier::Blocking);
return;
}
index++;
delay_loaded = false;
collective_issued = false;
if (layers[index]->needs_fwd_in_bckwd_initiation) {
current_state = LoopState::Input_Gradient;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
}
}
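// DLRM schedule: the forward pass issues non-blocking all-to-all collectives
// for embedding layers and blocks at DLRM_LAST_BOTTOM_LAYER until layer 0's
// forward communication has finished; the backward sweep issues layer 0's
// input-gradient collective when the input-gradient phase reaches layer
// DLRM_LAST_BOTTOM_LAYER + 1.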
void Workload::iterate_hybrid_parallel_DLRM() {
assert(index >= 0);
assert(index < SIZE);
check_for_sim_end();
if (current_state == LoopState::Forward_Pass) {
if (!layers[index]->is_weight_grad_comm_finished_blocking()) {
return;
}
if (delay_loaded == false) {
counter = layers[index]->get_fwd_pass_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued &&
layers[index]->fwd_pass_comm_type == ComType::All_to_All) {
collective_issued = true;
layers[index]->issue_forward_pass_comm(
SchedulingPolicy::HIGHEST, CollectiveBarrier::Non_Blocking);
} else if (index == DLRM_LAST_BOTTOM_LAYER) {
if (!layers[0]->is_fwd_pass_comm_finished_blocking()) {
return;
}
}
index++;
delay_loaded = false;
collective_issued = false;
if (index >= SIZE) {
current_state = LoopState::Weight_Gradient;
index--;
}
if (generator->id == 0) {
std::cout << "*************************layer changed to: " << index
<< std::endl;
}
generator->register_event(this, EventType::General, NULL, 1);
return;
} else if (current_state == LoopState::Weight_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_weight_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (!collective_issued) {
collective_issued = true;
layers[index]->issue_weight_grad_comm(
SchedulingPolicy::None, CollectiveBarrier::Non_Blocking);
}
if (parallelismPolicy == ParallelismPolicy::DLRM &&
!layers[index]->is_input_grad_comm_finished_blocking()) {
return;
}
if (index == 0) {
if (generator->id == 0) {
std::cout << "pass: " << pass_counter
<< " finished at time: " << Sys::boostedTick() << std::endl;
}
pass_counter++;
current_state = LoopState::Forward_Pass;
} else {
current_state = LoopState::Input_Gradient;
}
delay_loaded = false;
collective_issued = false;
generator->register_event(this, EventType::General, NULL, 1);
} else if (current_state == LoopState::Input_Gradient) {
if (delay_loaded == false) {
counter = layers[index]->get_input_grad_compute();
delay_loaded = true;
}
if (counter > 0) {
generator->try_register_event(
this, EventType::Workload_Wait, NULL, counter);
return;
}
if (index == DLRM_LAST_BOTTOM_LAYER + 1) {
layers[0]->issue_input_grad_comm(
SchedulingPolicy::HIGHEST, CollectiveBarrier::Non_Blocking);
}
index--;
if (generator->id == 0) {
std::cout << "*************************layer changed to: " << index
<< " in ig" << std::endl;
}
current_state = LoopState::Weight_Gradient;
collective_issued = false;
delay_loaded = false;
generator->register_event(this, EventType::General, NULL, 1);
}
}
int Workload::get_layer_numbers(std::string workload_input) {
std::ifstream inFile;
inFile.open("workload_inputs/" + workload_input);
if (!inFile) {
std::cerr << "Unable to open file: " << workload_input << std::endl;
std::cerr << "This error is fatal. Please check your path and filename."
<< std::endl;
exit(1);
} else {
std::cout << "Success in opening workload file" << std::endl;
}
std::string dummyLine;
std::getline(inFile, dummyLine);
int layers;
inFile >> layers;
inFile.close();
return layers;
}
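// Maps the parallelism keyword from the workload file's first line to a
// ParallelismPolicy value; unknown keywords map to ParallelismPolicy::None.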
ParallelismPolicy Workload::decode_parallelsim(std::string parallelism) {
if (parallelism == "DATA")
return ParallelismPolicy::Data;
else if (parallelism == "HYBRID_TRANSFORMER")
return ParallelismPolicy::Transformer;
else if (parallelism == "HYBRID_TRANSFORMER_FWD_IN_BCKWD")
return ParallelismPolicy::TransformerFwdInBckwd;
else if (parallelism == "HYBRID_DLRM")
return ParallelismPolicy::DLRM;
else if (parallelism == "HYBRID_DLRM_ENHANCED")
return ParallelismPolicy::DLRMEnhanced;
else if (parallelism == "MODEL")
return ParallelismPolicy::Model;
else if (parallelism == "HYBRID_DATA_MODEL")
return ParallelismPolicy::HybridDataModel;
else if (parallelism == "HYBRID_MODEL_DATA")
return ParallelismPolicy::HybridModelData;
else if (parallelism == "HYBRID_CUSTOMIZED")
return ParallelismPolicy::HybridCustomized;
else if (parallelism == "MICRO")
return ParallelismPolicy::MicroBenchmark;
else if (parallelism == "DISTRIBUTED_INFERENCE")
return ParallelismPolicy::DistributedInference;
else
return ParallelismPolicy::None;
}
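// Builds, for each phase ("fwd", "ig", "wg"), a 10-entry mask of the logical
// network dimensions that its collective spans. For the Transformer policies
// the split point between model-parallel and data-parallel dimensions is
// derived from generator->break_dimension(model_parallel_npu_group).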
std::map<std::string, std::vector<bool>> Workload::decode_involved_dimensions(
ParallelismPolicy policy,
int model_parallel_npu_group) {
std::map<std::string, std::vector<bool>> result;
std::vector<bool> none{
false, false, false, false, false, false, false, false, false, false};
std::vector<bool> all{
true, true, true, true, true, true, true, true, true, true};
if (policy == ParallelismPolicy::All) {
result["fwd"] = all;
result["ig"] = all;
result["wg"] = all;
} else if (
policy == ParallelismPolicy::Data || policy == ParallelismPolicy::DLRM ||
policy == ParallelismPolicy::DLRMEnhanced ||
policy == ParallelismPolicy::MicroBenchmark) {
result["fwd"] = none;
result["ig"] = none;
result["wg"] = all;
} else if (
policy == ParallelismPolicy::Model ||
policy == ParallelismPolicy::DistributedInference) {
result["fwd"] = all;
result["ig"] = all;
result["wg"] = none;
} else if (policy == ParallelismPolicy::HybridModelData) {
std::vector<bool> data{
true, false, false, false, false, false, false, false, false, false};
std::vector<bool> model{
false, true, true, true, true, true, true, true, true, true};
result["fwd"] = model;
result["ig"] = model;
result["wg"] = data;
} else if (policy == ParallelismPolicy::HybridDataModel) {
std::vector<bool> model{
true, false, false, false, false, false, false, false, false, false};
std::vector<bool> data{
false, true, true, true, true, true, true, true, true, true};
result["fwd"] = model;
result["ig"] = model;
result["wg"] = data;
} else if (
policy == ParallelismPolicy::TransformerFwdInBckwd ||
policy == ParallelismPolicy::Transformer) {
int model_parallel_boundary =
generator->break_dimension(model_parallel_npu_group);
std::vector<bool> model;
std::vector<bool> data;
for (int i = 0; i <= model_parallel_boundary; i++) {
model.push_back(true);
data.push_back(false);
}
for (int i = model_parallel_boundary + 1; i < 10; i++) {
model.push_back(false);
data.push_back(true);
}
result["fwd"] = model;
result["ig"] = model;
result["wg"] = data;
}
return result;
}
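// Parses the workload file: the first line holds the parallelism policy plus
// optional "key: value" tokens (model_parallel_NPU_group, ep, pp, vpp, ga,
// all_gpus, pp_comm, checkpoints, checkpoint_initiates), the second line the
// number of layers, followed by one row per layer with its name, dependency,
// and the compute time, collective type and size of the forward, input-grad
// and weight-grad phases plus the weight-update time.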
bool Workload::initialize_workload(std::string name) {
std::map<int, bool> chekpoints;
std::map<int, bool> need_checkpoint_initiation;
std::ifstream inFile;
inFile.open(name);
if (!inFile) {
std::cerr << "Unable to open file: " << name << std::endl;
std::cerr << "######### Exiting because unable to open the workload input "
"file #########"
<< std::endl;
std::cerr << "This error is fatal. Please check your path and filename."
<< std::endl;
exit(1);
} else {
if (generator->id == 0) {
std::cout << "Success in opening workload file" << std::endl;
}
}
std::string firstline;
std::getline(inFile,firstline);
// std::cout << "First line is : '" << firstline << "'" << std::endl;
std::istringstream iss(firstline);
std::string token;
std::vector<std::string> tokens;
// bool findparallesimPolcy = false;
while (iss >> token) {
tokens.push_back(token);
// std::cout << "Token is : '" << token << "'" << std::endl;
}
if(!tokens.empty()){
parallelismPolicy = decode_parallelsim(tokens[0]);
}
if (parallelismPolicy == ParallelismPolicy::TransformerFwdInBckwd ||
parallelismPolicy == ParallelismPolicy::Transformer) {
for (size_t i = 1; i < tokens.size(); i = i+1){
if(tokens[i]=="model_parallel_NPU_group:"){
model_parallel_npu_group = std::stoi(tokens[i+1]);
if (generator->id == 0) {
std::cout <<"model_parallel_NPU_group is " << model_parallel_npu_group << std::endl;
}
}else if(tokens[i]=="ep:"){
expert_parallel_npu_group = std::stoi(tokens[i+1]);
}else if(tokens[i]== "pp:"){
pipeline_model_parallelism = std::stoi(tokens[i+1]);
}else if(tokens[i]=="vpp:"){
vpp = std::stoi(tokens[i+1]);
}else if(tokens[i]=="ga:"){
GA = std::stoi(tokens[i+1]);
}else if(tokens[i]=="all_gpus:"){
all_gpus = std::stoi(tokens[i+1]);
}
}
if(parallelismPolicy == ParallelismPolicy::TransformerFwdInBckwd){
if (generator->id == 0) {
std::cout << "checkpoints layers are: ";
}
for(size_t i = 1; i < tokens.size(); i++){
if(tokens[i]=="checkpoints:"){
int account = std::stoi(tokens[i+1]);
// j walks over the layer ids listed after the count; it must keep advancing
// across iterations so that every listed layer is recorded.
int j = 2;
while(account-- > 0){
int layer = std::stoi(tokens[i+j]);
chekpoints[layer] = true;
if (generator->id == 0) {
std::cout << layer << ", ";
}
j++;
}
}else if(tokens[i]=="checkpoint_initiates:"){
if (generator->id == 0) {
std::cout << std::endl;
std::cout << "layers initiating fwd_in_bckwd are: ";
}
int account = std::stoi(tokens[i+1]);
int j = 2;
while(account-- > 0){
int layer = std::stoi(tokens[i+j]);
need_checkpoint_initiation[layer] = true;
if (generator->id == 0) {
std::cout << layer << ", ";
}
j++;
}
if (generator->id == 0) {
std::cout << std::endl;
}
}
}
}
}else if(parallelismPolicy == ParallelismPolicy::DLRM ||
parallelismPolicy == ParallelismPolicy::DLRMEnhanced){
for (size_t i = 1; i < tokens.size(); i = i+1){
if(tokens[i]=="DLRM_LAST_BOTTOM_LAYER:"){
DLRM_LAST_BOTTOM_LAYER = std::stoi(tokens[i+1]);
}
}
if (generator->id == 0) {
std::cout
<< "****************** info: DLRM workload last bottom layer is: "
<< DLRM_LAST_BOTTOM_LAYER << std::endl;
}
}else if (parallelismPolicy == ParallelismPolicy::None) {
#ifndef PHY_MTP
std::cerr << "######### Exiting because unable to decode the workload "
"parallelization strategy #########"
<< std::endl;
inFile.close();
exit(1);
#else
parallelismPolicy = ParallelismPolicy::TransformerFwdInBckwd;
#endif
}
std::map<std::string, std::vector<bool>> general_involved_dimensions =
decode_involved_dimensions(parallelismPolicy, model_parallel_npu_group);
pp_commsize = 0;
for (size_t i = 1; i < tokens.size(); i = i+1){
if(tokens[i]=="pp_comm"||tokens[i]=="pp_comm:"){
pp_commsize = std::stoi(tokens[i+1]);
}
}
if (generator->id == 0) {
std::cout <<"pp_commize:"<< pp_commsize << std::endl;
}
if(generator->id == 0){
if (model_parallel_npu_group == 0 || expert_parallel_npu_group == 0 ||
pipeline_model_parallelism == 0 || vpp == 0 || GA == 0 || all_gpus == 0 ||
(pipeline_model_parallelism != 1 && pp_commsize == 0) ||
(pipeline_model_parallelism == 1 && pp_commsize != 0)) {
std::cerr << "*****Warning: Input workload format mismatch. It may cause simulation errors. Please use the latest AICB to generate the workload.*****" << std::endl;
}
}
run_type = tokens[0];
std::string secondline;
std::getline(inFile,secondline);
int lines;
// std::cout << "Second line content: '" << secondline << "'" << std::endl;
lines = std::stoi(secondline);
SIZE = lines;
layers = new Layer*[SIZE];
for (int i = 0; i < lines; i++) {
std::string id;
inFile >> id;
int depen;
inFile >> depen;
Tick fp_compute_time;
inFile >> fp_compute_time;
std::string fp_comm_type_s;
inFile >> fp_comm_type_s;
uint64_t fp_comm_size;
inFile >> fp_comm_size;
Tick ig_compute_time;
inFile >> ig_compute_time;
std::string ig_comm_type_s;
inFile >> ig_comm_type_s;
uint64_t ig_comm_size;
inFile >> ig_comm_size;
Tick wg_compute_time;
inFile >> wg_compute_time;
std::string wg_comm_type_s;
inFile >> wg_comm_type_s;
uint64_t wg_comm_size;
inFile >> wg_comm_size;
Tick wg_update_time;
inFile >> wg_update_time;
ParallelismPolicy specific_policy = ParallelismPolicy::None;
std::map<std::string, std::vector<bool>> selected_involved_dimensions;
ComType fp_type = ComType::None;
ComType ig_type = ComType::None;
ComType wg_type = ComType::None;
MockNccl::GroupType fp_group_type = MockNccl::GroupType::NONE;
MockNccl::GroupType ig_group_type = MockNccl::GroupType::NONE;
MockNccl::GroupType wg_group_type = MockNccl::GroupType::NONE;
if (wg_comm_type_s.substr(0,9) == "ALLREDUCE") {
wg_type = ComType::All_Reduce;
if(wg_comm_type_s == "ALLREDUCE"){
wg_group_type = MockNccl::GroupType::DP;
} else if(wg_comm_type_s == "ALLREDUCE_EP"){
wg_group_type = MockNccl::GroupType::EP;
} else if(wg_comm_type_s == "ALLREDUCE_DP_EP"){
wg_group_type = MockNccl::GroupType::DP_EP;
} else{
wg_group_type = MockNccl::GroupType::NONE;
}
} else if (wg_comm_type_s.substr(0,8) == "ALLTOALL") {
wg_type = ComType::All_to_All;
if(wg_comm_type_s == "ALLTOALL"){
wg_group_type = MockNccl::GroupType::DP;
} else if(wg_comm_type_s == "ALLTOALL_EP"){
wg_group_type = MockNccl::GroupType::EP;
} else if(wg_comm_type_s == "ALLTOALL_DP_EP"){
wg_group_type = MockNccl::GroupType::DP_EP;
} else{
wg_group_type = MockNccl::GroupType::NONE;
}
} else if (wg_comm_type_s.substr(0,17) == "ALLREDUCEALLTOALL") {
wg_type = ComType::All_Reduce_All_to_All;
if(wg_comm_type_s == "ALLREDUCEALLTOALL"){
wg_group_type = MockNccl::GroupType::DP;
} else if(wg_comm_type_s == "ALLREDUCEALLTOALL_EP"){
wg_group_type = MockNccl::GroupType::EP;
} else if(wg_comm_type_s == "ALLREDUCEALLTOALL_DP_EP"){
wg_group_type = MockNccl::GroupType::DP_EP;
} else{
wg_group_type = MockNccl::GroupType::NONE;
}
} else if (wg_comm_type_s.substr(0,9) == "ALLGATHER") {
wg_type = ComType::All_Gather;
if(wg_comm_type_s == "ALLGATHER"){
wg_group_type = MockNccl::GroupType::DP;
} else if(wg_comm_type_s == "ALLGATHER_EP"){
wg_group_type = MockNccl::GroupType::EP;
} else if(wg_comm_type_s == "ALLGATHER_DP_EP"){
wg_group_type = MockNccl::GroupType::DP_EP;
} else{
wg_group_type = MockNccl::GroupType::NONE;
}
} else if (wg_comm_type_s.substr(0,13) == "REDUCESCATTER") {
wg_type = ComType::Reduce_Scatter;
if(wg_comm_type_s == "REDUCESCATTER"){
wg_group_type = MockNccl::GroupType::DP;
} else if(wg_comm_type_s == "REDUCESCATTER_EP"){
wg_group_type = MockNccl::GroupType::EP;
} else if(wg_comm_type_s == "REDUCESCATTER_DP_EP"){
wg_group_type = MockNccl::GroupType::DP_EP;
} else{
wg_group_type = MockNccl::GroupType::NONE;
}
}
// generate flow model
if (ig_comm_type_s.substr(0,9) == "ALLREDUCE") {
ig_type = ComType::All_Reduce;
if(ig_comm_type_s == "ALLREDUCE"){
ig_group_type = MockNccl::GroupType::TP;
} else if(ig_comm_type_s == "ALLREDUCE_EP"){
ig_group_type = MockNccl::GroupType::EP;
} else if(ig_comm_type_s == "ALLREDUCE_DP_EP"){
ig_group_type = MockNccl::GroupType::DP_EP;
} else{
ig_group_type = MockNccl::GroupType::NONE;
}
} else if (ig_comm_type_s.substr(0,8) == "ALLTOALL") {
ig_type = ComType::All_to_All;
if(ig_comm_type_s == "ALLTOALL"){
ig_group_type = MockNccl::GroupType::TP;
} else if(ig_comm_type_s == "ALLTOALL_EP"){
ig_group_type = MockNccl::GroupType::EP;
} else if(ig_comm_type_s == "ALLTOALL_DP_EP"){
ig_group_type = MockNccl::GroupType::DP_EP;
} else{
ig_group_type = MockNccl::GroupType::NONE;
}
} else if (ig_comm_type_s.substr(0,17) == "ALLREDUCEALLTOALL") {
ig_type = ComType::All_Reduce_All_to_All;
if(ig_comm_type_s == "ALLREDUCEALLTOALL"){
ig_group_type = MockNccl::GroupType::TP;
} else if(ig_comm_type_s == "ALLREDUCEALLTOALL_EP"){
ig_group_type = MockNccl::GroupType::EP;
} else if(ig_comm_type_s == "ALLREDUCEALLTOALL_DP_EP"){
ig_group_type = MockNccl::GroupType::DP_EP;
} else{
ig_group_type = MockNccl::GroupType::NONE;
}
} else if (ig_comm_type_s.substr(0,9) == "ALLGATHER") {
ig_type = ComType::All_Gather;
if(ig_comm_type_s == "ALLGATHER"){
ig_group_type = MockNccl::GroupType::TP;
} else if(ig_comm_type_s == "ALLGATHER_EP"){
ig_group_type = MockNccl::GroupType::EP;
} else if(ig_comm_type_s == "ALLGATHER_DP_EP"){
ig_group_type = MockNccl::GroupType::DP_EP;
} else{
ig_group_type = MockNccl::GroupType::NONE;
}
} else if (ig_comm_type_s.substr(0,13) == "REDUCESCATTER") {
ig_type = ComType::Reduce_Scatter;
if(ig_comm_type_s == "REDUCESCATTER"){
ig_group_type = MockNccl::GroupType::TP;
} else if(ig_comm_type_s == "REDUCESCATTER_EP"){
ig_group_type = MockNccl::GroupType::EP;
} else if(ig_comm_type_s == "REDUCESCATTER_DP_EP"){
ig_group_type = MockNccl::GroupType::DP_EP;
} else{
ig_group_type = MockNccl::GroupType::NONE;
}
}
if (fp_comm_type_s.substr(0,9) == "ALLREDUCE") {
fp_type = ComType::All_Reduce;
if(fp_comm_type_s == "ALLREDUCE"){
fp_group_type = MockNccl::GroupType::TP;
} else if(fp_comm_type_s == "ALLREDUCE_EP"){
fp_group_type = MockNccl::GroupType::EP;
} else if(fp_comm_type_s == "ALLREDUCE_DP_EP"){
fp_group_type = MockNccl::GroupType::DP_EP;
} else{
fp_group_type = MockNccl::GroupType::NONE;
}
} else if (fp_comm_type_s.substr(0,8) == "ALLTOALL") {
fp_type = ComType::All_to_All;
if(fp_comm_type_s == "ALLTOALL"){
fp_group_type = MockNccl::GroupType::TP;
} else if(fp_comm_type_s == "ALLTOALL_EP"){
fp_group_type = MockNccl::GroupType::EP;
} else if(fp_comm_type_s == "ALLTOALL_DP_EP"){
fp_group_type = MockNccl::GroupType::DP_EP;
} else{
fp_group_type = MockNccl::GroupType::NONE;
}
} else if (fp_comm_type_s.substr(0,17) == "ALLREDUCEALLTOALL") {
fp_type = ComType::All_Reduce_All_to_All;
if(fp_comm_type_s == "ALLREDUCEALLTOALL"){
fp_group_type = MockNccl::GroupType::TP;
} else if(fp_comm_type_s == "ALLREDUCEALLTOALL_EP"){
fp_group_type = MockNccl::GroupType::EP;
} else if(fp_comm_type_s == "ALLREDUCEALLTOALL_DP_EP"){
fp_group_type = MockNccl::GroupType::DP_EP;
} else{
fp_group_type = MockNccl::GroupType::NONE;
}
} else if (fp_comm_type_s.substr(0,9) == "ALLGATHER") {
fp_type = ComType::All_Gather;
if(fp_comm_type_s == "ALLGATHER"){
fp_group_type = MockNccl::GroupType::TP;
} else if(fp_comm_type_s == "ALLGATHER_EP"){
fp_group_type = MockNccl::GroupType::EP;
} else if(fp_comm_type_s == "ALLGATHER_DP_EP"){
fp_group_type = MockNccl::GroupType::DP_EP;
} else{
fp_group_type = MockNccl::GroupType::NONE;
}
} else if (fp_comm_type_s.substr(0,13) == "REDUCESCATTER") {
fp_type = ComType::Reduce_Scatter;
if(fp_comm_type_s == "REDUCESCATTER"){
fp_group_type = MockNccl::GroupType::TP;
} else if(fp_comm_type_s == "REDUCESCATTER_EP"){
fp_group_type = MockNccl::GroupType::EP;
} else if(fp_comm_type_s == "REDUCESCATTER_DP_EP"){
fp_group_type = MockNccl::GroupType::DP_EP;
} else{
fp_group_type = MockNccl::GroupType::NONE;
}
}
if (generator->id == 0) {
std::cout << "id: " << id << " , depen: " << depen
<< " , wg_comp_time: " << wg_compute_time << std::endl;
}
if (parallelismPolicy == ParallelismPolicy::HybridCustomized) {
std::string specific_parallelsim;
inFile >> specific_parallelsim;
specific_policy = decode_parallelsim(specific_parallelsim);
}
if ((parallelismPolicy == ParallelismPolicy::DLRM ||
parallelismPolicy == ParallelismPolicy::DLRMEnhanced) &&
i == 0) {
specific_policy = ParallelismPolicy::All;
}
if (specific_policy != ParallelismPolicy::None) {
selected_involved_dimensions =
decode_involved_dimensions(specific_policy, model_parallel_npu_group);
} else {
selected_involved_dimensions = general_involved_dimensions;
}
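// Compute times are scaled by generator->compute_scale and communication
// sizes by generator->comm_scale before the Layer is constructed.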
Layer* l = new Layer(
id,
i,
generator,
this,
fp_compute_time * generator->compute_scale,
fp_type,
fp_group_type,
fp_comm_size * generator->comm_scale,
selected_involved_dimensions["fwd"],
ig_compute_time * generator->compute_scale,
ig_type,
ig_group_type,
ig_comm_size * generator->comm_scale,
selected_involved_dimensions["ig"],
wg_compute_time * generator->compute_scale,
wg_type,
wg_group_type,
wg_comm_size * generator->comm_scale,
selected_involved_dimensions["wg"],
wg_update_time,
specific_policy);
if (chekpoints.find(i) != chekpoints.end()) {
l->is_checkpoint = true;
}
if (need_checkpoint_initiation.find(i) !=
need_checkpoint_initiation.end()) {
l->needs_fwd_in_bckwd_initiation = true;
}
layers[i] = l;
}
if (generator->id == 0) {
std::cout << "type: " << run_type << " ,num passes: " << TOTAL_PASS
<< " ,lines: " << lines
<< " compute scale: " << generator->compute_scale
<< " ,comm scale: " << generator->comm_scale << std::endl;
}
inFile.close();
return true;
}
void Workload::fire() {
call(EventType::General, NULL);
}
} // namespace AstraSim