core/ebpf/util/AggregateTree.h (133 lines of code) (raw):
// Copyright 2025 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "common/memory/SourceBuffer.h"
#include "logger/Logger.h"
namespace logtail {
template <class Data, class Value, class KeyType, bool NeedSourceBuffer>
class AggTree;
template <class Data, class KeyType>
class AggNode {
public:
AggNode(const std::shared_ptr<SourceBuffer>& sourceBuffer = nullptr) : mSourceBuffer(sourceBuffer) {}
std::unordered_map<KeyType, std::unique_ptr<AggNode>> mChild;
std::shared_ptr<SourceBuffer> mSourceBuffer;
std::unique_ptr<Data> mData;
};
template <class Data, class Value, class KeyType, bool NeedSourceBuffer>
class AggTree {
private:
size_t mMaxNodes = 0UL;
size_t mNodeCount = 0UL;
std::unique_ptr<AggNode<Data, KeyType>> mRootNode;
std::function<void(std::unique_ptr<Data>& base, const Value& n)> mAggregateFunc;
std::function<std::unique_ptr<Data>(const Value& n, std::shared_ptr<SourceBuffer>& sourceBuffer)> mBuildFunc;
#ifdef APSARA_UNIT_TEST_MAIN
friend class eBPFServerUnittest;
#endif
public:
AggTree(size_t maxNodes,
const std::function<void(std::unique_ptr<Data>&, const Value&)>& aggregateFunc,
const std::function<std::unique_ptr<Data>(const Value& n, std::shared_ptr<SourceBuffer>& sourceBuffer)>&
buildFunc)
: mMaxNodes(maxNodes),
mRootNode(std::make_unique<AggNode<Data, KeyType>>()),
mAggregateFunc(aggregateFunc),
mBuildFunc(buildFunc) {}
AggTree(AggTree<Data, Value, KeyType, NeedSourceBuffer>&& other) noexcept
: mMaxNodes(other.mMaxNodes),
mNodeCount(other.mNodeCount),
mRootNode(std::move(other.mRootNode)),
mAggregateFunc(other.mAggregateFunc),
mBuildFunc(other.mBuildFunc) {}
AggTree& operator=(AggTree<Data, Value, KeyType, NeedSourceBuffer>&& other) noexcept {
mMaxNodes = other.mMaxNodes;
mNodeCount = other.mNodeCount;
mRootNode = std::move(other.mRootNode);
mAggregateFunc = other.mAggregateFunc;
mBuildFunc = other.mBuildFunc;
return *this;
}
AggTree<Data, Value, KeyType, NeedSourceBuffer> GetAndReset() {
AggTree<Data, Value, KeyType, NeedSourceBuffer> res = std::move(*this);
Reset();
return res;
}
template <class ContainerType>
bool Aggregate(const Value& d, const ContainerType& aggKeys) {
auto p = mRootNode.get();
for (auto& val : aggKeys) {
auto result = p->mChild.find(val);
if (result == p->mChild.end() && mNodeCount >= mMaxNodes) {
// when we exceed the maximum limit, we will drop new metrics
LOG_ERROR(sLogger, ("maximum limit exceeded", mMaxNodes));
return false;
}
if (result == p->mChild.end()) {
auto newNode = std::make_unique<AggNode<Data, KeyType>>();
if (!p->mSourceBuffer) {
if (NeedSourceBuffer) {
// level1 nodes will setup new sourcebuffer ...
newNode->mSourceBuffer = std::make_shared<SourceBuffer>();
}
} else {
// level2 or lower nodes will hold the ref of level1 node's
newNode->mSourceBuffer = p->mSourceBuffer;
}
auto ptr = newNode.get();
p->mChild[val] = std::move(newNode);
p = ptr;
mNodeCount++;
} else {
p = result->second.get();
}
}
if (!p->mData) {
// generate new node ...
p->mData = mBuildFunc(d, p->mSourceBuffer);
}
mAggregateFunc(p->mData, d);
return true;
}
std::vector<AggNode<Data, KeyType>*> GetNodesWithAggDepth(size_t i) {
std::vector<AggNode<Data, KeyType>*> ans;
GetNodes(1, mRootNode, i, ans);
return ans;
}
void ForEach(const std::function<void(const Data*)>& call) { ForEach(mRootNode.get(), call); }
void Reset() {
// mRootNode = std::make_unique<AggNode<Data, KeyType>>(nullptr);
mRootNode = std::make_unique<AggNode<Data, KeyType>>();
mNodeCount = 0;
}
void ForEach(const AggNode<Data, KeyType>* root, const std::function<void(const Data*)>& call) {
if (root == nullptr) {
return;
}
if (root->mData != nullptr) {
call(root->mData.get());
}
for (auto& entry : root->mChild) {
ForEach(entry.second.get(), call);
}
}
[[nodiscard]] size_t NodeCount() const { return mNodeCount; }
private:
void GetNodes(size_t depth,
const std::unique_ptr<AggNode<Data, KeyType>>& root,
size_t targetDepth,
std::vector<AggNode<Data, KeyType>*>& ans) {
if (depth >= targetDepth) {
// we are done, add mChild to ans and return
for (auto& c : root->mChild) {
ans.push_back(c.second.get());
}
return;
}
// go to next depth
for (auto& c : root->mChild) {
GetNodes(depth + 1, c.second, targetDepth, ans);
}
}
};
// template <typename T, typename U>
// using StringAggTree = AggTree<T, U, std::string, false>;
// template <typename T>
// using StringAggNode = AggNode<T, std::string, false>;
template <typename T, typename U>
using SIZETAggTree = AggTree<T, U, size_t, false>;
template <typename T, typename U>
using SIZETAggTreeWithSourceBuffer = AggTree<T, U, size_t, true>;
// template <typename T>
// using SIZETAggNode = AggNode<T, size_t, false>;
} // namespace logtail