maga_transformer/cpp/normal_engine/NormalExecutor.cc (179 lines of code) (raw):

#include "maga_transformer/cpp/normal_engine/NormalExecutor.h" #include <cstdlib> #include <memory> #include "maga_transformer/cpp/utils/StatusUtil.h" #include "maga_transformer/cpp/models/GptModel.h" #include "maga_transformer/cpp/models/Sampler.h" #include "maga_transformer/cpp/th_op/GptInitParameter.h" #include "torch/csrc/autograd/profiler_kineto.h" using namespace std; namespace rtp_llm { namespace tap = torch::autograd::profiler; namespace tpi = torch::profiler::impl; class CudaProfiler { public: CudaProfiler(const std::string& prefix): prefix_(prefix) { tap::prepareProfiler(config_, activities_); } ~CudaProfiler() { if (!stoped_) { stoped_ = true; stop(); } } void start() { count += 1; stoped_ = false; tap::enableProfiler(config_, activities_); } void stop() { std::unique_ptr<tap::ProfilerResult> res = tap::disableProfiler(); std::string file_name = prefix_ + std::to_string(count) + ".json"; res->save(file_name); stoped_ = true; } protected: static size_t count; std::string prefix_; tpi::ProfilerConfig config_ = tpi::ProfilerConfig(tpi::ProfilerState::KINETO); std::set<tpi::ActivityType> activities_{tpi::ActivityType::CUDA}; bool stoped_ = true; }; size_t CudaProfiler::count = 0; NormalExecutor::NormalExecutor(const EngineInitParams& params, const std::shared_ptr<CacheManager>& cache_manager, rtp_llm::DeviceBase* device, const std::shared_ptr<lora::LoraManager>& lora_manager, bool warm_up): Executor(device), cache_manager_(cache_manager), lora_manager_(lora_manager), warm_up_(warm_up), gen_timeline_sync_(autil::EnvUtil::getEnv("GEN_TIMELINE_SYNC", 0L)), metrics_reporter_(params.metrics_reporter), tps_reporter_(MetricsLoopReporter<RtpLLMTokenPSMetrics, RtpLLMTokenPSMetricsCollector>(metrics_reporter_)) { auto& gpt_param = params.gpt_init_parameter; if (gpt_param.enable_eplb_ && gpt_param.moe_style_ != 0) { // use first moe layer weight as moe weight type int first_moe_layer = gpt_param.moe_layer_index_.front(); auto moe_weight_type = params.gpt_weights.layers[first_moe_layer].ffn_weights.moe_gate_weight->kernel->type(); expert_balancer_ = make_shared<ExpertBalancer>(gpt_param.expert_num_, gpt_param.phy_exp_num_, gpt_param.num_layers_, gpt_param.moe_inter_padding_size_, gpt_param.hidden_size_, gpt_param.eplb_update_time_, gpt_param.ep_rank_, gpt_param.ep_size_, gpt_param.py_eplb_, moe_weight_type, device_, gpt_param.eplb_mode_, gpt_param.quant_algo_, metrics_reporter_); } int eos_id = params.gpt_init_parameter.special_tokens_.eos_token_id_; SamplerInitParams sampler_params{device_, eos_id, device->initParams().max_batch_size}; // set static max batch size to avoid sampler reset memory sampler_.reset(new Sampler(sampler_params)); // CacheManager::KVCacheBuffer kv_cache_buffer; // CacheConfig cache_config; // if (warmup) { // kv_cache_buffer.k_blocks = // } else { // kv_cache_buffer = cache_manager->kvCacheBuffer(); // cache_config = cache_manager->cacheConfig(); // } model_.reset(new GptModel({ device_, params.gpt_weights, genModelDescription(params.gpt_init_parameter), cache_manager ? ((optional<CacheManager::KVCacheBuffer>)cache_manager->kvCacheBuffer()) : nullopt })); // when warmup, cache manager maybe nullptr const auto& cache_config = cache_manager ? cache_manager->cacheConfig() : CacheConfig(); batch_stream_processor_.reset(new NormalBatchStreamProcessor( params.gpt_init_parameter, cache_config, warm_up_)); } absl::Status NormalExecutor::process(const std::list<GenerateStreamPtr>& streams) { StreamGroups stream_groups(streams); bool gen_timeline = stream_groups.genTimeline(); if (gen_timeline_sync_) { auto gen_timeline_buffer = device_->allocateBuffer( {rtp_llm::DataType::TYPE_BOOL, {device_->getDeviceProperties().dp_size}, rtp_llm::AllocationType::HOST}); *(gen_timeline_buffer->dataWithOffset<bool>(device_->getDeviceProperties().dp_rank)) = gen_timeline; device_->allGather({{gen_timeline_buffer}, rtp_llm::ParallelMode::DP_AND_TP}); device_->syncCommunication(); gen_timeline = std::any_of(gen_timeline_buffer->data<bool>(), gen_timeline_buffer->dataWithOffset<bool>(device_->getDeviceProperties().dp_size), [](auto s) { return s;}); } std::shared_ptr<CudaProfiler> profiler; if (gen_timeline) { profiler = std::make_shared<CudaProfiler>("cuda_profiler_dp" + std::to_string(device_->getDeviceProperties().dp_rank) + "_"); profiler->start(); } RtpLLMExecutorMetricsCollector executor_collector; RtpLLMTokenPSMetricsCollector tps_collector; GptModelInputs model_input; GptModelOutputs model_output; SamplerOutput sampler_output; { int64_t start_time_us = autil::TimeUtility::currentTimeInMicroSeconds(); auto model_input_status = batch_stream_processor_->gatherModelInput(stream_groups); RETURN_IF_STATUS_OR_ERROR(model_input_status); model_input = std::move(model_input_status.value()); executor_collector.gather_model_input_us = autil::TimeUtility::currentTimeInMicroSeconds() - start_time_us; } { int64_t start_time_us = autil::TimeUtility::currentTimeInMicroSeconds(); dpAndTpSyncModelInputs(model_input, device_); executor_collector.tp_sync_input_us = autil::TimeUtility::currentTimeInMicroSeconds() - start_time_us; } // get lora input if (lora_manager_) { model_input.lora_model_input = lora_manager_->makeLoraModelInput(model_input.lora_ids, model_input.lora_input_lengths); } { RTP_LLM_LOG_DEBUG("model_input: %s", model_input.debugString().c_str()); int64_t start_time_us = autil::TimeUtility::currentTimeInMicroSeconds(); model_output = std::move(model_->forward(model_input)); executor_collector.model_forward_us = autil::TimeUtility::currentTimeInMicroSeconds() - start_time_us; RTP_LLM_LOG_DEBUG("model forward done"); } if (expert_balancer_) { int64_t start_time_us = autil::TimeUtility::currentTimeInMicroSeconds(); expert_balancer_->stepForward(*model_, executor_collector); executor_collector.eplb_step_latency_us = autil::TimeUtility::currentTimeInMicroSeconds() - start_time_us; } if (device_->getDeviceProperties().tp_rank > 0 || warm_up_) { return absl::OkStatus(); } { int64_t start_time_us = autil::TimeUtility::currentTimeInMicroSeconds(); CHECK_AND_RETURN_REF(sampler_input, batch_stream_processor_->gatherSamplerInput(stream_groups, model_input, model_output)); sampler_output = std::move(sampler_->forward(sampler_input)); RTP_LLM_LOG_DEBUG("sampler forward done"); executor_collector.sample_input_us = autil::TimeUtility::currentTimeInMicroSeconds() - start_time_us; } { int64_t start_time_us = autil::TimeUtility::currentTimeInMicroSeconds(); auto result = batch_stream_processor_->dispatch(stream_groups, {std::move(model_output), std::move(sampler_output)}); executor_collector.dispatch_output_us = autil::TimeUtility::currentTimeInMicroSeconds() - start_time_us; reportMetrics(stream_groups, executor_collector, tps_collector); return result; } } void NormalExecutor::reportMetrics(const StreamGroups& stream_groups, RtpLLMExecutorMetricsCollector& executor_collector, RtpLLMTokenPSMetricsCollector& tps_collector) { if (device_->getDeviceProperties().tp_rank > 0) { return; } if (metrics_reporter_) { executor_collector.context_batch_size = stream_groups.totalContextBatchSize(); executor_collector.generate_batch_size = stream_groups.totalDecodeBatchSize(); executor_collector.execute_token_size = stream_groups.modelExecuteTokenSize(); executor_collector.max_seq_len = stream_groups.maxSeqLen(); if (executor_collector.context_batch_size != 0) { executor_collector.context_batch_size_when_has_context = executor_collector.context_batch_size; executor_collector.generate_batch_size_when_has_context = executor_collector.generate_batch_size; executor_collector.execute_token_size_when_has_context = executor_collector.execute_token_size; executor_collector.max_seq_len_when_has_context = executor_collector.max_seq_len; } metrics_reporter_->report<RtpLLMExecutorMetrics, RtpLLMExecutorMetricsCollector>(nullptr, &executor_collector); tps_collector.context_tps = stream_groups.modelExecuteTokenSize() - stream_groups.totalDecodeBatchSize(); tps_collector.generate_tps = stream_groups.totalDecodeBatchSize(); tps_collector.total_tps = stream_groups.modelExecuteTokenSize(); tps_reporter_.report(&tps_collector); } } } // namespace rtp_llm