maga_transformer/cpp/schedulers/FIFOScheduler.cc
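// FIFOScheduler: admits GenerateStreams in first-in-first-out order, reserves KV cache
// blocks for running streams on every schedule cycle, and falls back (pausing streams
// and releasing their blocks) when cache memory runs short. Queue sizes and fallback
// counts are reported through the kmonitor metrics reporter.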

#include "maga_transformer/cpp/schedulers/FIFOScheduler.h" #include "maga_transformer/cpp/metrics/RtpLLMMetrics.h" #include "maga_transformer/cpp/utils/Logger.h" #include <chrono> #include <memory> #include <mutex> using namespace std; namespace rtp_llm { FIFOScheduler::FIFOScheduler(const rtp_llm::GptInitParameter& params, const std::shared_ptr<CacheManager>& cache_manager, const kmonitor::MetricsReporterPtr metrics_reporter): params_(params), cache_manager_(cache_manager), max_seq_len_(params.max_seq_len_), max_context_batch_size_(params.max_context_batch_size_), max_generate_batch_size_(params.max_generate_batch_size_), reserve_block_num_(params.scheduler_reserve_resource_ratio_ * cache_manager->availableBlockNums() / 100), // not support fallback when use pd_speration:use_cache_store enable_partial_fallback_(params.enable_partial_fallback_ && params.use_cache_store_ == false), enable_whole_fallback_(params.use_cache_store_ == false), enable_fast_gen_(params.enable_fast_gen_), need_fill_fake_stream_(params.dp_size_ > 1 && params.tp_rank_ == 0), fast_gen_max_context_len_(params.fast_gen_max_context_len_), metrics_reporter_(metrics_reporter) {} FIFOScheduler::~FIFOScheduler() { (void)stop(); RTP_LLM_LOG_INFO("destory FIFOScheduler"); } bool FIFOScheduler::empty() { return waiting_streams_.empty() && running_streams_.empty(); } absl::Status FIFOScheduler::stop() { RTP_LLM_LOG_INFO("stop FIFOScheduler"); { lock_guard<mutex> lock(lock_); stop_ = true; } cond_.notify_all(); return absl::OkStatus(); } void FIFOScheduler::evaluateRunningRemote() { for (auto it = running_streams_.begin(); it != running_streams_.end();) { if ((*it)->needRemoteGenerate()) { (*it)->setRemoteGenerate(); remote_running_streams_.emplace_back(*it); RTP_LLM_LOG_DEBUG("stream [%ld] move to remote running streams", (*it)->streamId()); it = running_streams_.erase(it); } else { ++it; } } } int64_t FIFOScheduler::lastScheduleTime() { lock_guard<mutex> lock(lock_); return empty() ? autil::TimeUtility::currentTimeInMilliSeconds() : last_schedule_time_.load(); } void FIFOScheduler::evictDoneStreams(list<GenerateStreamPtr>& streams) const { for (auto it = streams.begin(); it != streams.end();) { (*it)->checkTimeout(); if ((*it)->stopped() || (*it)->finished()) { // Immediately free resources to run more streams (*it)->releaseResource(); RTP_LLM_LOG_DEBUG("evict stream [%ld]", (*it)->streamId()); it = streams.erase(it); } else { ++it; } } } absl::Status FIFOScheduler::enqueue(const GenerateStreamPtr& stream) { { lock_guard<mutex> lock(lock_); waiting_streams_.emplace_back(stream); } cond_.notify_all(); return absl::OkStatus(); } int FIFOScheduler::runningNextBlockNum(size_t reserve_step) const { int total_need_block_nums = 0; for (auto& stream : running_streams_) { total_need_block_nums += stream->nextNeedBlockNums(reserve_step); } return total_need_block_nums; } // TODO(xinfei.sxf) Is there any situation where the request cannot be ended? tuple<int, int> FIFOScheduler::evaluateRunningNext(size_t reserve_step) { // Only in the case of partial fallback, the stream in the waiting queue may hold blocks resources. 
int fallback_streams = 0; int error_streams = 0; if (enable_partial_fallback_) { for (auto& stream : waiting_streams_) { int need_block_num = (int)runningNextBlockNum(reserve_step) - (int)cache_manager_->availableBlockNums(); if (need_block_num <= 0) { break; } if (stream->maxBlockSize()) { RTP_LLM_LOG_INFO("lack mem, stream [%ld] in watting queue try release blocks, " "it's input_length:%d seq_length:%d, hold block size:%d, release block size:%d", stream->streamId(), stream->inputLength(), stream->seqLength(), stream->maxBlockSize(), need_block_num); stream->tryReleaseKVBlock(need_block_num); fallback_streams++; } } } if (enable_whole_fallback_) { while (!running_streams_.empty()) { int need_block_num = (int)runningNextBlockNum(reserve_step) - (int)cache_manager_->availableBlockNums(); if (need_block_num <= 0) { break; } auto& last_stream = *(running_streams_.rbegin()); int need_release_blocks = enable_partial_fallback_ ? need_block_num : last_stream->maxBlockSize(); RTP_LLM_LOG_INFO("lack mem, stream [%ld] fallback to wait, it's input_length:%d seq_length:%d, hold block size:%d, release block size:%d", last_stream->streamId(), last_stream->inputLength(), last_stream->seqLength(), last_stream->maxBlockSize(), need_release_blocks); last_stream->tryReleaseKVBlock(need_release_blocks); last_stream->setPaused(); waiting_streams_.emplace_front(last_stream); running_streams_.pop_back(); fallback_streams++; } } if (enable_fast_gen_) { token_capacity_ = fast_gen_max_context_len_; RTP_LLM_LOG_DEBUG("initial token_capacity is %d", token_capacity_); } for (auto it = running_streams_.begin(); it != running_streams_.end();) { auto result = (*it)->incrKVBlock(token_capacity_, reserve_step); if (!result.ok()) { (*it)->stopAndRelease(ErrorCode::MALLOC_FAILED, "incrKVBlock failed"); RTP_LLM_LOG_WARNING("stream [%ld] incr block failed", (*it)->streamId()); it = running_streams_.erase(it); error_streams++; } else { if (enable_fast_gen_) { token_capacity_ -= result.value(); RTP_LLM_LOG_DEBUG("after stream [%d] acquireCapacity, token_capacity is %d", (*it)->streamId(), token_capacity_); } it++; } } return {fallback_streams, error_streams}; } bool FIFOScheduler::evaluateRunningMemory(const list<GenerateStreamPtr>& streams, const GenerateStreamPtr& new_stream) const { if (params_.isDecodeRole()) { if (running_streams_.size() + streams.size() + 1 < max_generate_batch_size_) { return true; } } if (running_streams_.size() + streams.size() + 1 > max_generate_batch_size_) { return false; } if (!enable_fast_gen_) { int max_token_size = new_stream->contextLength(); for (auto& stream : streams) { max_token_size = std::max(max_token_size, stream->contextLength()); } return max_token_size * (streams.size() + 1) + running_streams_.size() < int(max_seq_len_ * max_context_batch_size_); } else { return true; } } bool FIFOScheduler::evaluateNewStream(const list<GenerateStreamPtr>& streams, const GenerateStreamPtr& new_stream, size_t reserve_step) { if (!evaluateRunningMemory(streams, new_stream)) { return false; } auto result = new_stream->initKVBlock(token_capacity_, reserve_step); if (result.ok() && enable_fast_gen_) { token_capacity_ -= result.value(); RTP_LLM_LOG_DEBUG("after stream [%d] acquireCapacity, token_capacity is %d", new_stream->streamId(), token_capacity_); } return result.ok() && cache_manager_->availableBlockNums() >= reserve_block_num_; } list<GenerateStreamPtr> FIFOScheduler::scheduleNew(size_t reserve_step) { list<GenerateStreamPtr> new_streams; for (auto it = waiting_streams_.begin(); it != 
waiting_streams_.end();) { auto& stream = *it; if (evaluateNewStream(new_streams, *it, reserve_step)) { RTP_LLM_LOG_DEBUG("stream [%ld] add to new queue", stream->streamId()); // if setRunning fails, it must be in stopped state, evict it in next iteration if (stream->setRunning()) { new_streams.emplace_back(stream); it = waiting_streams_.erase(it); } else { RTP_LLM_LOG_WARNING("stream [%ld] set running failed", stream->streamId()); stream->releaseResource(); it++; } } else if (running_streams_.empty() && new_streams.empty() && remote_running_streams_.empty()) { // TODO(xinfei.sxf) At this time, we can also release the blocks held by other waiting streams RTP_LLM_LOG_WARNING("stream [%ld] can not add to new queue", stream->streamId()); if (stream->inputLength() > cache_manager_->maxSeqLen()) { stream->stopAndRelease(ErrorCode::EXCEEDS_KV_CACHE_MAX_LEN, "input len " + std::to_string(stream->inputLength()) + " is greater than kv cache max seq len " + std::to_string(cache_manager_->maxSeqLen())); } else { stream->stopAndRelease(ErrorCode::MALLOC_FAILED, "LACK MEM"); } it++; } else { // try to join new streams in the next schedule cycle break; } } return new_streams; } void FIFOScheduler::accountBatchMetrics(const list<GenerateStreamPtr>& new_streams, const list<GenerateStreamPtr>& running_streams) { size_t total_prefill_len = 0; for (auto& stream : new_streams) { total_prefill_len += stream->currentExecuteTokenSize(); } for (auto& stream : running_streams) { stream->incBatchWithPrefillTimes(new_streams.size()); stream->incBatchWithPrefillLen(total_prefill_len); } } bool FIFOScheduler::waitPredicate() { return stop_ || !waiting_streams_.empty() || !running_streams_.empty() || !remote_running_streams_.empty(); } absl::StatusOr<list<GenerateStreamPtr>> FIFOScheduler::schedule(size_t reserve_step) { unique_lock<mutex> lock(lock_); if (need_fill_fake_stream_) { cond_.wait_for(lock, std::chrono::milliseconds(10), [this]{ return waitPredicate(); }); } else { cond_.wait(lock, [this]{ return waitPredicate(); }); } evaluateRunningRemote(); evictDoneStreams(waiting_streams_); evictDoneStreams(running_streams_); evictDoneStreams(remote_running_streams_); // TODO(xinfei.sxf) Those who just kicked out of running may join running again immediately. auto [fallback_streams, error_streams] = evaluateRunningNext(reserve_step); auto new_streams = scheduleNew(reserve_step); accountBatchMetrics(new_streams, running_streams_); running_streams_.insert(running_streams_.end(), new_streams.begin(), new_streams.end()); reportMetrics(fallback_streams); last_schedule_time_ = autil::TimeUtility::currentTimeInMilliSeconds(); return running_streams_; } int64_t FIFOScheduler::waitingStreamsSize() { return waiting_streams_.size(); } int64_t FIFOScheduler::runningStreamsSize() { return running_streams_.size(); } int64_t FIFOScheduler::onflightStreams() { unique_lock<mutex> lock(lock_); return waiting_streams_.size() + running_streams_.size(); } void FIFOScheduler::reportMetrics(size_t fallback_stream_size) { if (metrics_reporter_) { RtpLLMSchedulerMetricsCollector collector; collector.wait_stream_size = waiting_streams_.size(); collector.running_stream_size = running_streams_.size(); collector.remote_running_stream_size = remote_running_streams_.size(); collector.fallback_stream_size = fallback_stream_size; metrics_reporter_->report<RtpLLMSchedulerMetrics, RtpLLMSchedulerMetricsCollector>(nullptr, &collector); } } } // namespace rtp_llm