core/collection_pipeline/queue/ExactlyOnceQueueManager.cpp (243 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "collection_pipeline/queue/ExactlyOnceQueueManager.h"
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
#include "common/Flags.h"
#include "common/TimeUtil.h"
#include "logger/Logger.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/input/InputFile.h"
DEFINE_FLAG_INT32(logtail_queue_gc_threshold_sec, "2min", 2 * 60);
DEFINE_FLAG_INT64(logtail_queue_max_used_time_per_round_in_msec, "500ms", 500);
DECLARE_FLAG_INT32(bounded_process_queue_capacity);
using namespace std;
namespace logtail {
ExactlyOnceQueueManager::ExactlyOnceQueueManager() : mProcessQueueParam(INT32_FLAG(bounded_process_queue_capacity)) {
}
bool ExactlyOnceQueueManager::CreateOrUpdateQueue(QueueKey key,
uint32_t priority,
const CollectionPipelineContext& ctx,
const vector<RangeCheckpointPtr>& checkpoints) {
{
lock_guard<mutex> lock(mGCMux);
mQueueDeletionTimeMap.erase(key);
}
vector<BoundedSenderQueueInterface*> senderQueue;
{
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter != mSenderQueues.end()) {
iter->second.Reset(checkpoints);
} else {
mSenderQueues.try_emplace(key, checkpoints, key, ctx);
iter = mSenderQueues.find(key);
}
// limiters are set on first push to the queue
senderQueue.emplace_back(&iter->second);
}
{
lock_guard<mutex> lock(mProcessQueueMux);
auto iter = mProcessQueues.find(key);
if (iter != mProcessQueues.end()) {
if (iter->second->GetPriority() != priority) {
mProcessPriorityQueue[priority].splice(mProcessPriorityQueue[priority].end(),
mProcessPriorityQueue[iter->second->GetPriority()],
iter->second);
iter->second->SetPriority(priority);
}
iter->second->SetConfigName(ctx.GetConfigName());
// note: do not reset process queue, to be the same as original implementation
} else {
// note: Ideally, queue capacity should be the same as checkpoint size. However, since process queue cannot
// be reset during update, we temporarily use common param. If checkpoints size is larger than common
// capacity, performance will restricted.
mProcessPriorityQueue[priority].emplace_back(mProcessQueueParam.GetCapacity(),
mProcessQueueParam.GetLowWatermark(),
mProcessQueueParam.GetHighWatermark(),
key,
priority,
ctx);
// mProcessPriorityQueue[priority].emplace_back(
// checkpoints.size(), checkpoints.size() - 1, checkpoints.size(), key, priority, config);
mProcessQueues[key] = prev(mProcessPriorityQueue[priority].end());
}
// for exactly once, the feedback is one to one
mProcessQueues[key]->SetDownStreamQueues(std::move(senderQueue));
// exactly once can only be applied to input_file
vector<FeedbackInterface*> feedbacks{
InputFeedbackInterfaceRegistry::GetInstance()->GetFeedbackInterface(InputFile::sName)};
mProcessQueues[key]->SetUpStreamFeedbacks(std::move(feedbacks));
}
return true;
}
bool ExactlyOnceQueueManager::DeleteQueue(QueueKey key) {
bool isProcessQueueExisted = false, isSenderQueueExisted = false;
{
lock_guard<mutex> lock(mProcessQueueMux);
isProcessQueueExisted = mProcessQueues.find(key) != mProcessQueues.end();
}
{
lock_guard<mutex> lock(mSenderQueueMux);
isSenderQueueExisted = mSenderQueues.find(key) != mSenderQueues.end();
}
if (!isProcessQueueExisted && !isSenderQueueExisted) {
return false;
}
if (!isProcessQueueExisted || !isSenderQueueExisted) {
// should not happen
}
{
lock_guard<mutex> lock(mGCMux);
if (mQueueDeletionTimeMap.find(key) != mQueueDeletionTimeMap.end()) {
return false;
}
mQueueDeletionTimeMap[key] = time(nullptr);
}
return true;
}
bool ExactlyOnceQueueManager::IsValidToPushProcessQueue(QueueKey key) const {
lock_guard<mutex> lock(mProcessQueueMux);
auto iter = mProcessQueues.find(key);
if (iter == mProcessQueues.end()) {
return false;
}
return iter->second->IsValidToPush();
}
QueueStatus ExactlyOnceQueueManager::PushProcessQueue(QueueKey key, unique_ptr<ProcessQueueItem>&& item) {
lock_guard<mutex> lock(mProcessQueueMux);
auto iter = mProcessQueues.find(key);
if (iter == mProcessQueues.end()) {
return QueueStatus::QUEUE_NOT_EXIST;
}
if (!iter->second->Push(std::move(item))) {
return QueueStatus::QUEUE_FULL;
}
return QueueStatus::OK;
}
bool ExactlyOnceQueueManager::IsAllProcessQueueEmpty() const {
lock_guard<mutex> lock(mProcessQueueMux);
for (const auto& q : mProcessQueues) {
if (!q.second->Empty()) {
return false;
}
}
return true;
}
void ExactlyOnceQueueManager::DisablePopProcessQueue(const string& configName, bool isPipelineRemoving) {
lock_guard<mutex> lock(mProcessQueueMux);
for (auto& iter : mProcessQueues) {
if (iter.second->GetConfigName() == configName) {
iter.second->DisablePop();
if (!isPipelineRemoving) {
const auto& p = CollectionPipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
iter.second->SetPipelineForItems(p);
}
}
}
}
}
void ExactlyOnceQueueManager::EnablePopProcessQueue(const string& configName) {
lock_guard<mutex> lock(mProcessQueueMux);
for (auto& iter : mProcessQueues) {
if (iter.second->GetConfigName() == configName) {
iter.second->EnablePop();
}
}
}
int ExactlyOnceQueueManager::PushSenderQueue(QueueKey key, unique_ptr<SenderQueueItem>&& item) {
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter == mSenderQueues.end()) {
return 2;
}
if (!iter->second.Push(std::move(item))) {
return 1;
}
return 0;
}
void ExactlyOnceQueueManager::GetAvailableSenderQueueItems(std::vector<SenderQueueItem*>& item, int32_t itemsCntLimit) {
lock_guard<mutex> lock(mSenderQueueMux);
for (auto iter = mSenderQueues.begin(); iter != mSenderQueues.end(); ++iter) {
iter->second.GetAvailableItems(item, itemsCntLimit);
}
}
bool ExactlyOnceQueueManager::RemoveSenderQueueItem(QueueKey key, SenderQueueItem* item) {
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter == mSenderQueues.end()) {
return false;
}
return iter->second.Remove(item);
}
bool ExactlyOnceQueueManager::IsAllSenderQueueEmpty() const {
lock_guard<mutex> lock(mSenderQueueMux);
for (const auto& q : mSenderQueues) {
if (!q.second.Empty()) {
return false;
}
}
return true;
}
void ExactlyOnceQueueManager::ClearTimeoutQueues() {
auto const curTime = time(nullptr);
const auto startTimeMs = GetCurrentTimeInMilliSeconds();
lock_guard<mutex> lock(mGCMux);
auto iter = mQueueDeletionTimeMap.begin();
while (iter != mQueueDeletionTimeMap.end()) {
if (GetCurrentTimeInMilliSeconds() - startTimeMs
>= static_cast<uint64_t>(INT64_FLAG(logtail_queue_max_used_time_per_round_in_msec))) {
break;
}
if (!(curTime >= iter->second && curTime - iter->second >= INT32_FLAG(logtail_queue_gc_threshold_sec))) {
++iter;
continue;
}
{
lock_guard<mutex> lock(mProcessQueueMux);
auto itr = mProcessQueues.find(iter->first);
if (itr == mProcessQueues.end()) {
// should not happen
continue;
}
if (!itr->second->Empty()) {
++iter;
continue;
}
}
{
lock_guard<mutex> lock(mSenderQueueMux);
auto itr = mSenderQueues.find(iter->first);
if (itr == mSenderQueues.end()) {
// should not happen
continue;
}
if (!itr->second.Empty()) {
++iter;
continue;
}
}
{
lock_guard<mutex> lock(mProcessQueueMux);
auto queueItr = mProcessQueues.find(iter->first);
mProcessPriorityQueue[queueItr->second->GetPriority()].erase(queueItr->second);
mProcessQueues.erase(queueItr);
}
{
lock_guard<mutex> lock(mSenderQueueMux);
mSenderQueues.erase(iter->first);
}
QueueKeyManager::GetInstance()->RemoveKey(iter->first);
iter = mQueueDeletionTimeMap.erase(iter);
}
}
void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, const std::shared_ptr<CollectionPipeline>& p) {
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter != mSenderQueues.end()) {
iter->second.SetPipelineForItems(p);
}
}
#ifdef APSARA_UNIT_TEST_MAIN
void ExactlyOnceQueueManager::Clear() {
{
lock_guard<mutex> lock(mProcessQueueMux);
mProcessQueues.clear();
for (size_t i = 0; i <= ProcessQueueManager::sMaxPriority; ++i) {
mProcessPriorityQueue[i].clear();
}
}
{
lock_guard<mutex> lock(mSenderQueueMux);
mSenderQueues.clear();
}
}
#endif
} // namespace logtail