core/unittest/reader/ForceReadUnittest.cpp (358 lines of code) (raw):
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <memory>
#include <string>
#include "collection_pipeline/CollectionPipeline.h"
#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "config/CollectionConfig.h"
#include "constants/Constants.h"
#include "file_server/ConfigManager.h"
#include "file_server/FileServer.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/event/Event.h"
#include "file_server/event_handler/EventHandler.h"
#include "logger/Logger.h"
#include "unittest/Unittest.h"
using namespace std;
namespace logtail {
class ForceReadUnittest : public testing::Test {
public:
void TestTimeoutForceRead();
void TestFileCloseForceRead();
void TestAddTimeoutEvent();
protected:
void SetUp() override {
logPathDir = GetProcessExecutionDir();
if (PATH_SEPARATOR[0] == logPathDir.back()) {
logPathDir.resize(logPathDir.size() - 1);
}
logPathDir += PATH_SEPARATOR + "testDataSet" + PATH_SEPARATOR + "ForceReadUnittest";
utf8File = "utf8.txt";
std::string filepath = logPathDir + PATH_SEPARATOR + utf8File;
std::unique_ptr<FILE, decltype(&std::fclose)> fp(std::fopen(filepath.c_str(), "r"), &std::fclose);
if (!fp.get()) {
return;
}
std::fseek(fp.get(), 0, SEEK_END);
long filesize = std::ftell(fp.get());
std::fseek(fp.get(), 0, SEEK_SET);
expectedContent.reset(new char[filesize + 1]);
fread(expectedContent.get(), filesize, 1, fp.get());
expectedContent[filesize] = '\0';
}
void Init() {
// init pipeline and config
unique_ptr<Json::Value> configJson;
string configStr, errorMsg;
unique_ptr<CollectionConfig> config;
unique_ptr<CollectionPipeline> pipeline;
// new pipeline
configStr = R"(
{
"inputs": [
{
"Type": "input_file",
"FilePaths": [
")"
+ logPathDir + R"(/utf8.txt"
]
}
],
"flushers": [
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_endpoint"
}
]
}
)";
configJson.reset(new Json::Value());
APSARA_TEST_TRUE(ParseJsonTable(configStr, *configJson, errorMsg));
Json::Value inputConfigJson = (*configJson)["inputs"][0];
config.reset(new CollectionConfig(mConfigName, std::move(configJson)));
APSARA_TEST_TRUE(config->Parse());
pipeline.reset(new CollectionPipeline());
APSARA_TEST_TRUE(pipeline->Init(std::move(*config)));
ctx.SetPipeline(*pipeline.get());
ctx.SetConfigName(mConfigName);
ctx.SetProcessQueueKey(0);
discoveryOpts = FileDiscoveryOptions();
discoveryOpts.Init(inputConfigJson, ctx, "test");
mConfig = std::make_pair(&discoveryOpts, &ctx);
readerOpts.mInputType = FileReaderOptions::InputType::InputFile;
FileServer::GetInstance()->AddFileDiscoveryConfig(mConfigName, &discoveryOpts, &ctx);
FileServer::GetInstance()->AddFileReaderConfig(mConfigName, &readerOpts, &ctx);
FileServer::GetInstance()->AddMultilineConfig(mConfigName, &multilineOpts, &ctx);
FileServer::GetInstance()->AddFileTagConfig(mConfigName, &tagOpts, &ctx);
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(0, 0, ctx);
ProcessQueueManager::GetInstance()->EnablePop(mConfigName);
}
void TearDown() override {
remove(utf8File.c_str());
for (auto iter = BlockedEventManager::GetInstance()->mEventMap.begin();
iter != BlockedEventManager::GetInstance()->mEventMap.end();
++iter) {
if (iter->second.mEvent != nullptr) {
delete iter->second.mEvent;
}
}
BlockedEventManager::GetInstance()->mEventMap.clear();
}
private:
std::unique_ptr<char[]> expectedContent;
static std::string logPathDir;
static std::string utf8File;
const std::string mConfigName = "##1.0##project-0$config-0";
FileDiscoveryOptions discoveryOpts;
FileReaderOptions readerOpts;
MultilineOptions multilineOpts;
FileTagOptions tagOpts;
CollectionPipelineContext ctx;
FileDiscoveryConfig mConfig;
};
std::string ForceReadUnittest::logPathDir;
std::string ForceReadUnittest::utf8File;
void ForceReadUnittest::TestTimeoutForceRead() {
{
// read -> add timeout event -> handle timeout -> valid -> read empty -> not rollback
Init();
LogFileReader reader(logPathDir,
utf8File,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
reader.CheckFileSignatureAndOffset(true);
auto pHanlder = make_unique<ModifyHandler>(mConfigName, mConfig);
pHanlder->mReadFileTimeSlice = 0; // force one read for one event
Event e1 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e1);
std::unique_ptr<ProcessQueueItem> item;
std::string configName;
bool result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent1 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
std::string expectedPart1(expectedContent.get());
expectedPart1.resize(expectedPart1.rfind("\n"));
APSARA_TEST_STREQ_FATAL(sourceEvent1.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart1.c_str());
Event e2 = *pHanlder->mNameReaderMap[utf8File][0]->CreateFlushTimeoutEvent().get();
pHanlder->Handle(e2);
result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent2 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
std::string expectedPart2(expectedContent.get());
expectedPart2 = expectedPart2.substr(expectedPart2.rfind("\n") + 1);
APSARA_TEST_STREQ_FATAL(sourceEvent2.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart2.c_str());
}
{
// read -> write -> add timeout event -> handle timeout -> valid -> read not empty -> rollback
Init();
LogFileReader reader(logPathDir,
utf8File,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
reader.CheckFileSignatureAndOffset(true);
std::string expectedPart1(expectedContent.get());
expectedPart1.resize(expectedPart1.find("\n"));
LogFileReader::BUFFER_SIZE = expectedPart1.size() + 1;
auto pHanlder = make_unique<ModifyHandler>(mConfigName, mConfig);
pHanlder->mReadFileTimeSlice = 0; // force one read for one event
Event e1 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e1);
std::unique_ptr<ProcessQueueItem> item;
std::string configName;
bool result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent1 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
APSARA_TEST_STREQ_FATAL(sourceEvent1.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart1.c_str());
Event e2 = *pHanlder->mNameReaderMap[utf8File][0]->CreateFlushTimeoutEvent().get();
pHanlder->Handle(e2);
result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent2 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
std::string expectedPart2(expectedContent.get());
// should rollback
expectedPart2
= expectedPart2.substr(expectedPart2.find("\n") + 1, expectedPart2.rfind("\n") - expectedPart1.size() - 1);
APSARA_TEST_STREQ_FATAL(sourceEvent2.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart2.c_str());
}
{
// read -> add timeout event -> write -> read -> handle timeout -> event invalid
LOG_WARNING(sLogger, ("This case is difficult to test", "test"));
Init();
LogFileReader reader(logPathDir,
utf8File,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
reader.CheckFileSignatureAndOffset(true);
std::string expectedPart1(expectedContent.get());
expectedPart1.resize(expectedPart1.find("\n"));
LogFileReader::BUFFER_SIZE = expectedPart1.size() + 1;
auto pHanlder = make_unique<ModifyHandler>(mConfigName, mConfig);
pHanlder->mReadFileTimeSlice = 0; // force one read for one event
Event e1 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e1);
std::unique_ptr<ProcessQueueItem> item;
std::string configName;
bool result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent1 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
APSARA_TEST_STREQ_FATAL(sourceEvent1.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart1.c_str());
LogFileReader::BUFFER_SIZE = 1024 * 512;
Event e3 = *pHanlder->mNameReaderMap[utf8File][0]->CreateFlushTimeoutEvent().get();
Event e2 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e2);
result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent2 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
std::string expectedPart2(expectedContent.get());
// should rollback
expectedPart2
= expectedPart2.substr(expectedPart2.find("\n") + 1, expectedPart2.rfind("\n") - expectedPart1.size() - 1);
APSARA_TEST_STREQ_FATAL(sourceEvent2.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart2.c_str());
pHanlder->Handle(e3);
// Current timeout event is invalid
result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_FALSE_FATAL(result);
}
{
// TODO: difficult to test, the behavior should be
// read -> add timeout event -> handle timeout -> write -> event valid -> read not empty -> rollback
}
{
// TODO: difficult to test, the behavior should be
// read -> add timeout event -> handle timeout -> event valid -> write -> read not empty -> rollback
}
{
// TODO: difficult to test, the behavior should be
// read -> add timeout event -> handle timeout -> event valid -> read empty -> write -> not rollback
}
{
// TODO: difficult to test, the behavior should be
// read -> add timeout event -> handle timeout -> event valid -> read empty -> not rollback -> write
}
}
void ForceReadUnittest::TestFileCloseForceRead() {
{
// file close -> handle timeout -> valid -> not rollback
Init();
LogFileReader reader(logPathDir,
utf8File,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
reader.CheckFileSignatureAndOffset(true);
LogFileReader::BUFFER_SIZE = 1024 * 512;
auto pHanlder = make_unique<ModifyHandler>(mConfigName, mConfig);
pHanlder->mReadFileTimeSlice = 0; // force one read for one event
Event e1 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e1);
std::unique_ptr<ProcessQueueItem> item;
std::string configName;
bool result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent1 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
std::string expectedPart1(expectedContent.get());
expectedPart1.resize(expectedPart1.rfind("\n"));
APSARA_TEST_STREQ_FATAL(sourceEvent1.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart1.c_str());
Event e2 = *pHanlder->mNameReaderMap[utf8File][0]->CreateFlushTimeoutEvent().get();
pHanlder->mNameReaderMap[utf8File][0]->CloseFilePtr();
pHanlder->Handle(e2);
result = ProcessQueueManager::GetInstance()->PopItem(1, item, configName);
APSARA_TEST_TRUE_FATAL(result);
LogEvent& sourceEvent2 = item.get()->mEventGroup.MutableEvents()[0].Cast<LogEvent>();
std::string expectedPart2(expectedContent.get());
expectedPart2 = expectedPart2.substr(expectedPart2.rfind("\n") + 1);
APSARA_TEST_STREQ_FATAL(sourceEvent2.GetContent(DEFAULT_CONTENT_KEY).data(), expectedPart2.c_str());
}
}
void ForceReadUnittest::TestAddTimeoutEvent() {
{
// read part -> not add timeout event
Init();
LogFileReader reader(logPathDir,
utf8File,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
reader.CheckFileSignatureAndOffset(true);
LogFileReader::BUFFER_SIZE = 10;
auto pHanlder = make_unique<ModifyHandler>(mConfigName, mConfig);
pHanlder->mReadFileTimeSlice = 0; // force one read for one event
Event e1 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e1);
APSARA_TEST_EQUAL_FATAL(BlockedEventManager::GetInstance()->mEventMap.size(), 0U);
}
{
// read all -> add timeout event
Init();
LogFileReader reader(logPathDir,
utf8File,
DevInode(),
std::make_pair(&readerOpts, &ctx),
std::make_pair(&multilineOpts, &ctx),
std::make_pair(&tagOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
reader.CheckFileSignatureAndOffset(true);
LogFileReader::BUFFER_SIZE = 1024 * 512;
BlockedEventManager::GetInstance()->mEventMap.clear();
APSARA_TEST_EQUAL_FATAL(BlockedEventManager::GetInstance()->mEventMap.size(), 0U);
auto pHanlder = make_unique<ModifyHandler>(mConfigName, mConfig);
pHanlder->mReadFileTimeSlice = 0; // force one read for one event
Event e1 = Event(reader.mHostLogPathDir,
reader.mHostLogPathFile,
EVENT_MODIFY,
-1,
0,
reader.mDevInode.dev,
reader.mDevInode.inode);
pHanlder->Handle(e1);
APSARA_TEST_EQUAL_FATAL(BlockedEventManager::GetInstance()->mEventMap.size(), 1U);
}
}
UNIT_TEST_CASE(ForceReadUnittest, TestTimeoutForceRead)
UNIT_TEST_CASE(ForceReadUnittest, TestFileCloseForceRead)
UNIT_TEST_CASE(ForceReadUnittest, TestAddTimeoutEvent)
} // namespace logtail
UNIT_TEST_MAIN