core/collection_pipeline/queue/SenderQueueManager.cpp (208 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "collection_pipeline/queue/SenderQueueManager.h" #include "collection_pipeline/queue/ExactlyOnceQueueManager.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "common/Flags.h" DEFINE_FLAG_INT32(sender_queue_gc_threshold_sec, "30s", 30); DEFINE_FLAG_INT32(sender_queue_capacity, "", 15); using namespace std; namespace logtail { SenderQueueManager::SenderQueueManager() : mDefaultQueueParam(INT32_FLAG(sender_queue_capacity), 1.0) { } bool SenderQueueManager::CreateQueue( QueueKey key, const string& flusherId, const CollectionPipelineContext& ctx, std::unordered_map<std::string, std::shared_ptr<ConcurrencyLimiter>>&& concurrencyLimitersMap, uint32_t maxRate) { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter == mQueues.end()) { mQueues.try_emplace(key, mDefaultQueueParam.GetCapacity(), mDefaultQueueParam.GetLowWatermark(), mDefaultQueueParam.GetHighWatermark(), key, flusherId, ctx); iter = mQueues.find(key); } iter->second.SetConcurrencyLimiters(std::move(concurrencyLimitersMap)); iter->second.SetRateLimiter(maxRate); return true; } SenderQueue* SenderQueueManager::GetQueue(QueueKey key) { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { return &iter->second; } return nullptr; } bool SenderQueueManager::DeleteQueue(QueueKey key) { { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter == mQueues.end()) { return false; } } { lock_guard<mutex> lock(mGCMux); if (mQueueDeletionTimeMap.find(key) != mQueueDeletionTimeMap.end()) { return false; } mQueueDeletionTimeMap[key] = time(nullptr); } return true; } bool SenderQueueManager::ReuseQueue(QueueKey key) { lock_guard<mutex> lock(mGCMux); auto iter = mQueueDeletionTimeMap.find(key); if (iter == mQueueDeletionTimeMap.end()) { return false; } mQueueDeletionTimeMap.erase(iter); return true; } int SenderQueueManager::PushQueue(QueueKey key, unique_ptr<SenderQueueItem>&& item) { { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { if (!iter->second.Push(std::move(item))) { return 1; } } else { int res = ExactlyOnceQueueManager::GetInstance()->PushSenderQueue(key, std::move(item)); if (res != 0) { return res; } } } Trigger(); return 0; } void SenderQueueManager::GetAvailableItems(vector<SenderQueueItem*>& items, int32_t itemsCntLimit) { { lock_guard<mutex> lock(mQueueMux); if (mQueues.empty()) { return; } if (itemsCntLimit == -1) { for (auto iter = mQueues.begin(); iter != mQueues.end(); ++iter) { iter->second.GetAvailableItems(items, -1); } } else { int cntLimitPerQueue = std::max((int)(mDefaultQueueParam.GetCapacity() * 0.3), (int)(itemsCntLimit / mQueues.size())); // must check index before moving iterator mSenderQueueBeginIndex = mSenderQueueBeginIndex % mQueues.size(); // here we set sender queue begin index, let the sender order be different each time auto beginIter = mQueues.begin(); std::advance(beginIter, mSenderQueueBeginIndex++); for (auto iter = beginIter; iter != mQueues.end(); ++iter) { iter->second.GetAvailableItems(items, cntLimitPerQueue); } for (auto iter = mQueues.begin(); iter != beginIter; ++iter) { iter->second.GetAvailableItems(items, cntLimitPerQueue); } } } ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(items, itemsCntLimit); } bool SenderQueueManager::RemoveItem(QueueKey key, SenderQueueItem* item) { { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { return iter->second.Remove(item); } } return ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(key, item); } void SenderQueueManager::DecreaseConcurrencyLimiterInSendingCnt(QueueKey key) { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { iter->second.DecreaseSendingCnt(); } } bool SenderQueueManager::IsAllQueueEmpty() const { { lock_guard<mutex> lock(mQueueMux); for (const auto& q : mQueues) { if (!q.second.Empty()) { return false; } } } return ExactlyOnceQueueManager::GetInstance()->IsAllSenderQueueEmpty(); } void SenderQueueManager::ClearUnusedQueues() { auto const curTime = time(nullptr); lock_guard<mutex> lock(mGCMux); auto iter = mQueueDeletionTimeMap.begin(); while (iter != mQueueDeletionTimeMap.end()) { if (!(curTime >= iter->second && curTime - iter->second >= INT32_FLAG(sender_queue_gc_threshold_sec))) { ++iter; continue; } { lock_guard<mutex> lock(mQueueMux); auto itr = mQueues.find(iter->first); if (itr == mQueues.end()) { // should not happen continue; } if (!itr->second.Empty()) { ++iter; continue; } mQueues.erase(itr); } QueueKeyManager::GetInstance()->RemoveKey(iter->first); iter = mQueueDeletionTimeMap.erase(iter); } } bool SenderQueueManager::IsValidToPush(QueueKey key) const { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { return iter->second.IsValidToPush(); } // no need to check exactly once queue, since the caller does not support exactly once // should not happen return false; } bool SenderQueueManager::Wait(uint64_t ms) { // TODO: use semaphore instead unique_lock<mutex> lock(mStateMux); mCond.wait_for(lock, chrono::milliseconds(ms), [this] { return mValidToPop; }); if (mValidToPop) { mValidToPop = false; return true; } return false; } void SenderQueueManager::Trigger() { { lock_guard<mutex> lock(mStateMux); mValidToPop = true; } mCond.notify_one(); } void SenderQueueManager::SetPipelineForItems(QueueKey key, const std::shared_ptr<CollectionPipeline>& p) { lock_guard<mutex> lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { iter->second.SetPipelineForItems(p); } else { ExactlyOnceQueueManager::GetInstance()->SetPipelineForSenderItems(key, p); } } #ifdef APSARA_UNIT_TEST_MAIN void SenderQueueManager::Clear() { lock_guard<mutex> lock(mQueueMux); mQueues.clear(); mQueueDeletionTimeMap.clear(); } bool SenderQueueManager::IsQueueMarkedDeleted(QueueKey key) { lock_guard<mutex> lock(mGCMux); return mQueueDeletionTimeMap.find(key) != mQueueDeletionTimeMap.end(); } #endif } // namespace logtail