core/collection_pipeline/queue/ProcessQueueManager.cpp (298 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "collection_pipeline/queue/BoundedProcessQueue.h"
#include "collection_pipeline/queue/CircularProcessQueue.h"
#include "collection_pipeline/queue/ExactlyOnceQueueManager.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
#include "common/Flags.h"
DEFINE_FLAG_INT32(bounded_process_queue_capacity, "", 5);
DECLARE_FLAG_INT32(process_thread_count);
using namespace std;
namespace logtail {
ProcessQueueManager::ProcessQueueManager() : mBoundedQueueParam(INT32_FLAG(bounded_process_queue_capacity)) {
ResetCurrentQueueIndex();
}
bool ProcessQueueManager::CreateOrUpdateBoundedQueue(QueueKey key,
uint32_t priority,
const CollectionPipelineContext& ctx) {
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
if (iter->second.second != QueueType::BOUNDED) {
// queue type change only happen when all input plugin types are changed. in such case, old input data not
// been processed can be discarded since whole pipeline is actually changed.
DeleteQueueEntity(iter->second.first);
CreateBoundedQueue(key, priority, ctx);
} else {
if ((*iter->second.first)->GetPriority() == priority) {
return false;
}
AdjustQueuePriority(iter->second.first, priority);
}
} else {
CreateBoundedQueue(key, priority, ctx);
}
if (mCurrentQueueIndex.second == mPriorityQueue[mCurrentQueueIndex.first].end()) {
mCurrentQueueIndex.second = mPriorityQueue[mCurrentQueueIndex.first].begin();
}
return true;
}
bool ProcessQueueManager::CreateOrUpdateCircularQueue(QueueKey key,
uint32_t priority,
size_t capacity,
const CollectionPipelineContext& ctx) {
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
if (iter->second.second != QueueType::CIRCULAR) {
// queue type change only happen when all input plugin types are changed. in such case, old input data not
// been processed can be discarded since whole pipeline is actually changed.
DeleteQueueEntity(iter->second.first);
CreateCircularQueue(key, priority, capacity, ctx);
} else {
static_cast<CircularProcessQueue*>(iter->second.first->get())->Reset(capacity);
if ((*iter->second.first)->GetPriority() == priority) {
return false;
}
AdjustQueuePriority(iter->second.first, priority);
}
} else {
CreateCircularQueue(key, priority, capacity, ctx);
}
if (mCurrentQueueIndex.second == mPriorityQueue[mCurrentQueueIndex.first].end()) {
mCurrentQueueIndex.second = mPriorityQueue[mCurrentQueueIndex.first].begin();
}
return true;
}
bool ProcessQueueManager::DeleteQueue(QueueKey key) {
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter == mQueues.end()) {
return false;
}
DeleteQueueEntity(iter->second.first);
QueueKeyManager::GetInstance()->RemoveKey(iter->first);
mQueues.erase(iter);
return true;
}
bool ProcessQueueManager::IsValidToPush(QueueKey key) const {
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
if (iter->second.second == QueueType::BOUNDED) {
return static_cast<BoundedProcessQueue*>(iter->second.first->get())->IsValidToPush();
} else {
return true;
}
}
return ExactlyOnceQueueManager::GetInstance()->IsValidToPushProcessQueue(key);
}
QueueStatus ProcessQueueManager::PushQueue(QueueKey key, unique_ptr<ProcessQueueItem>&& item) {
{
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
if (!(*iter->second.first)->Push(std::move(item))) {
return QueueStatus::QUEUE_FULL;
}
} else {
auto res = ExactlyOnceQueueManager::GetInstance()->PushProcessQueue(key, std::move(item));
if (res != QueueStatus::OK) {
return res;
}
}
}
Trigger();
return QueueStatus::OK;
}
bool ProcessQueueManager::PopItem(int64_t threadNo, unique_ptr<ProcessQueueItem>& item, string& configName) {
configName.clear();
lock_guard<mutex> lock(mQueueMux);
for (size_t i = 0; i <= sMaxPriority; ++i) {
ProcessQueueIterator iter;
if (mCurrentQueueIndex.first == i) {
for (iter = mCurrentQueueIndex.second; iter != mPriorityQueue[i].end(); ++iter) {
if (!(*iter)->Pop(item)) {
continue;
}
configName = (*iter)->GetConfigName();
break;
}
if (configName.empty()) {
for (iter = mPriorityQueue[i].begin(); iter != mCurrentQueueIndex.second; ++iter) {
if (!(*iter)->Pop(item)) {
continue;
}
configName = (*iter)->GetConfigName();
break;
}
}
} else {
for (iter = mPriorityQueue[i].begin(); iter != mPriorityQueue[i].end(); ++iter) {
if (!(*iter)->Pop(item)) {
continue;
}
configName = (*iter)->GetConfigName();
break;
}
}
if (!configName.empty()) {
mCurrentQueueIndex.first = i;
mCurrentQueueIndex.second = ++iter;
if (mCurrentQueueIndex.second == mPriorityQueue[i].end()) {
mCurrentQueueIndex.second = mPriorityQueue[i].begin();
}
return true;
}
// find exactly once queues next
{
lock_guard<mutex> lock(ExactlyOnceQueueManager::GetInstance()->mProcessQueueMux);
for (auto iter = ExactlyOnceQueueManager::GetInstance()->mProcessPriorityQueue[i].begin();
iter != ExactlyOnceQueueManager::GetInstance()->mProcessPriorityQueue[i].end();
++iter) {
// process queue for exactly once can only be assgined to one specific thread
if (iter->GetKey() % INT32_FLAG(process_thread_count) != threadNo) {
continue;
}
if (!iter->Pop(item)) {
continue;
}
configName = iter->GetConfigName();
ResetCurrentQueueIndex();
return true;
}
}
}
ResetCurrentQueueIndex();
{
unique_lock<mutex> lock(mStateMux);
mValidToPop = false;
}
return false;
}
bool ProcessQueueManager::IsAllQueueEmpty() const {
{
lock_guard<mutex> lock(mQueueMux);
for (const auto& q : mQueues) {
if (!(*q.second.first)->Empty()) {
return false;
}
}
}
return ExactlyOnceQueueManager::GetInstance()->IsAllProcessQueueEmpty();
}
bool ProcessQueueManager::SetDownStreamQueues(QueueKey key, vector<BoundedSenderQueueInterface*>&& ques) {
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter == mQueues.end()) {
return false;
}
(*iter->second.first)->SetDownStreamQueues(std::move(ques));
return true;
}
bool ProcessQueueManager::SetFeedbackInterface(QueueKey key, vector<FeedbackInterface*>&& feedback) {
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter == mQueues.end()) {
return false;
}
if (iter->second.second == QueueType::CIRCULAR) {
return false;
}
static_cast<BoundedProcessQueue*>(iter->second.first->get())->SetUpStreamFeedbacks(std::move(feedback));
return true;
}
void ProcessQueueManager::DisablePop(const string& configName, bool isPipelineRemoving) {
if (QueueKeyManager::GetInstance()->HasKey(configName)) {
auto key = QueueKeyManager::GetInstance()->GetKey(configName);
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
(*iter->second.first)->DisablePop();
if (!isPipelineRemoving) {
const auto& p = CollectionPipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
(*iter->second.first)->SetPipelineForItems(p);
}
}
}
} else {
ExactlyOnceQueueManager::GetInstance()->DisablePopProcessQueue(configName, isPipelineRemoving);
}
}
void ProcessQueueManager::EnablePop(const string& configName) {
if (QueueKeyManager::GetInstance()->HasKey(configName)) {
auto key = QueueKeyManager::GetInstance()->GetKey(configName);
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
(*iter->second.first)->EnablePop();
}
} else {
ExactlyOnceQueueManager::GetInstance()->EnablePopProcessQueue(configName);
}
}
bool ProcessQueueManager::Wait(uint64_t ms) {
// TODO: use semaphore instead
unique_lock<mutex> lock(mStateMux);
mCond.wait_for(lock, chrono::milliseconds(ms), [this] { return mValidToPop; });
if (mValidToPop) {
mValidToPop = false;
return true;
}
return false;
}
void ProcessQueueManager::Trigger() {
{
lock_guard<mutex> lock(mStateMux);
mValidToPop = true;
}
mCond.notify_one();
}
void ProcessQueueManager::CreateBoundedQueue(QueueKey key, uint32_t priority, const CollectionPipelineContext& ctx) {
mPriorityQueue[priority].emplace_back(make_unique<BoundedProcessQueue>(mBoundedQueueParam.GetCapacity(),
mBoundedQueueParam.GetLowWatermark(),
mBoundedQueueParam.GetHighWatermark(),
key,
priority,
ctx));
mQueues[key] = make_pair(prev(mPriorityQueue[priority].end()), QueueType::BOUNDED);
}
void ProcessQueueManager::CreateCircularQueue(QueueKey key,
uint32_t priority,
size_t capacity,
const CollectionPipelineContext& ctx) {
mPriorityQueue[priority].emplace_back(make_unique<CircularProcessQueue>(capacity, key, priority, ctx));
mQueues[key] = make_pair(prev(mPriorityQueue[priority].end()), QueueType::CIRCULAR);
}
void ProcessQueueManager::AdjustQueuePriority(const ProcessQueueIterator& iter, uint32_t priority) {
uint32_t oldPriority = (*iter)->GetPriority();
auto nextQueIter = next(iter);
mPriorityQueue[priority].splice(mPriorityQueue[priority].end(), mPriorityQueue[oldPriority], iter);
(*iter)->SetPriority(priority);
if (mCurrentQueueIndex.first == oldPriority && mCurrentQueueIndex.second == iter) {
if (nextQueIter == mPriorityQueue[oldPriority].end()) {
mCurrentQueueIndex.second = mPriorityQueue[oldPriority].begin();
} else {
mCurrentQueueIndex.second = nextQueIter;
}
}
}
void ProcessQueueManager::DeleteQueueEntity(const ProcessQueueIterator& iter) {
uint32_t priority = (*iter)->GetPriority();
auto nextQueIter = mPriorityQueue[priority].erase(iter);
if (mCurrentQueueIndex.first == priority && mCurrentQueueIndex.second == iter) {
if (nextQueIter == mPriorityQueue[priority].end()) {
mCurrentQueueIndex.second = mPriorityQueue[priority].begin();
} else {
mCurrentQueueIndex.second = nextQueIter;
}
}
}
void ProcessQueueManager::ResetCurrentQueueIndex() {
mCurrentQueueIndex.first = 0;
mCurrentQueueIndex.second = mPriorityQueue[0].begin();
}
#ifdef APSARA_UNIT_TEST_MAIN
void ProcessQueueManager::Clear() {
lock_guard<mutex> lock(mQueueMux);
mQueues.clear();
for (size_t i = 0; i <= sMaxPriority; ++i) {
mPriorityQueue[i].clear();
}
ResetCurrentQueueIndex();
}
#endif
} // namespace logtail