core/unittest/plugin/PluginMock.h (157 lines of code) (raw):
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include "collection_pipeline/CollectionPipeline.h"
#include "collection_pipeline/plugin/PluginRegistry.h"
#include "collection_pipeline/plugin/creator/StaticFlusherCreator.h"
#include "collection_pipeline/plugin/creator/StaticInputCreator.h"
#include "collection_pipeline/plugin/creator/StaticProcessorCreator.h"
#include "collection_pipeline/plugin/interface/Flusher.h"
#include "collection_pipeline/plugin/interface/HttpFlusher.h"
#include "collection_pipeline/plugin/interface/Input.h"
#include "collection_pipeline/plugin/interface/Processor.h"
#include "collection_pipeline/queue/SLSSenderQueueItem.h"
#include "collection_pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "task_pipeline/Task.h"
#include "task_pipeline/TaskRegistry.h"
namespace logtail {
class ProcessorInnerMock : public Processor {
public:
static const std::string sName;
const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override { return true; }
void Process(PipelineEventGroup& logGroup) override { ++mCnt; };
uint32_t mCnt = 0;
protected:
bool IsSupportedEvent(const PipelineEventPtr& e) const override { return true; };
};
const std::string ProcessorInnerMock::sName = "processor_inner_mock";
class InputMock : public Input {
public:
static const std::string sName;
const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override {
if (config.isMember("SupportAck")) {
mSupportAck = config["SupportAck"].asBool();
}
auto processor = PluginRegistry::GetInstance()->CreateProcessor(
ProcessorInnerMock::sName, mContext->GetPipeline().GenNextPluginMeta(false));
processor->Init(Json::Value(), *mContext);
mInnerProcessors.emplace_back(std::move(processor));
return true;
}
bool Start() override { return true; }
bool Stop(bool isPipelineRemoving) override {
while (mBlockFlag) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return true;
}
bool SupportAck() const override { return mSupportAck; }
void Block() { mBlockFlag = true; }
void Unblock() { mBlockFlag = false; }
bool mSupportAck = true;
private:
std::atomic_bool mBlockFlag = false;
};
const std::string InputMock::sName = "input_mock";
class ProcessorMock : public Processor {
public:
static const std::string sName;
const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override { return true; }
void Process(PipelineEventGroup& logGroup) override {
while (mBlockFlag) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
++mCnt;
};
void Block() { mBlockFlag = true; }
void Unblock() { mBlockFlag = false; }
uint32_t mCnt = 0;
protected:
bool IsSupportedEvent(const PipelineEventPtr& e) const override { return true; };
std::atomic_bool mBlockFlag = false;
};
const std::string ProcessorMock::sName = "processor_mock";
class FlusherMock : public Flusher {
public:
static const std::string sName;
const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override {
GenerateQueueKey("mock");
SenderQueueManager::GetInstance()->CreateQueue(mQueueKey, mPluginID, *mContext);
return true;
}
bool Send(PipelineEventGroup&& g) override { return mIsValid; }
bool Flush(size_t key) override {
mFlushedQueues.push_back(key);
return true;
}
bool FlushAll() override { return mIsValid; }
bool mIsValid = true;
std::vector<size_t> mFlushedQueues;
};
const std::string FlusherMock::sName = "flusher_mock";
class FlusherHttpMock : public HttpFlusher {
public:
static const std::string sName;
const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override {
GenerateQueueKey("mock");
SenderQueueManager::GetInstance()->CreateQueue(mQueueKey, mPluginID, *mContext);
return true;
}
bool Send(PipelineEventGroup&& g) override { return mIsValid; }
bool Flush(size_t key) override {
mFlushedQueues.push_back(key);
return true;
}
bool FlushAll() override { return mIsValid; }
bool BuildRequest(SenderQueueItem* item,
std::unique_ptr<HttpSinkRequest>& req,
bool* keepItem,
std::string* errMsg) override {
if (item->mData == "invalid_keep") {
*keepItem = true;
return false;
}
if (item->mData == "invalid_discard") {
*keepItem = false;
return false;
}
req = std::make_unique<HttpSinkRequest>(
"", false, "", 80, "", "", std::map<std::string, std::string>(), "", nullptr);
return true;
}
void OnSendDone(const HttpResponse& response, SenderQueueItem* item) override {}
bool mIsValid = true;
std::vector<size_t> mFlushedQueues;
};
const std::string FlusherHttpMock::sName = "flusher_http_mock";
class TaskMock : public Task {
public:
static const std::string sName;
const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override {
if (config.isMember("Valid")) {
return config["Valid"].asBool();
}
return true;
}
void Start() override { mIsRunning = true; }
void Stop(bool isRemoving) { mIsRunning = false; }
bool mIsRunning = false;
};
const std::string TaskMock::sName = "task_mock";
void LoadPluginMock() {
PluginRegistry::GetInstance()->RegisterInputCreator(new StaticInputCreator<InputMock>());
PluginRegistry::GetInstance()->RegisterProcessorCreator(new StaticProcessorCreator<ProcessorInnerMock>());
PluginRegistry::GetInstance()->RegisterProcessorCreator(new StaticProcessorCreator<ProcessorMock>());
PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator<FlusherMock>());
PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator<FlusherHttpMock>());
}
void LoadTaskMock() {
TaskRegistry::GetInstance()->RegisterCreator(TaskMock::sName, []() { return std::make_unique<TaskMock>(); });
}
} // namespace logtail