core/unittest/config/ConfigUpdatorUnittest.cpp (2,419 lines of code) (raw):

// Copyright 2022 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "unittest/Unittest.h" #if defined(__linux__) #include <unistd.h> #endif #include <cstdlib> #include <fcntl.h> #include <string.h> #include <fstream> #include <memory> #include <thread> #include "json/json.h" #include "AlarmManager.h" #include "AppConfig.h" #include "CheckPointManager.h" #include "EventDispatcher.h" #include "Monitor.h" #include "Sender.h" #include "common/FileSystemUtil.h" #include "common/Flags.h" #include "common/Lock.h" #include "constants/Constants.h" #include "file_server/ConfigManager.h" #include "file_server/event_handler/EventHandler.h" #include "file_server/event_handler/LogInput.h" #include "file_server/reader/LogFileReader.h" #include "logger/Logger.h" #include "sdk/Common.h" #include "sls_logs.pb.h" using namespace std; using namespace logtail::sdk; using namespace sls_logs; DECLARE_FLAG_INT32(max_holded_data_size); DECLARE_FLAG_INT32(max_buffer_num); DECLARE_FLAG_BOOL(enable_mock_send); DECLARE_FLAG_STRING(check_point_filename); DECLARE_FLAG_STRING(user_log_config); DECLARE_FLAG_STRING(ilogtail_config); DECLARE_FLAG_INT32(mem_check_point_time_out); DECLARE_FLAG_INT32(file_check_point_time_out); DECLARE_FLAG_INT32(check_point_check_interval); DECLARE_FLAG_STRING(default_global_topic); DECLARE_FLAG_INT32(default_send_byte_per_sec); DECLARE_FLAG_INT32(default_buffer_file_num); DECLARE_FLAG_INT32(default_local_file_size); DECLARE_FLAG_STRING(default_buffer_file_path); DECLARE_FLAG_INT32(default_max_inotify_watch_num); DECLARE_FLAG_STRING(profile_project_name); DECLARE_FLAG_INT32(check_base_dir_interval); DECLARE_FLAG_INT32(batch_send_interval); DECLARE_FLAG_INT32(check_point_version); DECLARE_FLAG_INT32(check_point_dump_interval); DECLARE_FLAG_INT32(global_pub_config_retry_interval); DECLARE_FLAG_INT32(dirfile_check_interval_ms); DECLARE_FLAG_INT32(polling_modify_repush_interval); DECLARE_FLAG_BOOL(default_global_fuse_mode); namespace logtail { string gPath[] = {"apsara_log", "comm"}; const int WRITE_LOG_SLEEP_INTERVAL = 10 * 1000; const int32_t WAIT_CONFIG_UPDATE_INTERVAL = 5; const string LOCAL_UUID = "97cfc7d3-bc21-dc4f-a190-43a83e552156"; SpinLock mResponseLock; std::unique_ptr<std::thread> gDispatchThreadId; void RunningDispatcher() { LOG_INFO(sLogger, ("RunningDispatcher", "Begin")); ConfigManager::GetInstance()->RegisterHandlers(); EventDispatcher::GetInstance()->Dispatch(); LOG_INFO(sLogger, ("RunningDispatcher", "End")); } class ConfigUpdatorUnittest : public ::testing::Test { private: // Shorten typing of PATH_SEPARATOR. const std::string& PS = PATH_SEPARATOR; static std::unordered_map<std::string, std::string> flagMap; static string mRootDir; static string sResponse; static unordered_map<string, int> sProjectNameCountMap; static unordered_map<string, int> sProjectCategoryTopicCountMap; static unordered_map<string, int> sTopicCountMap; static string sDaemonDir; static string sSecurityResponse; static string sAccessKeyResponse; static bool sReplaceKey; static unordered_map<string, string> sRegionResponseMap; static unordered_set<string> sConfigRejectSet; void CaseSetup(bool replaceConfigAllowed = true); void CaseCleanup(); void RemoveConfigFile(); static void GetLogContent(LogType logType, char* buffer); static void DumpLog(int logNum, string path, enum LogType logType, string fileName = string("job.log")); void DumpInitConfigToLocal(string localPath = string("")); void SetupGlobalFuseMode(bool globalFuseMode); void DumpCustomizedConfigToLocal(bool check_ulogfs_env, bool fuse_mode); static bool MockGetLogtailConfig(const std::string& targetURL, const std::string& intf, const std::string& ip, const std::string& config, std::string& response, std::string& errorMessage, bool isCompress); static bool MockGetLogtailSecurity(const std::string& targetURL, const std::string& intf, bool httpsVerifyPeer, const std::string& request, std::string& response, std::string& errorMessage, const std::string& caCert); static bool MockGetAccessKey(const std::string& targetURL, const std::string& intf, bool httpsVerifyPeer, const std::string& request, std::string& response, std::string& errorMessage, const std::string& caCert); static void MockAsyncSend(const std::string& projectName, const std::string& logstore, const std::string& logData, SEND_DATA_TYPE dataType, int32_t rawSize, sls_logs::SlsCompressType compressType, SendClosure* sendClosure); static string GetConfigResponse(); static void SetConfigResponse(const string& response); static void SetConfigReject(const string& configUrl) { ScopedSpinLock lock(mResponseLock); sConfigRejectSet.insert(configUrl); if (0 == configUrl.find("http://")) { sConfigRejectSet.insert("https://" + configUrl.substr(std::string("http://").length())); } } static bool IsConfigRejected(const string& configUrl) { ScopedSpinLock lock(mResponseLock); if (sConfigRejectSet.find(configUrl) != sConfigRejectSet.end()) { return true; } return false; } static void SetRegionConfigResponse(const string& region, const string& response) { LOG_INFO(sLogger, ("SetRegionConfigResponse", region)("response", response)); ScopedSpinLock lock(mResponseLock); sRegionResponseMap[region] = response; if (0 == region.find("http://")) { sRegionResponseMap["https://" + region.substr(std::string("http://").length())] = response; } } static string GetRegionConfigResponse(const string& region) { ScopedSpinLock lock(mResponseLock); return sRegionResponseMap[region]; } static void ClearConfigResponse() { ScopedSpinLock lock(mResponseLock); sRegionResponseMap.clear(); sResponse.clear(); sConfigRejectSet.clear(); } // SetConfigFilePattern sets the file_pattern field for @configJson. // On Windows, bracket pattern is not supported. static void SetConfigFilePattern(Json::Value& configJson) { #if defined(__linux__) configJson["file_pattern"] = Json::Value("*.[Ll][Oo][Gg]"); #elif defined(_MSC_VER) configJson["file_pattern"] = Json::Value("*.log"); #endif } // Wait several seconds to make sure test log files have been read. static void WaitForFileBeenRead() { #if defined(_MSC_VER) // Because of lackness of event based discovery, on Windows, we can only use // polling, so wait for polling interval plus several process time (2s here). logtail::sleep(INT32_FLAG(dirfile_check_interval_ms) / 1000 + 2); #endif } public: static void SetUpTestCase(); void SetUp() { AppConfig::GetInstance()->DumpAllFlagsToMap(flagMap); bfs::create_directories(mRootDir); } void TearDown() { AppConfig::GetInstance()->ReadFlagsFromMap(flagMap); bfs::remove_all(mRootDir); bfs::remove(mRootDir + PS + "loongcollector_config.json"); bfs::remove(STRING_FLAG(check_point_filename)); } void SetupContainerModeConfig(); void ReplaceWithContainerModeConfig(); void TestCheckPointManager(); void TestConfigUpdate(); void TestLocalConfigUpdate(); void TestUpdatePath(); void TestUpdateGlobalConfig(); void TestUpdateProfileProject(); void TestValidPath(); void TestBlackDirList(); void TestDirCheckPoint(); void TestTimeoutCheckPoint(); void TestLoadIlogtailConfig(); void TestUpdateGroupTopic(); void TestCheckPointSaveInterval(); void TestCheckPointUserDefinedFilePath(); void TestCheckPointLoadDefaultFile(); void TestValidWildcardPath(); void TestValidWildcardPath2(); void TestWithinMaxDepth(); void TestParseWildcardPath(); void TestIsWildcardPathMatch(); void TestLogRotateWhenUpdate(); }; APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestLogRotateWhenUpdate, -1); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestCheckPointManager, 0); // APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestConfigUpdate, 1); // APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestLocalConfigUpdate, 2); // APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestUpdatePath, 3); // APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestUpdateGlobalConfig, 4); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestUpdateProfileProject, 5); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestValidPath, 6); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestBlackDirList, 7); // APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestDirCheckPoint, 8); // APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestTimeoutCheckPoint, 9); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestLoadIlogtailConfig, 10); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestUpdateGroupTopic, 11); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestValidWildcardPath, 14); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestWithinMaxDepth, 15); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestParseWildcardPath, 16); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestIsWildcardPathMatch, 17); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestCheckPointSaveInterval, 19); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestCheckPointUserDefinedFilePath, 20); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestCheckPointLoadDefaultFile, 21); APSARA_UNIT_TEST_CASE(ConfigUpdatorUnittest, TestValidWildcardPath2, 25); std::string ConfigUpdatorUnittest::mRootDir; std::unordered_map<std::string, std::string> ConfigUpdatorUnittest::flagMap; string ConfigUpdatorUnittest::sResponse; string ConfigUpdatorUnittest::sSecurityResponse; string ConfigUpdatorUnittest::sAccessKeyResponse; bool ConfigUpdatorUnittest::sReplaceKey = false; unordered_map<string, int> ConfigUpdatorUnittest::sProjectNameCountMap; unordered_map<string, int> ConfigUpdatorUnittest::sProjectCategoryTopicCountMap; unordered_map<string, int> ConfigUpdatorUnittest::sTopicCountMap; unordered_map<string, string> ConfigUpdatorUnittest::sRegionResponseMap; unordered_set<string> ConfigUpdatorUnittest::sConfigRejectSet; string ConfigUpdatorUnittest::sDaemonDir = ""; static int sRejectCount = 0; void ConfigUpdatorUnittest::SetUpTestCase() { sLogger->set_level(spdlog::level::trace); BOOL_FLAG(enable_mock_send) = true; INT32_FLAG(check_base_dir_interval) = 10; INT32_FLAG(batch_send_interval) = 2; #if defined(_MSC_VER) // Shorten polling interval to reduce UT execution time. INT32_FLAG(dirfile_check_interval_ms) = 1000; #endif Sender::Instance()->MockAsyncSend = MockAsyncSend; bfs::remove("loongcollector_config.json"); mRootDir = GetProcessExecutionDir(); if (PATH_SEPARATOR[0] == mRootDir.at(mRootDir.size() - 1)) mRootDir.resize(mRootDir.size() - 1); STRING_FLAG(check_point_filename) = mRootDir + PATH_SEPARATOR + "logtail_checkpoint_config_udpator_ut"; AppConfig::GetInstance()->mCheckPointFilePath = STRING_FLAG(check_point_filename); mRootDir += PATH_SEPARATOR + "ConfigUpdatorUnittest"; bfs::remove_all(mRootDir); bfs::create_directories(mRootDir); } void ConfigUpdatorUnittest::SetupContainerModeConfig() { Json::Value logtailConfig; logtailConfig["container_mode"] = Json::Value(true); logtailConfig["working_ip"] = Json::Value("1.2.3.4"); logtailConfig["working_hostname"] = Json::Value("sls-zc-test"); logtailConfig["container_mount_path"] = Json::Value("./container_mount_test.json"); ofstream fout(STRING_FLAG(ilogtail_config).c_str()); fout << logtailConfig.toStyledString() << endl; fout.close(); Json::Value mountConfig; mountConfig["version"] = Json::Value("0.1.0"); mountConfig["container_name"] = Json::Value("logtail-docker"); mountConfig["container_id"] = Json::Value("abcdef1234567890"); mountConfig["host_path"] = Json::Value(mRootDir); Json::Value mount1; mount1["destination"] = "/"; mount1["source"] = "/mount"; Json::Value mount2; mount2["destination"] = "/home/admin/logs"; mount2["source"] = "/home/admin/t4/docker/logs"; Json::Value mount3; mount3["destination"] = "/app_2"; mount3["source"] = "/yyyy"; Json::Value mount4; mount4["destination"] = "/app_2/xxx"; mount4["source"] = "/xxx"; Json::Value mountArray; mountArray.append(mount1); mountArray.append(mount2); mountArray.append(mount3); mountArray.append(mount4); mountConfig["container_mount"] = mountArray; ofstream foutMount("./container_mount_test.json"); foutMount << mountConfig.toStyledString() << endl; foutMount.close(); } void ConfigUpdatorUnittest::ReplaceWithContainerModeConfig() { Json::Value logtailConfig; logtailConfig["container_mode"] = Json::Value(true); logtailConfig["working_ip"] = Json::Value("1.2.3.4"); logtailConfig["working_hostname"] = Json::Value("sls-zc-test"); logtailConfig["container_mount_path"] = Json::Value("./container_mount_test.json"); ofstream fout(STRING_FLAG(ilogtail_config).c_str()); fout << logtailConfig.toStyledString() << endl; fout.close(); Json::Value mountConfig; mountConfig["version"] = Json::Value("0.1.0"); mountConfig["container_name"] = Json::Value("logtail-docker"); mountConfig["container_id"] = Json::Value("abcdef1234567890"); mountConfig["host_path"] = Json::Value("/"); Json::Value mount1; mount1["destination"] = "/"; mount1["source"] = ""; Json::Value mount2; mount2["destination"] = "/home/admin/logs"; mount2["source"] = "/home/admin/t4/docker/logs"; Json::Value mount3; mount3["destination"] = "/app_2"; mount3["source"] = "/yyyy"; Json::Value mount4; mount4["destination"] = "/app_2/xxx"; mount4["source"] = "/xxx"; Json::Value mountArray; mountArray.append(mount1); mountArray.append(mount2); mountArray.append(mount3); mountArray.append(mount4); mountConfig["container_mount"] = mountArray; ofstream foutMount("./container_mount_test.json"); foutMount << mountConfig.toStyledString() << endl; foutMount.close(); } void ConfigUpdatorUnittest::TestDirCheckPoint() { LOG_INFO(sLogger, ("TestDirCheckPoint() begin", time(NULL))); // Depend on inotify, test on Linux only. #if defined(__linux__) bfs::remove_all(STRING_FLAG(check_point_filename)); string dirs[] = {PS + "dir" + PS + "dir1", PS + "dir" + PS + "dir1" + PS + "dir2_1", PS + "dir" + PS + "dir1" + PS + "dir2_2", PS + "dir" + PS + "dir1" + PS + "dir2_1" + PS + "dir_3_1", PS + "dir" + PS + "dir1" + PS + "dir2_2" + PS + "dir_3_2"}; bool isInotify[] = {true, false, true, false, true}; bool isTimeout[] = {false, false, true, false, true}; bfs::create_directories(bfs::path(mRootDir) / "dir"); // dirs[0] -- inotify:yes, timeout:no; // dirs[1] -- inotify:no for (int i = 0; i < 2; ++i) { bfs::create_directories(bfs::path(mRootDir) / dirs[i]); usleep(WRITE_LOG_SLEEP_INTERVAL); } // config --- log_path: /dir, preserve: false, depth: 1 Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "dir"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["preserve_depth"] = Json::Value(1); apsara_log["topic_format"] = Json::Value("default"); Json::Value rootJson, metrics; rootJson["apsara_log"] = apsara_log; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); for (int i = 2; i < 5; ++i) { bfs::create_directories(bfs::path(mRootDir) / dirs[i]); sleep(1); // usleep(WRITE_LOG_SLEEP_INTERVAL); } auto eventDispatcher = EventDispatcher::GetInstance(); // before update config, check the inotify and timeout? for (int i = 0; i < 5; i++) { unordered_map<std::string, int>::iterator pwItr = (eventDispatcher->mPathWdMap).find(mRootDir + dirs[i]); APSARA_TEST_TRUE_DESC((pwItr != eventDispatcher->mPathWdMap.end()) == isInotify[i], "Inotify error:" + mRootDir + dirs[i]); if (pwItr != eventDispatcher->mPathWdMap.end()) { unordered_map<int, time_t>::iterator wuItr = eventDispatcher->mWdUpdateTimeMap.find(pwItr->second); APSARA_TEST_TRUE_DESC((wuItr != eventDispatcher->mWdUpdateTimeMap.end()) == isTimeout[i], "watchout error:" + mRootDir + dirs[i]); } else { LOG_INFO(sLogger, ("path not add into watch", mRootDir + dirs[i])); } } // then update config apsara_log["version"] = Json::Value(1); rootJson["apsara_log"] = apsara_log; metrics["metrics"] = rootJson; SetConfigResponse(metrics.toStyledString()); // need a little time here to let config update sleep(WAIT_CONFIG_UPDATE_INTERVAL); // waiting for check root dir sleep(2 * INT32_FLAG(check_base_dir_interval) + 1); // after update config, check the inotify and timeout again for (int i = 0; i < 5; i++) { unordered_map<std::string, int>::iterator pwItr = (eventDispatcher->mPathWdMap).find(mRootDir + dirs[i]); APSARA_TEST_TRUE_DESC((pwItr != eventDispatcher->mPathWdMap.end()) == isInotify[i], "Inotify error:" + mRootDir + dirs[i]); if (pwItr != eventDispatcher->mPathWdMap.end()) { unordered_map<int, time_t>::iterator wuItr = eventDispatcher->mWdUpdateTimeMap.find(pwItr->second); APSARA_TEST_TRUE_DESC((wuItr != eventDispatcher->mWdUpdateTimeMap.end()) == isTimeout[i], "watchout error:" + mRootDir + dirs[i]); } else { LOG_INFO(sLogger, ("path not add into watch", mRootDir + dirs[i])); } } CaseCleanup(); RemoveConfigFile(); #endif LOG_INFO(sLogger, ("TestDirCheckPonit() end", time(NULL))); } void ConfigUpdatorUnittest::TestTimeoutCheckPoint() { bfs::remove_all(STRING_FLAG(check_point_filename)); LOG_INFO(sLogger, ("TestTimeoutCheckPoint() begin", time(NULL))); INT32_FLAG(check_point_check_interval) = 30; string dirs[] = {PS + "dir/dir1", PS + "dir/dir1/dir2_1", PS + "dir/dir1/dir2_2", PS + "dir/dir1/dir2_1/dir_3_1", PS + "dir/dir1/dir2_2/dir_3_2"}; bfs::create_directories(bfs::path(mRootDir) / "dir"); for (int i = 0; i < 5; ++i) { bfs::create_directories(bfs::path(mRootDir) / dirs[i]); } Json::Value rootJson; // config --- log_path: /dir, preserve: false, depth: 1 Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "dir"); apsara_log["file_pattern"] = Json::Value("*.[Ll][Oo][Gg]"); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); apsara_log["topic_format"] = Json::Value("default"); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); for (int i = 0; i < 5; ++i) { DumpLog(10, mRootDir + dirs[i], APSARA_LOG); } sleep(2 * INT32_FLAG(batch_send_interval) + 2); // then update config apsara_log["version"] = Json::Value(1); rootJson["apsara_log"] = apsara_log; metrics["metrics"] = rootJson; SetConfigResponse(metrics.toStyledString()); // need a little time here to let config update sleep(WAIT_CONFIG_UPDATE_INTERVAL); // logtail will readlog after add dir watch, then load checkpoint and clean it APSARA_TEST_EQUAL(CheckPointManager::Instance()->mDevInodeCheckPointPtrMap.size(), 0); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestTimeoutCheckPoint() end", time(NULL))); } void ConfigUpdatorUnittest::TestCheckPointManager() { bfs::remove_all(STRING_FLAG(check_point_filename)); LOG_INFO(sLogger, ("TestCheckPointManager() begin", time(NULL))); string filenames[] = {PS + "tmp" + PS + "apsara" + PS + "log.log.1", "tmp" + PS + "apsara.log.2", PS + "apsara" + PS + "tmp" + PS + "log.3", PS + "apsara" + PS + "tmp" + PS + "log.3"}; int64_t offsets[] = {100, 10000, 200000, 200000}; uint64_t devs[] = {34, 56, 98, 98}; uint64_t inodes[] = {100, 2006, 3000008, 3000008}; string configs[] = {"config1", "config_2006", "3000008config", "4000008config"}; string sigs[] = {"jroeijgperieorwpqijpgegu", "jrpehuwklxcnvujoakckghrueioxckllxjgeuin \n\t", "jfuir\tjgpdg\ndfjkoepj\njfpsdfgdpgqe\tfdjksi", "jfuir\tjgpdg\ndfjkoepj\njfpsdfgdpgqe\tfdjksi"}; uint64_t sigHashs[] = {0, 0, 0, 0}; uint32_t sigSizes[] = {0, 0, 0, 0}; for (size_t i = 0; i < sizeof(sigs) / sizeof(string); ++i) { SignatureToHash(sigs[i], sigHashs[i], sigSizes[i]); } string dirs[] = {PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4" + PS + "5", PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "5", PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4" + PS, PS + "tmp" + PS + "1" + PS + "3"}; string pdirs[] = {PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4", PS + "tmp" + PS + "1" + PS + "2" + PS + "3", PS + "tmp" + PS + "1"}; auto checkpointManager = CheckPointManager::Instance(); for (int i = 0; i < 4; ++i) { checkpointManager->AddDirCheckPoint(dirs[i]); } // timeout dircheckpoint (checkpointManager->mDirNameMap[pdirs[0]])->mUpdateTime -= INT32_FLAG(file_check_point_time_out) + 10; (checkpointManager->mDirNameMap[pdirs[2]])->mUpdateTime -= INT32_FLAG(file_check_point_time_out) + 5; checkpointManager->DumpCheckPointToLocal(); // case 0 : delete dircheckpoint for (int i = 0; i < 3; ++i) { checkpointManager->DeleteDirCheckPoint(pdirs[i]); } for (int i = 0; i < 3; ++i) { DirCheckPointPtr ptr; APSARA_TEST_EQUAL(checkpointManager->GetDirCheckPoint(pdirs[i], ptr), false); } // case 1 : load dircheckpoint LOG_INFO(sLogger, ("check point file", STRING_FLAG(check_point_filename))); checkpointManager->LoadCheckPoint(); for (int i = 0; i < 3; ++i) { DirCheckPointPtr ptr; if (i != 1) { APSARA_TEST_EQUAL(checkpointManager->GetDirCheckPoint(pdirs[0], ptr), false); } else { APSARA_TEST_EQUAL_FATAL(checkpointManager->GetDirCheckPoint(pdirs[i], ptr), true); APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 2); } } // case 2 : add filecheckpoint for (int i = 0; i < 4; ++i) { CheckPoint* checkPointPtr = new CheckPoint( filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]); checkpointManager->AddCheckPoint(checkPointPtr); } // timeout checkpoint (checkpointManager ->mDevInodeCheckPointPtrMap[CheckPointManager::CheckPointKey(DevInode(devs[0], inodes[0]), configs[0])]) ->mLastUpdateTime -= INT32_FLAG(mem_check_point_time_out) + 10; for (int i = 3; i >= 0; --i) { CheckPointPtr checkPointSharePtr; bool result = checkpointManager->GetCheckPoint(DevInode(devs[i], inodes[i]), configs[i], checkPointSharePtr); APSARA_TEST_EQUAL(result, true); CheckPoint* checkPointPtr = checkPointSharePtr.get(); APSARA_TEST_EQUAL(checkPointPtr->mFileName, filenames[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureHash, sigHashs[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureSize, sigSizes[i]); APSARA_TEST_EQUAL(checkPointPtr->mOffset, offsets[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.inode, inodes[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.dev, devs[i]); APSARA_TEST_EQUAL(checkPointPtr->mConfigName, configs[i]); } checkpointManager->DumpCheckPointToLocal(); // case 3 : delete filecheckpoint for (int i = 0; i < 4; ++i) { CheckPointPtr checkPointSharePtr; checkpointManager->DeleteCheckPoint(DevInode(devs[i], inodes[i]), configs[i]); bool result = checkpointManager->GetCheckPoint(DevInode(devs[i], inodes[i]), configs[i], checkPointSharePtr); APSARA_TEST_EQUAL(result, false); } // case 4 : load filecheckpoint checkpointManager->LoadCheckPoint(); for (int i = 3; i >= 0; --i) { CheckPointPtr checkPointSharePtr; bool result = checkpointManager->GetCheckPoint(DevInode(devs[i], inodes[i]), configs[i], checkPointSharePtr); APSARA_TEST_EQUAL(result, true); if (result == false) continue; CheckPoint* checkPointPtr = checkPointSharePtr.get(); APSARA_TEST_EQUAL(checkPointPtr->mFileName, filenames[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureHash, sigHashs[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureSize, sigSizes[i]); APSARA_TEST_EQUAL(checkPointPtr->mOffset, offsets[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.inode, inodes[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.dev, devs[i]); APSARA_TEST_EQUAL(checkPointPtr->mConfigName, configs[i]); checkpointManager->DeleteCheckPoint(DevInode(devs[i], inodes[i]), configs[i]); result = checkpointManager->GetCheckPoint(DevInode(devs[i], inodes[i]), configs[i], checkPointSharePtr); APSARA_TEST_EQUAL(result, false); } LOG_INFO(sLogger, ("TestCheckPointManager() end", time(NULL))); } void ConfigUpdatorUnittest::DumpInitConfigToLocal(string localPath) { Json::Value rootJson; Json::Value commonreg_com; commonreg_com["project_name"] = Json::Value("1000000_proj"); commonreg_com["category"] = Json::Value("1000000_cateogry"); commonreg_com["log_type"] = Json::Value("common_reg_log"); commonreg_com["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "comm"); SetConfigFilePattern(commonreg_com); commonreg_com["enable"] = Json::Value(true); commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S"); commonreg_com["topic_format"] = Json::Value("none"); Json::Value customizedFieldsValue, dataIntegrityValue, lineCountValue; dataIntegrityValue["switch"] = Json::Value(false); dataIntegrityValue["project_name"] = Json::Value("data_integrity"); dataIntegrityValue["logstore"] = Json::Value("data_integrity"); dataIntegrityValue["log_time_reg"] = Json::Value("([0-9]{4})-(0[0-9]{1}|1[0-2])-(0[0-9]{1}|[12][0-9]{1}|3[01]) " "(0[0-9]{1}|1[0-9]{1}|2[0-3]):[0-5][0-9]{1}:([0-5][0-9]{1})"); dataIntegrityValue["time_pos"] = Json::Value(0); customizedFieldsValue["data_integrity"] = dataIntegrityValue; lineCountValue["switch"] = Json::Value(false); lineCountValue["project_name"] = Json::Value("line_count"); lineCountValue["logstore"] = Json::Value("line_count"); customizedFieldsValue["line_count"] = lineCountValue; // customizedFieldsValue["fuse_mode"] = Json::Value(true); commonreg_com["customized_fields"] = customizedFieldsValue; Json::Value regs; regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-)")); regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-) " "\\\"([^\"]*)\\\" \\\"([^\"]*)\\\"")); Json::Value keys; keys.append(Json::Value("ip,time,method,url,status,length")); keys.append(Json::Value("ip,time,method,url,status,length,ref_url,browser")); commonreg_com["regex"] = regs; commonreg_com["keys"] = keys; commonreg_com["preserve"] = Json::Value(true); rootJson["commonreg.com"] = commonreg_com; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); apsara_log["topic_format"] = Json::Value("none"); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; if (localPath.empty()) { ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); } else { ofstream fout(localPath.c_str()); fout << metrics << endl; fout.close(); } } void ConfigUpdatorUnittest::SetupGlobalFuseMode(bool globalFuseMode) { Json::Value logtailConfig; logtailConfig["global_fuse_mode"] = Json::Value(globalFuseMode); ofstream fout(STRING_FLAG(ilogtail_config).c_str()); fout << logtailConfig.toStyledString() << endl; fout.close(); } void ConfigUpdatorUnittest::DumpCustomizedConfigToLocal(bool check_ulogfs_env, bool fuse_mode) { Json::Value rootJson; Json::Value commonreg_com; commonreg_com["project_name"] = Json::Value("1000000_proj"); commonreg_com["category"] = Json::Value("1000000_cateogry"); commonreg_com["log_type"] = Json::Value("common_reg_log"); commonreg_com["log_path"] = Json::Value(mRootDir + "/comm"); commonreg_com["file_pattern"] = Json::Value("*.[Ll][Oo][Gg]"); commonreg_com["enable"] = Json::Value(true); commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S"); commonreg_com["topic_format"] = Json::Value("none"); Json::Value customizedFieldsValue; customizedFieldsValue["check_ulogfs_env"] = Json::Value(check_ulogfs_env); customizedFieldsValue["fuse_mode"] = Json::Value(fuse_mode); commonreg_com["customized_fields"] = customizedFieldsValue; Json::Value regs; regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-)")); regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-) " "\\\"([^\"]*)\\\" \\\"([^\"]*)\\\"")); Json::Value keys; keys.append(Json::Value("ip,time,method,url,status,length")); keys.append(Json::Value("ip,time,method,url,status,length,ref_url,browser")); commonreg_com["regex"] = regs; commonreg_com["keys"] = keys; commonreg_com["preserve"] = Json::Value(true); rootJson["commonreg.com"] = commonreg_com; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); } void ConfigUpdatorUnittest::CaseSetup(bool replaceConfigAllowed) { ClearConfigResponse(); // set up container mode test bool container_mode = CheckExistance("LogtailContainerModeTest"); if (container_mode && replaceConfigAllowed) { cout << "replace with container config" << endl; ReplaceWithContainerModeConfig(); } AppConfig::GetInstance()->LoadAppConfig(STRING_FLAG(ilogtail_config)); bool ret = ConfigManager::GetInstance()->LoadConfig(STRING_FLAG(user_log_config)); ASSERT_TRUE(ret); ret = LogFilter::Instance()->InitFilter(STRING_FLAG(user_log_config)); ASSERT_TRUE(ret); ConfigManager::GetInstance()->mRegionType = REGION_CORP; Sender::Instance()->Init(); Sender::Instance()->MockAsyncSend = MockAsyncSend; vector<string> filesToSend; Sender::Instance()->LoadFileToSend(time(NULL), filesToSend); for (size_t i = 0; i < filesToSend.size(); ++i) remove((Sender::Instance()->mBufferFilePath + filesToSend[i]).c_str()); ConfigManager::GetInstance()->mThreadIsRunning = true; ConfigManager::GetInstance()->InitUpdateConfig(true); SetConfigResponse("{}"); sProjectNameCountMap.clear(); sProjectCategoryTopicCountMap.clear(); sTopicCountMap.clear(); sRegionResponseMap.clear(); sConfigRejectSet.clear(); sRejectCount = 0; gDispatchThreadId.reset(new std::thread(RunningDispatcher)); sleep(1); } void ConfigUpdatorUnittest::CaseCleanup() { LogInput::GetInstance()->CleanEnviroments(); ConfigManager::GetInstance()->mThreadIsRunning = false; EventDispatcher::GetInstance()->CleanEnviroments(); ConfigManager::GetInstance()->CleanEnviroments(); Sender::Instance()->RemoveSender(); bfs::remove_all(mRootDir); bfs::remove("loongcollector_config.json"); gDispatchThreadId->join(); gDispatchThreadId = nullptr; } void ConfigUpdatorUnittest::RemoveConfigFile() { string userLogConfig = STRING_FLAG(user_log_config); bfs::remove_all(userLogConfig); bfs::remove_all(AppConfig::GetInstance()->GetLocalUserConfigPath()); bfs::remove_all(AppConfig::GetInstance()->GetLocalUserConfigDirPath()); } void ConfigUpdatorUnittest::TestLogRotateWhenUpdate() { LOG_INFO(sLogger, ("TestLogRotateWhenUpdate() begin", time(NULL))); bfs::remove_all(mRootDir); bfs::create_directories(bfs::path(mRootDir) / "apsara_log"); bfs::create_directories(bfs::path(mRootDir) / "comm"); DumpInitConfigToLocal(); CaseSetup(); for (int i = 0; i < 10; ++i) { for (int j = 0; j < 2; ++j) DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[j], (LogType)j); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); // check config update Json::Value rootJson; Json::Value commonreg_com; commonreg_com["project_name"] = Json::Value("2000000_proj"); commonreg_com["category"] = Json::Value("2000000_category"); commonreg_com["log_type"] = Json::Value("common_reg_log"); commonreg_com["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "comm"); SetConfigFilePattern(commonreg_com); commonreg_com["enable"] = Json::Value(true); commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S"); Json::Value regs; regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-)")); regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-) " "\\\"([^\"]*)\\\" \\\"([^\"]*)\\\"")); Json::Value keys; keys.append(Json::Value("ip,time,method,url,status,length")); keys.append(Json::Value("ip,time,method,url,status,length,ref_url,browser")); commonreg_com["regex"] = regs; commonreg_com["keys"] = keys; commonreg_com["preserve"] = Json::Value(true); commonreg_com["version"] = Json::Value(1); rootJson["commonreg.com"] = commonreg_com; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(1); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; APSARA_TEST_EQUAL(sProjectNameCountMap["1000000_proj"], 100); APSARA_TEST_EQUAL(sProjectNameCountMap["8000000_proj"], 100); // Move files to parent directory and append more data into them, then move them back (with // suffix .1). // When logtail updates configuration, it will hold on and resume LogInput, and when LogInput // is resuming, it will try to load checkpoints, at this time, it will find that the old file // has been renamed to file with suffix (according to their same devInode and signature). // Therefore, after resuming, it will skip the first 100 lines in log file. for (int j = 0; j < 2; ++j) { bfs::rename(bfs::path(mRootDir) / gPath[j] / "job.log", bfs::path(mRootDir) / (std::string("job.log.1") + std::to_string(j))); } // Wait until Logtail finds that files are removed. sleep(1); #if defined(_MSC_VER) sleep(30); // Windows polling needs more time. #endif // mock process stop and log write -> rotate by rotate -> write for (int i = 0; i < 20; ++i) { for (int j = 0; j < 2; ++j) DumpLog(10, mRootDir, (LogType)j, string("job.log.1") + ToString(j)); usleep(WRITE_LOG_SLEEP_INTERVAL); } for (int j = 0; j < 2; ++j) { bfs::rename(bfs::path(mRootDir) / (std::string("job.log.1") + std::to_string(j)), bfs::path(mRootDir) / gPath[j] / "job.log.1"); } sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sProjectNameCountMap["1000000_proj"], 100); APSARA_TEST_EQUAL(sProjectNameCountMap["8000000_proj"], 100); // Update config to trigger hold on and resume in LogInput. SetConfigResponse(metrics.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); unordered_map<string, CollectionConfig*>& configMap = ConfigManager::GetInstance()->mNameConfigMap; unordered_map<string, CollectionConfig*>::iterator it = configMap.find("commonreg.com"); APSARA_TEST_TRUE(it != configMap.end()); CollectionConfig* config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "2000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "comm"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, REGEX_LOG); it = configMap.find("apsara_log"); APSARA_TEST_TRUE(it != configMap.end()); config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "8000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "apsara_log"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mIsPreserve, false); APSARA_TEST_EQUAL(configMap.size(), 2); APSARA_TEST_EQUAL(sProjectNameCountMap["2000000_proj"] + sProjectNameCountMap["1000000_proj"], 300); APSARA_TEST_EQUAL(sProjectNameCountMap["8000000_proj"], 300); // test add config APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE( !EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestLogRotateWhenUpdate() end", time(NULL))); } void ConfigUpdatorUnittest::TestConfigUpdate() { LOG_INFO(sLogger, ("TestConfigUpdate() begin", time(NULL))); bfs::remove_all(mRootDir); bfs::create_directories(bfs::path(mRootDir) / "apsara_log"); bfs::create_directories(bfs::path(mRootDir) / "comm"); DumpInitConfigToLocal(); CaseSetup(); for (int i = 0; i < 10; ++i) { for (int j = 0; j < 2; ++j) DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[j], (LogType)j); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); // check config update Json::Value rootJson; Json::Value commonreg_com; commonreg_com["project_name"] = Json::Value("2000000_proj"); commonreg_com["category"] = Json::Value("2000000_category"); commonreg_com["log_type"] = Json::Value("common_reg_log"); commonreg_com["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "comm"); SetConfigFilePattern(commonreg_com); commonreg_com["enable"] = Json::Value(true); commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S"); Json::Value regs; regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-)")); regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-) " "\\\"([^\"]*)\\\" \\\"([^\"]*)\\\"")); Json::Value keys; keys.append(Json::Value("ip,time,method,url,status,length")); keys.append(Json::Value("ip,time,method,url,status,length,ref_url,browser")); commonreg_com["regex"] = regs; commonreg_com["keys"] = keys; commonreg_com["max_depth"] = Json::Value(0); commonreg_com["preserve"] = Json::Value(true); commonreg_com["version"] = Json::Value(1); rootJson["commonreg.com"] = commonreg_com; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(1); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; SetConfigResponse(metrics.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 2; ++j) DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[j], (LogType)j); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); unordered_map<string, CollectionConfig*>& configMap = ConfigManager::GetInstance()->mNameConfigMap; unordered_map<string, CollectionConfig*>::iterator it = configMap.find("commonreg.com"); APSARA_TEST_TRUE(it != configMap.end()); CollectionConfig* config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "2000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "comm"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, REGEX_LOG); it = configMap.find("apsara_log"); APSARA_TEST_TRUE(it != configMap.end()); config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "8000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "apsara_log"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mIsPreserve, false); APSARA_TEST_EQUAL(configMap.size(), 2); APSARA_TEST_EQUAL(sProjectNameCountMap["2000000_proj"] + sProjectNameCountMap["1000000_proj"], 300); APSARA_TEST_EQUAL(sProjectNameCountMap["8000000_proj"], 300); // test add config APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE( !EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); SetConfigResponse("{}"); apsara_log["project_name"] = Json::Value("3000000_proj"); apsara_log["category"] = Json::Value("3000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log1"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); commonreg_com["version"] = Json::Value(1); Json::Value rootJson1; rootJson1["apsara_log1"] = apsara_log; Json::Value metrics1; metrics1["metrics"] = rootJson1; bfs::create_directories(bfs::path(mRootDir) / "apsara_log1"); SetConfigResponse(metrics1.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + "apsara_log1", APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); it = configMap.find("apsara_log1"); APSARA_TEST_EQUAL(it != configMap.end(), true); if (it == configMap.end()) return; config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "3000000_proj"); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "apsara_log1"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(sProjectNameCountMap["3000000_proj"], 200); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); DirCheckPointPtr dirCheckPointPtr; CheckPointPtr checkPointPtr; CheckPointManager* pCheckPointManager = CheckPointManager::Instance(); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/comm/job.log", checkPointPtr)); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/apsara_log/job.log", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetDirCheckPoint(mRootDir, dirCheckPointPtr)); if (dirCheckPointPtr.get() != NULL) { APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "comm") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log") != dirCheckPointPtr->mSubDir.end()); } // pCheckPointManager->PrintStatus(); // test delete config SetConfigResponse("{}"); apsara_log = Json::Value(); apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["enable"] = Json::Value(true); apsara_log["version"] = Json::Value(-1); Json::Value rootJson2; rootJson2["apsara_log"] = apsara_log; Json::Value metrics2; metrics2["metrics"] = rootJson2; SetConfigResponse(metrics2.toStyledString()); sProjectNameCountMap["8000000_proj"] = 0; sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + "apsara_log", APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sProjectNameCountMap["8000000_proj"], 0); APSARA_TEST_TRUE(configMap.find("apsara_log") == configMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(!EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/comm/job.log", checkPointPtr)); // APSARA_TEST_TRUE(!pCheckPointManager->GetCheckPoint(mRootDir + "/apsara_log/job.log", checkPointPtr)); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/apsara_log1/job.log", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetDirCheckPoint(mRootDir, dirCheckPointPtr)); if (dirCheckPointPtr.get() != NULL) { APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "comm") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log1") != dirCheckPointPtr->mSubDir.end()); } pCheckPointManager->RemoveAllCheckPoint(); pCheckPointManager->LoadCheckPoint(); DevInode devInode1 = GetFileDevInode(mRootDir + PATH_SEPARATOR + "comm" + PATH_SEPARATOR + "job.log"); DevInode devInode2 = GetFileDevInode(mRootDir + PATH_SEPARATOR + "apsara_log" + PATH_SEPARATOR + "job.log"); DevInode devInode3 = GetFileDevInode(mRootDir + PATH_SEPARATOR + "apsara_log1" + PATH_SEPARATOR + "job.log"); APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(devInode1, "commonreg.com", checkPointPtr)); APSARA_TEST_TRUE(!pCheckPointManager->GetCheckPoint(devInode2, "apsara_log", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(devInode3, "apsara_log1", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetDirCheckPoint(mRootDir, dirCheckPointPtr)); if (dirCheckPointPtr.get() != NULL) { APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "comm") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log1") != dirCheckPointPtr->mSubDir.end()); } CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestConfigUpdate() end", time(NULL))); } void ConfigUpdatorUnittest::TestLocalConfigUpdate() { LOG_INFO(sLogger, ("TestLocalConfigUpdate() begin", time(NULL))); bfs::remove_all(mRootDir); bfs::create_directories(bfs::path(mRootDir) / "apsara_log"); bfs::create_directories(bfs::path(mRootDir) / "comm"); DumpInitConfigToLocal(AppConfig::GetInstance()->GetLocalUserConfigPath()); DumpInitConfigToLocal(); CaseSetup(); LOG_INFO(sLogger, ("Write data with initialized config", "")); for (int i = 0; i < 10; ++i) { for (int j = 0; j < 2; ++j) DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[j], (LogType)j); usleep(WRITE_LOG_SLEEP_INTERVAL); } LOG_INFO(sLogger, ("Write data with initialized config", "done")); WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); // check config update Json::Value rootJson; Json::Value commonreg_com; commonreg_com["project_name"] = Json::Value("2000000_proj"); commonreg_com["category"] = Json::Value("2000000_category"); commonreg_com["log_type"] = Json::Value("common_reg_log"); commonreg_com["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "comm"); SetConfigFilePattern(commonreg_com); commonreg_com["enable"] = Json::Value(true); commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S"); Json::Value regs; regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-)")); regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] \\\"(\\w+) ([^\"]*)\\\" (\\d+) (\\d+|-) " "\\\"([^\"]*)\\\" \\\"([^\"]*)\\\"")); Json::Value keys; keys.append(Json::Value("ip,time,method,url,status,length")); keys.append(Json::Value("ip,time,method,url,status,length,ref_url,browser")); commonreg_com["regex"] = regs; commonreg_com["keys"] = keys; commonreg_com["max_depth"] = Json::Value(0); commonreg_com["preserve"] = Json::Value(true); commonreg_com["version"] = Json::Value(1); rootJson["commonreg.com"] = commonreg_com; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value((bfs::path(mRootDir) / "apsara_log").string()); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(1); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; LOG_INFO(sLogger, ("Add local user config", "local overwrite remote")); ofstream fout(AppConfig::GetInstance()->GetLocalUserConfigPath().c_str()); fout << metrics.toStyledString() << endl; fout.close(); LOG_INFO(sLogger, ("Add local user config", "done")); sleep(WAIT_CONFIG_UPDATE_INTERVAL); LOG_INFO(sLogger, ("Wait for local user config to be updated", "done")); LOG_INFO(sLogger, ("Write new data", "")); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 2; ++j) DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[j], (LogType)j); usleep(WRITE_LOG_SLEEP_INTERVAL); } LOG_INFO(sLogger, ("Write new data", "done")); WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); LOG_INFO(sLogger, ("Test config update status", "")); unordered_map<string, CollectionConfig*>& configMap = ConfigManager::GetInstance()->mNameConfigMap; unordered_map<string, CollectionConfig*>::iterator it = configMap.find("commonreg.com"); APSARA_TEST_TRUE(it != configMap.end()); CollectionConfig* config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "2000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, (bfs::path(mRootDir) / "comm").string()); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, REGEX_LOG); it = configMap.find("apsara_log"); APSARA_TEST_TRUE(it != configMap.end()); config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "8000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, (bfs::path(mRootDir) / "apsara_log").string()); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mIsPreserve, false); LOG_INFO(sLogger, ("Print config map", "")); for (auto iter : configMap) { LOG_INFO( sLogger, (iter.first, iter.second->GetProjectName())("count", sProjectNameCountMap[iter.second->GetProjectName()])); } LOG_INFO(sLogger, ("print config map", "done")); APSARA_TEST_EQUAL(configMap.size(), 2); APSARA_TEST_EQUAL(sProjectNameCountMap["2000000_proj"] + sProjectNameCountMap["1000000_proj"], 300); APSARA_TEST_EQUAL(sProjectNameCountMap["8000000_proj"], 300); // test add config APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE( !EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); apsara_log["project_name"] = Json::Value("3000000_proj"); apsara_log["category"] = Json::Value("3000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log1"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); commonreg_com["version"] = Json::Value(1); Json::Value rootJson1; rootJson1["apsara_log1"] = apsara_log; Json::Value metrics1; metrics1["metrics"] = rootJson1; ofstream fout1((AppConfig::GetInstance()->GetLocalUserConfigDirPath() + "config_3000000.json").c_str()); fout1 << metrics1.toStyledString() << endl; fout1.close(); bfs::create_directories(bfs::path(mRootDir) / "apsara_log1"); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + "apsara_log1", APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); it = configMap.find("apsara_log1"); APSARA_TEST_EQUAL(it != configMap.end(), true); if (it == configMap.end()) return; config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "3000000_proj"); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "apsara_log1"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(sProjectNameCountMap["3000000_proj"], 200); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); DirCheckPointPtr dirCheckPointPtr; CheckPointPtr checkPointPtr; CheckPointManager* pCheckPointManager = CheckPointManager::Instance(); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/comm/job.log", checkPointPtr)); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/apsara_log/job.log", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetDirCheckPoint(mRootDir, dirCheckPointPtr)); if (dirCheckPointPtr.get() != NULL) { APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "comm") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log") != dirCheckPointPtr->mSubDir.end()); } // pCheckPointManager->PrintStatus(); // test delete config bfs::remove_all(AppConfig::GetInstance()->GetLocalUserConfigDirPath() + "config_3000000.json"); sProjectNameCountMap["3000000_proj"] = 0; sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + "apsara_log1", APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sProjectNameCountMap["3000000_proj"], 0); APSARA_TEST_TRUE(configMap.find("apsara_log1") == configMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "comm").c_str())); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log").c_str())); APSARA_TEST_TRUE( !EventDispatcher::GetInstance()->IsRegistered((mRootDir + PATH_SEPARATOR + "apsara_log1").c_str())); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/comm/job.log", checkPointPtr)); // APSARA_TEST_TRUE(!pCheckPointManager->GetCheckPoint(mRootDir + "/apsara_log/job.log", checkPointPtr)); // APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(mRootDir + "/apsara_log1/job.log", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetDirCheckPoint(mRootDir, dirCheckPointPtr)); if (dirCheckPointPtr.get() != NULL) { APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "comm") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PATH_SEPARATOR + "apsara_log1") == dirCheckPointPtr->mSubDir.end()); } pCheckPointManager->RemoveAllCheckPoint(); pCheckPointManager->LoadCheckPoint(); auto& PS = PATH_SEPARATOR; DevInode devInode1 = GetFileDevInode(mRootDir + PS + "comm" + PS + "job.log"); DevInode devInode2 = GetFileDevInode(mRootDir + PS + "apsara_log" + PS + "job.log"); DevInode devInode3 = GetFileDevInode(mRootDir + PS + "apsara_log1" + PS + "job.log"); APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(devInode1, "commonreg.com", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetCheckPoint(devInode2, "apsara_log", checkPointPtr)); APSARA_TEST_TRUE(!pCheckPointManager->GetCheckPoint(devInode3, "apsara_log1", checkPointPtr)); APSARA_TEST_TRUE(pCheckPointManager->GetDirCheckPoint(mRootDir, dirCheckPointPtr)); if (dirCheckPointPtr.get() != NULL) { APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PS + "comm") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PS + "apsara_log") != dirCheckPointPtr->mSubDir.end()); APSARA_TEST_TRUE(dirCheckPointPtr->mSubDir.find(mRootDir + PS + "apsara_log1") == dirCheckPointPtr->mSubDir.end()); } CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestLocalConfigUpdate() end", time(NULL))); } void ConfigUpdatorUnittest::TestUpdatePath() { LOG_INFO(sLogger, ("TestUpdatePath() begin", time(NULL))); string dirs[] = {PS + "dir" + PS + "dir1", PS + "dir" + PS + "dir1" + PS + "dir2_1", PS + "dir" + PS + "dir1" + PS + "dir2_2", PS + "dir" + PS + "dir1" + PS + "dir2_1" + PS + "dir_3_1", PS + "dir" + PS + "dir1" + PS + "dir2_2" + PS + "dir_3_2"}; bfs::create_directories(bfs::path(mRootDir) / "dir"); for (int i = 0; i < 5; ++i) { bfs::create_directories(bfs::path(mRootDir) / dirs[i]); } Json::Value rootJson; // at first, only watch depth=2 dirs Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "dir"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["topic_format"] = Json::Value("default"); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); for (int i = 0; i < 10; ++i) { for (int j = 0; j < 5; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 100, mRootDir + dirs[0] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 100, mRootDir + dirs[1] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 100, mRootDir + dirs[2] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 0, mRootDir + dirs[3] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0, mRootDir + dirs[4] + PS + "job"); // then update config to all depth apsara_log["preserve"] = Json::Value(true); apsara_log.removeMember("preserve_depth"); apsara_log.removeMember("dir_pattern_black_list"); apsara_log["version"] = Json::Value(1); rootJson["apsara_log"] = apsara_log; metrics["metrics"] = rootJson; SetConfigResponse(metrics.toStyledString()); // need a little time here to let config update sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 5; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 300, mRootDir + dirs[0] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 300, mRootDir + dirs[1] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 300, mRootDir + dirs[2] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 300, mRootDir + dirs[3] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 300, mRootDir + dirs[4] + PS + "job"); // when "preserve" = true, "preserve_depth" is useless sTopicCountMap.clear(); apsara_log["preserve"] = Json::Value(true); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(2); rootJson["apsara_log"] = apsara_log; metrics["metrics"] = rootJson; SetConfigResponse(metrics.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 5; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 200, mRootDir + dirs[0] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 200, mRootDir + dirs[1] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 200, mRootDir + dirs[2] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 200, mRootDir + dirs[3] + PS + "job"); APSARA_TEST_EQUAL_DESC(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 200, mRootDir + dirs[4] + PS + "job"); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestUpdatePath() end", time(NULL))); } void ConfigUpdatorUnittest::TestBlackDirList() { LOG_INFO(sLogger, ("TestBlackDirList() begin", time(NULL))); std::string dirs[] = {PS + "dir" + PS + "dir_1_1" + PS + "dir_2_1", PS + "dir" + PS + "dir_1_1" + PS + "dir_2_1" + PS + "dir_3_1", PS + "dir" + PS + "dir_1_1" + PS + "dir_2_1" + PS + "dir_3_2", PS + "dir" + PS + "dir_1_1" + PS + "dir_2_2" + PS + "dir_3_1" + PS + "dir_4_1", PS + "dir" + PS + "dir_1_1" + PS + "dir_2_2" + PS + "dir_3_2" + PS + "dir_4_2", PS + "dir" + PS + "dir_1_2" + PS + "dir_2_1" + PS + "dir_3_1" + PS + "dir_4_1", PS + "dir" + PS + "dir_1_2" + PS + "dir_2_1" + PS + "dir_3_1" + PS + "dir_4_2" + PS + "dir_5_1"}; const int DIR_COUNT = sizeof(dirs) / sizeof(std::string); for (int i = 0; i < DIR_COUNT; ++i) { bfs::create_directories(bfs::path(mRootDir) / dirs[i]); } Json::Value rootJson; // apsara_log Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "dir" + PS + "dir_1_1"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); apsara_log["topic_format"] = Json::Value("default"); Json::Value blackDirList_1; #if defined(__linux__) blackDirList_1.append(Json::Value("*" + PS + "dir" + PS + "dir_1_1" + PS + "dir_2_[12]" + PS + "dir_3_2")); #elif defined(_MSC_VER) blackDirList_1.append(Json::Value("*" + PS + "dir" + PS + "dir_1_1" + PS + "dir_2_1" + PS + "dir_3_2")); blackDirList_1.append(Json::Value("*" + PS + "dir" + PS + "dir_1_1" + PS + "dir_2_2" + PS + "dir_3_2")); #endif apsara_log["dir_pattern_black_list"] = blackDirList_1; rootJson["apsara_log"] = apsara_log; // apsara_log_2 Json::Value apsara_log_2; apsara_log_2["project_name"] = Json::Value("9000000_proj"); apsara_log_2["category"] = Json::Value("9000000_category"); apsara_log_2["log_type"] = Json::Value("apsara_log"); apsara_log_2["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log_2["log_path"] = Json::Value(mRootDir + PS + "dir" + PS + "dir_1_2"); SetConfigFilePattern(apsara_log_2); apsara_log_2["enable"] = Json::Value(true); apsara_log_2["preserve"] = Json::Value(true); apsara_log_2["topic_format"] = Json::Value("default"); Json::Value blackDirList_2; blackDirList_2.append( Json::Value("*" + PS + "dir" + PS + "dir_1_2" + PS + "dir_2_1" + PS + "dir_3_1" + PS + "dir_4_2*")); apsara_log_2["dir_pattern_black_list"] = blackDirList_2; rootJson["apsara_log_2"] = apsara_log_2; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); for (int i = 0; i < 10; ++i) { for (int j = 0; j < DIR_COUNT; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 100); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 100); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 100); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[5] + PS + "job"], 100); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[6] + PS + "job"], 0); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestBlackDirList() end", time(NULL))); } void ConfigUpdatorUnittest::TestValidPath() { LOG_INFO(sLogger, ("TestValidPath() begin", time(NULL))); string dirs[] = {PS + "modify" + PS + "aa", PS + "modify" + PS + "aa" + PS + "bb", PS + "modify" + PS + "aa" + PS + ".cc", PS + "modify" + PS + "aa" + PS + "cc" + PS + ".ee"}; for (int i = 0; i < 3; ++i) bfs::create_directories(bfs::path(mRootDir) / dirs[i]); Json::Value rootJson; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "modify"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["topic_format"] = Json::Value("default"); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); LOG_INFO(sLogger, ("DumpLog", "Begin")); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 4; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 0); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestValidPath() end", time(NULL))); } void ConfigUpdatorUnittest::TestUpdateProfileProject() { LOG_INFO(sLogger, ("TestUpdateProfileProject() begin", time(NULL))); DumpInitConfigToLocal(); CaseSetup(); APSARA_TEST_EQUAL(ConfigManager::GetInstance()->GetDefaultProfileProjectName(), STRING_FLAG(profile_project_name)); Json::Value firstRoot; firstRoot["profile_project"] = Json::Value("17_1"); firstRoot["profile_project_region"] = Json::Value("normal"); SetConfigResponse(firstRoot.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); APSARA_TEST_EQUAL(ConfigManager::GetInstance()->GetDefaultProfileProjectName(), "17_1"); APSARA_TEST_EQUAL(ConfigManager::GetInstance()->GetDefaultProfileRegion(), "normal"); Json::Value secondRoot; secondRoot["profile_project"] = Json::Value("17_2"); secondRoot["profile_project_region"] = Json::Value("abnormal"); SetConfigResponse(secondRoot.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); APSARA_TEST_EQUAL(ConfigManager::GetInstance()->GetDefaultProfileProjectName(), "17_2"); APSARA_TEST_EQUAL(ConfigManager::GetInstance()->GetDefaultProfileRegion(), "abnormal"); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestUpdateProfileProject() end", time(NULL))); } void ConfigUpdatorUnittest::GetLogContent(LogType logType, char* buffer) { char timeBuffer[50]; struct tm timeInfo; time_t cur; cur = time(NULL); #if defined(__linux__) localtime_r(&cur, &timeInfo); #elif defined(_MSC_VER) localtime_s(&timeInfo, &cur); #endif char buffer2[] = "\"GET /icons/text.gif HTTP/1.1\" 200 229"; char buffer3[] = " \"http://10.230.201.117/AT-INT-A01/\" \"Mozilla/5.0 (Windows NT 6.1; rv:10.0.2) Gecko/20100101 " "Firefox/10.0.2\""; if (logType == REGEX_LOG) { strftime(timeBuffer, sizeof(timeBuffer), " - - [%d/%b/%Y:%R:%S +0800] ", &timeInfo); sprintf(buffer, "%s%s%s", LoongCollectorMonitor::GetInstance()->mIpAddr.c_str(), timeBuffer, buffer2); if ((rand()) % 2 == 0) strcat(buffer, buffer3); strcat(buffer, "\n"); } else if (logType == APSARA_LOG) { static int seq = 0; strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &timeInfo); sprintf(buffer, "[%s.523094]\t[INFO]\t[29817]\t[build/debug64/sqlonline/OpenTableService/OpenTableServer/worker/" "ots_worker.cpp:274]\tAccessId:1vjejd8xmpocf63g1cc8h2xv\tOpenId:CapTest\tOperationType:" "PutData\tTargetEntity:ots_ft_test_contact_test12\tSourceIP:10.230.201.25\tAPIVersion:1\tTime:" "1332913419517022\tLatency:6070\tInFlowDataSize:793\tOutFlowDataSize:198\tHttpResponseStatus:" "200\tOTSStatus:0\tSQLStatus:0\tExtraMessage:\tseq:%d\n", timeBuffer, seq++); } } void ConfigUpdatorUnittest::DumpLog(int logNum, string path, enum LogType logType, string logName) { // Use binary mode to write to avoid \n being converted to \r\n. std::ofstream out(path + logtail::PATH_SEPARATOR + logName, std::ios_base::app | std::ios_base::binary); if (out) { char buffer[10240]; for (int i = 0; i < logNum; ++i) { GetLogContent(logType, buffer); out.write(buffer, strlen(buffer)); } } } bool ConfigUpdatorUnittest::MockGetLogtailConfig(const string& targetURL, const string& intf, const string& ip, const string& config, string& response, string& errorMessage, bool isCompress) { LOG_INFO(sLogger, ("reject set size", sConfigRejectSet.size())); LOG_INFO(sLogger, ("target url", targetURL)); if (IsConfigRejected(targetURL)) { ++sRejectCount; LOG_INFO(sLogger, ("target url reject", targetURL)("Reject count", sRejectCount)); return false; } Json::Reader parser; Json::Value input; Json::Value output; auto configRes = GetConfigResponse(); if (!parser.parse(configRes, output)) { LOG_WARNING(sLogger, ("Parse response fail", configRes)); return true; } string regionResp = GetRegionConfigResponse(targetURL); if (!regionResp.empty()) { LOG_INFO(sLogger, ("target url use region resp", regionResp)); output.clear(); if (!parser.parse(regionResp, output)) { LOG_WARNING(sLogger, ("Parse response fail", regionResp)); return true; } } if (!parser.parse(config, input)) { LOG_WARNING(sLogger, ("Parse response fail", config)); return true; } if (!output.isMember("metrics") && !output.isMember("config") && !output.isMember("profile_project")) return true; if (input.isMember("metrics") == false) return true; Json::Value::Members logNames = input["metrics"].getMemberNames(); for (size_t i = 0; i < logNames.size(); ++i) { string logname = logNames[i]; if (output["metrics"].isMember(logname) == false) continue; int version_out = output["metrics"][logname]["version"].asInt(); int version_in = input["metrics"][logname]["version"].asInt(); bool inHasGroupTopic = input["metrics"][logname]["topic_format"] == "group_topic"; bool outHasGroupTopic = input["metrics"][logname]["topic_format"] == "group_topic"; if (version_in >= version_out && version_out >= 0) { if (inHasGroupTopic && outHasGroupTopic && input["metrics"][logname]["group_topic"] != output["metrics"][logname]["group_topic"]) continue; output["metrics"].removeMember(logname); } } logNames = output["metrics"].getMemberNames(); for (size_t i = 0; i < logNames.size(); ++i) { string logname = logNames[i]; if (input["metrics"].isMember(logname) == false) { int version_out = output["metrics"][logname]["version"].asInt(); if (version_out == -1) { output["metrics"].removeMember(logname); } } } if (output.isMember("region_list")) { input["region_list"] = output["region_list"]; } Json::Value outputRoot; outputRoot["/GetLogtailConfig"] = output; response = outputRoot.toStyledString(); LOG_DEBUG(sLogger, ("config request", config)("get config update response ", response)); return true; } void ConfigUpdatorUnittest::TestCheckPointSaveInterval() { LOG_INFO(sLogger, ("TestCheckPointSaveInterval() start", time(NULL))); bfs::remove_all(STRING_FLAG(check_point_filename)); string filenames[] = {"/tmp/apsara/log.log.1", "tmp/apsara.log.2", "/apsara/tmp/log.3"}; int64_t offsets[] = {100, 10000, 200000}; uint64_t inodes[] = {100, 200, 300}; string configs[] = {"config1", "config_2006", "3000008config"}; uint64_t devs[] = {34, 56, 98}; string sigs[] = {"jroeijgperieorwpqijpgegu", "jrpehuwklxcnvujoakckghrueioxckllxjgeuin \n\t", "jfuir\tjgpdg\ndfjkoepj\njfpsdfgdpgqe\tfdjksi"}; uint64_t sigHashs[] = {0, 0, 0}; uint32_t sigSizes[] = {0, 0, 0}; for (size_t i = 0; i < sizeof(sigs) / sizeof(string); ++i) { SignatureToHash(sigs[i], sigHashs[i], sigSizes[i]); } string dirs[] = {"/tmp/1/2/3/4/5", "/tmp/1/2/3/5", "/tmp/1/2/3/4/", "/tmp/1/3"}; // string pdirs[] = { "/tmp/1/2/3/4","/tmp/1/2/3","/tmp/1" }; DumpInitConfigToLocal(); CaseSetup(); sleep(2); for (int i = 0; i < 4; ++i) { CheckPointManager::Instance()->AddDirCheckPoint(dirs[i]); } for (int i = 0; i < 3; ++i) { CheckPoint* checkPointPtr = new CheckPoint( filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]); CheckPointManager::Instance()->AddCheckPoint(checkPointPtr); } APSARA_TEST_EQUAL(CheckPointManager::Instance()->mDevInodeCheckPointPtrMap.size(), (size_t)3); int32_t defaultInterval = INT32_FLAG(check_point_dump_interval); INT32_FLAG(check_point_dump_interval) = 5; int32_t curTime = (int32_t)time(NULL); // checkpoint add rand dump, sleep should be long enough sleep(68); // after save, not existed file's check point will be removed, so mDevInodeCheckPointPtrMap.size == 0 APSARA_TEST_EQUAL(CheckPointManager::Instance()->mDevInodeCheckPointPtrMap.size(), (size_t)0); CheckPointManager::Instance()->mDevInodeCheckPointPtrMap.clear(); CheckPointManager::Instance()->mDirNameMap.clear(); CheckPointManager::Instance()->LoadCheckPoint(); APSARA_TEST_TRUE(CheckPointManager::Instance()->mLastDumpTime > curTime); // for (int i = 0; i < 3; ++i) //{ // DirCheckPointPtr ptr; // if (i == 1) // { // APSARA_TEST_EQUAL_FATAL(CheckPointManager::Instance()->GetDirCheckPoint(pdirs[i], ptr), true); // APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 2); // } // else // { // APSARA_TEST_EQUAL_FATAL(CheckPointManager::Instance()->GetDirCheckPoint(pdirs[i], ptr), true); // APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 1); // } // } // if checkpoint dump twice in this test, mDevInodeCheckPointPtrMap will be empty // for (int i = 2; i >= 0; --i) //{ // CheckPointPtr checkPointSharePtr; // bool result = CheckPointManager::Instance()->GetCheckPoint(DevInode(devs[i], inodes[i]), checkPointSharePtr); // APSARA_TEST_EQUAL_FATAL(result, true); // CheckPoint * checkPointPtr = checkPointSharePtr.get(); // APSARA_TEST_EQUAL(checkPointPtr->mFileName, filenames[i]); // APSARA_TEST_EQUAL(checkPointPtr->mSignature, sigs[i]); // APSARA_TEST_EQUAL(checkPointPtr->mOffset, offsets[i]); // APSARA_TEST_EQUAL(checkPointPtr->mInode, inodes[i]); // } INT32_FLAG(check_point_dump_interval) = defaultInterval; CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestCheckPointSaveInterval() end", time(NULL))); } void ConfigUpdatorUnittest::TestCheckPointUserDefinedFilePath() { LOG_INFO(sLogger, ("TestCheckPointUserDefinedFilePath() start", time(NULL))); bfs::remove_all(STRING_FLAG(check_point_filename)); // dump to STRING_FLAG(check_point_filename) CheckPointManager::Instance()->DumpCheckPointToLocal(); string filenames[] = {PS + "tmp" + PS + "apsara" + PS + "log.log.1", "tmp" + PS + "apsara.log.2", PS + "apsara" + PS + "tmp" + PS + "log.3"}; int64_t offsets[] = {100, 10000, 200000}; uint64_t devs[] = {34, 56, 98}; uint64_t inodes[] = {100, 200, 300}; string configs[] = {"config1", "config_2006", "3000008config"}; string sigs[] = {"jroeijgperieorwpqijpgegu", "jrpehuwklxcnvujoakckghrueioxckllxjgeuin \n\t", "jfuir\tjgpdg\ndfjkoepj\njfpsdfgdpgqe\tfdjksi"}; uint64_t sigHashs[] = {0, 0, 0}; uint32_t sigSizes[] = {0, 0, 0}; for (size_t i = 0; i < sizeof(sigs) / sizeof(string); ++i) { SignatureToHash(sigs[i], sigHashs[i], sigSizes[i]); } string dirs[] = {PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4" + PS + "5", PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "5", PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4" + PS, PS + "tmp" + PS + "1" + PS + "3"}; string pdirs[] = {PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4", PS + "tmp" + PS + "1" + PS + "2" + PS + "3", PS + "tmp" + PS + "1"}; for (int i = 0; i < 4; ++i) { CheckPointManager::Instance()->AddDirCheckPoint(dirs[i]); } for (int i = 0; i < 3; ++i) { CheckPoint* checkPointPtr = new CheckPoint( filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]); CheckPointManager::Instance()->AddCheckPoint(checkPointPtr); } std::string customCptFilePath = mRootDir + PS + "checkpoint.cpt"; bfs::remove(customCptFilePath); Json::Value settingsJson; settingsJson["check_point_filename"] = customCptFilePath; string ilogtailConfig = mRootDir + PS + "checkpointfile.json"; ofstream fout((ilogtailConfig).c_str()); fout << settingsJson.toStyledString() << endl; fout.close(); AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCheckPointFilePath(), customCptFilePath); CheckPointManager::Instance()->DumpCheckPointToLocal(); FILE* pCpt = fopen(customCptFilePath.c_str(), "r"); APSARA_TEST_TRUE(pCpt != NULL); if (pCpt != NULL) { fclose(pCpt); } CheckPointManager::Instance()->LoadCheckPoint(); for (int i = 0; i < 3; ++i) { DirCheckPointPtr ptr; if (i == 1) { APSARA_TEST_EQUAL_FATAL(CheckPointManager::Instance()->GetDirCheckPoint(pdirs[i], ptr), true); APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 2); } else { APSARA_TEST_EQUAL_FATAL(CheckPointManager::Instance()->GetDirCheckPoint(pdirs[i], ptr), true); APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 1); } } for (int i = 2; i >= 0; --i) { CheckPointPtr checkPointSharePtr; bool result = CheckPointManager::Instance()->GetCheckPoint( DevInode(devs[i], inodes[i]), configs[i], checkPointSharePtr); APSARA_TEST_EQUAL(result, true); CheckPoint* checkPointPtr = checkPointSharePtr.get(); APSARA_TEST_EQUAL(checkPointPtr->mFileName, filenames[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureHash, sigHashs[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureSize, sigSizes[i]); APSARA_TEST_EQUAL(checkPointPtr->mOffset, offsets[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.inode, inodes[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.dev, devs[i]); APSARA_TEST_EQUAL(checkPointPtr->mConfigName, configs[i]); } bfs::remove(customCptFilePath); bfs::remove(ilogtailConfig); bfs::remove(STRING_FLAG(check_point_filename)); LOG_INFO(sLogger, ("TestCheckPointUserDefinedFilePath() end", time(NULL))); } void ConfigUpdatorUnittest::TestCheckPointLoadDefaultFile() { LOG_INFO(sLogger, ("TestCheckPointLoadDefaultFile() start", time(NULL))); string filenames[] = {PS + "tmp" + PS + "apsara" + PS + "log.log.1", "tmp" + PS + "apsara.log.2", PS + "apsara" + PS + "tmp" + PS + "log.3"}; int64_t offsets[] = {100, 10000, 200000}; uint64_t inodes[] = {100, 200, 300}; uint64_t devs[] = {34, 56, 98}; string sigs[] = {"jroeijgperieorwpqijpgegu", "jrpehuwklxcnvujoakckghrueioxckllxjgeuin \n\t", "jfuir\tjgpdg\ndfjkoepj\njfpsdfgdpgqe\tfdjksi"}; uint64_t sigHashs[] = {0, 0, 0}; uint32_t sigSizes[] = {0, 0, 0}; string configs[] = {"config1", "config_2006", "3000008config"}; for (size_t i = 0; i < sizeof(sigs) / sizeof(string); ++i) { SignatureToHash(sigs[i], sigHashs[i], sigSizes[i]); } string dirs[] = {PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4" + PS + "5", PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "5", PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4" + PS, PS + "tmp" + PS + "1" + PS + "3"}; string pdirs[] = {PS + "tmp" + PS + "1" + PS + "2" + PS + "3" + PS + "4", PS + "tmp" + PS + "1" + PS + "2" + PS + "3", PS + "tmp" + PS + "1"}; for (int i = 0; i < 4; ++i) { CheckPointManager::Instance()->AddDirCheckPoint(dirs[i]); } for (int i = 0; i < 3; ++i) { CheckPoint* checkPointPtr = new CheckPoint( filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]); CheckPointManager::Instance()->AddCheckPoint(checkPointPtr); } bfs::remove(STRING_FLAG(check_point_filename)); CheckPointManager::Instance()->DumpCheckPointToLocal(); std::string customCptFilePath = mRootDir + PS + "checkpoint.cpt"; bfs::remove(customCptFilePath); Json::Value settingsJson; settingsJson["check_point_filename"] = customCptFilePath; string ilogtailConfig = mRootDir + PS + "checkpointfile.json"; ofstream fout((ilogtailConfig).c_str()); fout << settingsJson.toStyledString() << endl; fout.close(); AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCheckPointFilePath(), customCptFilePath); CheckPointManager::Instance()->LoadCheckPoint(); for (int i = 0; i < 3; ++i) { DirCheckPointPtr ptr; if (i == 1) { APSARA_TEST_EQUAL_FATAL(CheckPointManager::Instance()->GetDirCheckPoint(pdirs[i], ptr), true); APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 2); } else { APSARA_TEST_EQUAL_FATAL(CheckPointManager::Instance()->GetDirCheckPoint(pdirs[i], ptr), true); APSARA_TEST_EQUAL(ptr.get()->mSubDir.size(), 1); } } for (int i = 2; i >= 0; --i) { CheckPointPtr checkPointSharePtr; bool result = CheckPointManager::Instance()->GetCheckPoint( DevInode(devs[i], inodes[i]), configs[i], checkPointSharePtr); APSARA_TEST_EQUAL(result, true); CheckPoint* checkPointPtr = checkPointSharePtr.get(); APSARA_TEST_EQUAL(checkPointPtr->mFileName, filenames[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureHash, sigHashs[i]); APSARA_TEST_EQUAL(checkPointPtr->mSignatureSize, sigSizes[i]); APSARA_TEST_EQUAL(checkPointPtr->mOffset, offsets[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.inode, inodes[i]); APSARA_TEST_EQUAL(checkPointPtr->mDevInode.dev, devs[i]); APSARA_TEST_EQUAL(checkPointPtr->mConfigName, configs[i]); } bfs::remove(customCptFilePath); bfs::remove(STRING_FLAG(check_point_filename)); bfs::remove(ilogtailConfig); LOG_INFO(sLogger, ("TestCheckPointLoadDefaultFile() end", time(NULL))); } bool ConfigUpdatorUnittest::MockGetLogtailSecurity(const string& targetURL, const string& intf, bool httpsVerifyPeer, const string& request, string& response, string& errorMessage, const string& caCert) { response = sSecurityResponse; return true; } bool ConfigUpdatorUnittest::MockGetAccessKey(const string& targetURL, const string& intf, bool httpsVerifyPeer, const string& request, string& response, string& errorMessage, const string& caCert) { response = sAccessKeyResponse; return true; } void ConfigUpdatorUnittest::SetConfigResponse(const std::string& response) { LOG_INFO(sLogger, ("SetConfigResponse", response)); ScopedSpinLock lock(mResponseLock); sResponse = response; } string ConfigUpdatorUnittest::GetConfigResponse() { ScopedSpinLock lock(mResponseLock); return sResponse; } void ConfigUpdatorUnittest::MockAsyncSend(const std::string& projectName, const std::string& logstore, const std::string& logData, SEND_DATA_TYPE dataType, int32_t rawSize, sls_logs::SlsCompressType compressType, SendClosure* sendClosure) { if (logstore == "logtail_alarm") { PostLogStoreLogsResponse* sr = new PostLogStoreLogsResponse; sr->statusCode = 200; sr->requestId = ""; sendClosure->OnSuccess(sr); return; } vector<LogGroup> logGroupVec; Sender::ParseLogGroupFromString(logData, dataType, rawSize, compressType, logGroupVec); LOG_INFO( sLogger, ("send, sReplaceKey", sReplaceKey)("projectName", projectName)("category", logstore)("dataType", dataType)); // There is a regular send in the Sender with real aliuid, whose length is longer than 7, // the second condition is used to avoid it changing the sReplaceKey flag. if (sReplaceKey && sendClosure->mDataPtr->mAliuid.length() <= 7) { sReplaceKey = false; LOG_INFO(sLogger, ("mock exception", " SLSE_UNAUTHORIZED")); PostLogStoreLogsResponse* sr = new PostLogStoreLogsResponse; sr->statusCode = 500; sr->requestId = ""; sendClosure->OnFail(sr, LOGE_UNAUTHORIZED, "accesskey, accesskeyid invalid"); return; } for (vector<LogGroup>::iterator iter = logGroupVec.begin(); iter != logGroupVec.end(); ++iter) { sProjectNameCountMap[projectName] += iter->logs_size(); sProjectCategoryTopicCountMap[projectName + "_" + logstore + "_" + iter->topic()] += iter->logs_size(); LOG_INFO(sLogger, ("MockAsyncSend, projectName", projectName)("logstore", logstore)("topic", iter->topic())("logs", iter->logs_size())); if (iter->topic().size() > 0) sTopicCountMap[iter->topic()] += iter->logs_size(); } PostLogStoreLogsResponse* sr = new PostLogStoreLogsResponse; sr->statusCode = 200; sr->requestId = ""; sendClosure->OnSuccess(sr); } void ConfigUpdatorUnittest::TestLoadIlogtailConfig() { LOG_INFO(sLogger, ("TestLoadIlogtailConfig() begin", time(NULL))); string check_point_filenametemp = AppConfig::GetInstance()->mCheckPointFilePath; LOG_INFO(sLogger, ("back up check_point_filenametemp", check_point_filenametemp)); bfs::create_directories(mRootDir); string ilogtailConfig = mRootDir + PS + STRING_FLAG(ilogtail_config); string subConfigPath = mRootDir + PS + STRING_FLAG(ilogtail_config) + ".d"; bfs::create_directories(subConfigPath); LOG_INFO(sLogger, ("test default settings config (global flag)", "")); AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mCpuUsageUpLimit, DOUBLE_FLAG(cpu_usage_up_limit)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mMemUsageUpLimit, INT64_FLAG(memory_usage_up_limit)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxBytePerSec(), kDefaultMaxSendBytePerSec); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetBytePerSec(), INT32_FLAG(default_send_byte_per_sec)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetNumOfBufferFile(), INT32_FLAG(default_buffer_file_num)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetLocalFileSize(), INT32_FLAG(default_local_file_size)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxHoldedDataSize(), INT32_FLAG(max_holded_data_size)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxBufferNum(), INT32_FLAG(max_buffer_num)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCheckPointFilePath(), STRING_FLAG(check_point_filename)); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCustomizedConfigIp(), std::string("")); LOG_INFO(sLogger, ("test settings config file", "")); Json::Value settingsJson; settingsJson["cpu_usage_limit"] = 0.4; settingsJson["mem_usage_limit"] = Json::Int64(1099511627776); settingsJson["max_bytes_per_sec"] = 2097152; settingsJson["bytes_per_sec"] = 1048576; settingsJson["buffer_file_num"] = 25; settingsJson["buffer_file_size"] = 20971520; settingsJson["buffer_map_num"] = 5; settingsJson["buffer_map_size"] = 2097152; settingsJson["check_point_filename"] = std::string("/etc/ilotail/checkpoint.cpt"); settingsJson["include_config_path"] = subConfigPath; settingsJson["customized_config_ip"] = std::string("127.0.0.1.fuse"); ofstream fout((ilogtailConfig).c_str()); fout << settingsJson.toStyledString() << endl; fout.close(); AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mCpuUsageUpLimit, (float)0.4); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mMemUsageUpLimit, 1099511627776); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxBytePerSec(), 2097152); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetBytePerSec(), 1048576); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetNumOfBufferFile(), 25); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetLocalFileSize(), 20971520); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxHoldedDataSize(), 2097152); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetMaxBufferNum(), 5); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCheckPointFilePath(), std::string("/etc/ilotail/checkpoint.cpt")); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCustomizedConfigIp(), std::string("127.0.0.1.fuse")); Json::Value subSettingsJson; subSettingsJson["cpu_usage_limit"] = 0.5; subSettingsJson["mem_usage_limit"] = Json::Int64(1099511627777); subSettingsJson["check_point_filename"] = std::string("/etc/ilotail/checkpoint1.cpt"); ofstream fSubOut((subConfigPath + "/1.json").c_str()); fSubOut << subSettingsJson.toStyledString() << endl; fSubOut.close(); AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mCpuUsageUpLimit, (float)0.5); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mMemUsageUpLimit, 1099511627777); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCheckPointFilePath(), std::string("/etc/ilotail/checkpoint1.cpt")); subSettingsJson["cpu_usage_limit"] = 0.6; subSettingsJson["user_config_file_path"] = std::string("/home/admin/logs/user_config_file.json"); subSettingsJson["mem_usage_limit"] = Json::Int64(1099511627778); subSettingsJson["check_point_filename"] = std::string("/etc/ilotail/checkpoint2.cpt"); ofstream fSubOut2((subConfigPath + "/2.json").c_str()); fSubOut2 << subSettingsJson.toStyledString() << endl; fSubOut2.close(); AppConfig::GetInstance()->LoadAppConfig(ilogtailConfig); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mCpuUsageUpLimit, (float)0.6); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mUserConfigPath, std::string("/home/admin/logs/user_config_file.json")); APSARA_TEST_EQUAL(AppConfig::GetInstance()->mMemUsageUpLimit, 1099511627778); APSARA_TEST_EQUAL(AppConfig::GetInstance()->GetCheckPointFilePath(), std::string("/etc/ilotail/checkpoint2.cpt")); AppConfig::GetInstance()->mCheckPointFilePath = check_point_filenametemp; LOG_INFO(sLogger, ("TestLoadIlogtailConfig() end", time(NULL))); } void ConfigUpdatorUnittest::TestUpdateGroupTopic() { LOG_INFO(sLogger, ("TestUpdateGroupTopic() begin", time(NULL))); bfs::create_directories(bfs::path(mRootDir) / "apsara_log"); DumpInitConfigToLocal(); CaseSetup(); for (int i = 0; i < 10; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[0], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); // update group config first Json::Value apsara_log; apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["topic_format"] = Json::Value("group_topic"); apsara_log["group_topic"] = Json::Value("xxxx"); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(1); Json::Value rootJson, metrics; rootJson["apsara_log"] = apsara_log; metrics["metrics"] = rootJson; SetConfigResponse(metrics.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[0], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); unordered_map<string, CollectionConfig*>& configMap = ConfigManager::GetInstance()->mNameConfigMap; unordered_map<string, CollectionConfig*>::iterator it; it = configMap.find("apsara_log"); APSARA_TEST_TRUE(it != configMap.end()); CollectionConfig* config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "8000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "apsara_log"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mIsPreserve, false); APSARA_TEST_EQUAL(config->mTopicFormat, "group_topic"); APSARA_TEST_EQUAL(config->mGroupTopic, "xxxx"); APSARA_TEST_EQUAL(configMap.size(), 2); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_"], 100); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_xxxx"], 200); // update group topic apsara_log = Json::Value(); apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["topic_format"] = Json::Value("group_topic"); apsara_log["group_topic"] = Json::Value("xxxx1"); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(1); Json::Value rootJson1, metrics1; rootJson1["apsara_log"] = apsara_log; metrics1["metrics"] = rootJson1; SetConfigResponse(metrics1.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + gPath[0], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); it = configMap.find("apsara_log"); APSARA_TEST_EQUAL(it != configMap.end(), true); config = it->second; APSARA_TEST_EQUAL(config->mProjectName, "8000000_proj"); APSARA_TEST_EQUAL(config->mBasePath, mRootDir + PATH_SEPARATOR + "apsara_log"); APSARA_TEST_EQUAL(config->mVersion, 1); APSARA_TEST_EQUAL(config->mLogType, APSARA_LOG); APSARA_TEST_EQUAL(config->mIsPreserve, false); APSARA_TEST_EQUAL(config->mTopicFormat, "group_topic"); APSARA_TEST_EQUAL(config->mGroupTopic, "xxxx1"); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_"], 100); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_xxxx"], 200); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_xxxx1"], 200); // disable group topic apsara_log = Json::Value(); apsara_log["project_name"] = Json::Value("8000000_proj"); apsara_log["category"] = Json::Value("8000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PATH_SEPARATOR + "apsara_log"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(false); apsara_log["topic_format"] = Json::Value("none"); apsara_log["preserve_depth"] = Json::Value(2); apsara_log["version"] = Json::Value(2); Json::Value rootJson2; rootJson2["apsara_log"] = apsara_log; Json::Value metrics2; metrics2["metrics"] = rootJson2; SetConfigResponse(metrics2.toStyledString()); sleep(WAIT_CONFIG_UPDATE_INTERVAL); for (int i = 0; i < 20; ++i) { DumpLog(10, mRootDir + PATH_SEPARATOR + "apsara_log", APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_"], 300); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_xxxx"], 200); APSARA_TEST_EQUAL(sProjectCategoryTopicCountMap["8000000_proj_8000000_category_xxxx1"], 200); APSARA_TEST_TRUE(configMap.find("apsara_log") != configMap.end()); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestUpdateGroupTopic() end", time(NULL))); } void ConfigUpdatorUnittest::TestValidWildcardPath() { LOG_INFO(sLogger, ("TestValidWildcardPath() begin", time(NULL))); auto eventDispatcher = EventDispatcher::GetInstance(); string dirs[] = {PS + "nginx" + PS + "aa" + PS + "bb", PS + "app_1" + PS + "aa", PS + "app_2" + PS + "aa" + PS + "bb", // ok PS + "app_2" + PS + "aa" + PS + "bb" + PS + "cc", // ok PS + "app_2" + PS + "aa" + PS + "bb" + PS + "cc" + PS + "dd", PS + "app_3" + PS + "aa" + PS + "bb2" + PS + "cc", // ok PS + "app_3" + PS + "aa" + PS + "bb2" + PS + "cc" + PS + "dd", PS + "app_13" + PS + "aa" + PS + "bb" + PS + "cc"}; for (int i = 0; i < 8; ++i) { if (i != 3 && i != 4) bfs::create_directories(bfs::path(mRootDir) / dirs[i]); } Json::Value rootJson; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "app_?" + PS + "aa" + PS + "*"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); apsara_log["max_depth"] = Json::Value(1); apsara_log["topic_format"] = Json::Value("default"); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); LOG_INFO(sLogger, ("case", "#1")); sleep(1); bfs::create_directories(bfs::path(mRootDir) / dirs[3]); bfs::create_directories(bfs::path(mRootDir) / dirs[4]); WaitForFileBeenRead(); #if defined(_MSC_VER) // Because polling can't register wildcard path (it's a bug), we only have this way... sleep(LogInput::GetInstance()->mCheckBaseDirInterval); #endif usleep(100 * 1000); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[0]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[1]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[2]) != eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[3]) != eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[4]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[5]) != eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[6]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[7]) == eventDispatcher->mPathWdMap.end()); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 8; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[5] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[6] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[7] + PS + "job"], 0); LOG_INFO(sLogger, ("case", "#2")); #if defined(__linux__) bfs::remove_all(mRootDir + dirs[3]); #elif defined(_MSC_VER) // On Windows, mRootDir + dirs[3] can't be deleted if we don't move job.log out. auto targetPath = mRootDir + "dir3_job.log"; bfs::rename(mRootDir + dirs[3] + PS + "job.log", targetPath); bfs::remove(targetPath); bfs::remove_all(mRootDir + dirs[3]); #endif // remove will not work when file is open // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[0]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[1]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[2]) != eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[3]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[4]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[5]) != eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[6]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[7]) == eventDispatcher->mPathWdMap.end()); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 8; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 400); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[5] + PS + "job"], 400); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[6] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[7] + PS + "job"], 0); LOG_INFO(sLogger, ("case", "#3")); for (int i = 0; i < 8; ++i) { auto targetPath = mRootDir + "dirs" + std::to_string(i) + "_job.log"; try { bfs::rename(mRootDir + dirs[i] + PS + "job.log", targetPath); bfs::remove(targetPath); } catch (...) { } } for (int i = 0; i < 8; ++i) { try { bfs::remove_all(mRootDir + dirs[i]); } catch (...) { } } usleep(100 * 1000); // remove will not work when file is open // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[0]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[1]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[2]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[3]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[4]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[5]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[6]) == eventDispatcher->mPathWdMap.end()); // APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[7]) == eventDispatcher->mPathWdMap.end()); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 8; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } #if defined(_MSC_VER) sleep(INT32_FLAG(polling_modify_repush_interval)); #endif WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 400); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[5] + PS + "job"], 400); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[6] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[7] + PS + "job"], 0); LOG_INFO(sLogger, ("case", "#4")); for (int i = 0; i < 8; ++i) { try { bfs::create_directories(mRootDir + dirs[i]); } catch (std::exception& e) { LOG_WARNING( sLogger, ("bfs::create_directories failed", (bfs::path(mRootDir) / dirs[i]).string())("error", e.what())); } } sleep(INT32_FLAG(check_base_dir_interval) + 1); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[0]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[1]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[2]) != eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[3]) != eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[4]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[5]) != eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[6]) == eventDispatcher->mPathWdMap.end()); APSARA_TEST_TRUE(eventDispatcher->mPathWdMap.find(mRootDir + dirs[7]) == eventDispatcher->mPathWdMap.end()); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 8; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 600); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 400); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[5] + PS + "job"], 600); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[6] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[7] + PS + "job"], 0); CaseCleanup(); RemoveConfigFile(); LOG_INFO(sLogger, ("TestValidWildcardPath() end", time(NULL))); } void ConfigUpdatorUnittest::TestValidWildcardPath2() { LOG_INFO(sLogger, ("TestValidWildcardPath2() begin", time(NULL))); string dirs[] = {PS + "nginx" + PS + "aa" + PS + "bb", PS + "app_1" + PS + "aa", PS + "app_2" + PS + "aa" + PS + "bb", // ok PS + "app_2" + PS + "aa" + PS + "bb" + PS + "cc", // ok PS + "app_2" + PS + "aa" + PS + "bb" + PS + "cc" + PS + "dd", PS + "app_3" + PS + "aa" + PS + "bb2" + PS + "cc", PS + "app_3" + PS + "aa" + PS + "bb2" + PS + "cc" + PS + "dd", PS + "app_13" + PS + "aa" + PS + "bb" + PS + "cc"}; for (int i = 0; i < 8; ++i) { if (i != 3 && i != 4) bfs::create_directories(bfs::path(mRootDir) / dirs[i]); } Json::Value rootJson; Json::Value apsara_log; apsara_log["project_name"] = Json::Value("9000000_proj"); apsara_log["category"] = Json::Value("9000000_category"); apsara_log["log_type"] = Json::Value("apsara_log"); apsara_log["log_begin_reg"] = Json::Value("\\[\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+\\].*"); apsara_log["log_path"] = Json::Value(mRootDir + PS + "app_?" + PS + "*" + PS + "bb"); SetConfigFilePattern(apsara_log); apsara_log["enable"] = Json::Value(true); apsara_log["preserve"] = Json::Value(true); apsara_log["max_depth"] = Json::Value(1); apsara_log["topic_format"] = Json::Value("default"); rootJson["apsara_log"] = apsara_log; Json::Value metrics; metrics["metrics"] = rootJson; ofstream fout(STRING_FLAG(user_log_config).c_str()); fout << metrics << endl; fout.close(); CaseSetup(); LOG_INFO(sLogger, ("case", "#1")); sleep(1); bfs::create_directories(bfs::path(mRootDir) / dirs[3]); bfs::create_directories(bfs::path(mRootDir) / dirs[4]); #if defined(_MSC_VER) // Because polling can't register wildcard path (it's a bug), we only have this way... sleep(LogInput::GetInstance()->mCheckBaseDirInterval); #endif usleep(100 * 1000); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[0]) == EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[1]) == EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[2]) != EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[3]) != EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[4]) == EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[5]) == EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[6]) == EventDispatcher::GetInstance()->mPathWdMap.end()); APSARA_TEST_TRUE(EventDispatcher::GetInstance()->mPathWdMap.find(mRootDir + dirs[7]) == EventDispatcher::GetInstance()->mPathWdMap.end()); for (int i = 0; i < 20; ++i) { for (int j = 0; j < 8; ++j) DumpLog(10, mRootDir + dirs[j], APSARA_LOG); usleep(WRITE_LOG_SLEEP_INTERVAL); } WaitForFileBeenRead(); sleep(2 * INT32_FLAG(batch_send_interval) + 2); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[0] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[1] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[2] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[3] + PS + "job"], 200); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[4] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[5] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[6] + PS + "job"], 0); APSARA_TEST_EQUAL(sTopicCountMap[mRootDir + dirs[7] + PS + "job"], 0); CaseCleanup(); LOG_INFO(sLogger, ("TestValidWildcardPath2() end", time(NULL))); } void ConfigUpdatorUnittest::TestWithinMaxDepth() { // No wildcard. CollectionConfig* cfg_1 = new CollectionConfig( PS + "abc" + PS + "de" + PS + "f", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, 0, "cat"); EXPECT_EQ(cfg_1->WithinMaxDepth(PS + "abc"), false); EXPECT_EQ(cfg_1->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f"), true); EXPECT_EQ(cfg_1->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx"), false); EXPECT_EQ(cfg_1->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi"), false); delete cfg_1; // To be compatible with old settings CollectionConfig* cfg_2 = new CollectionConfig( PS + "abc" + PS + "de" + PS + "f", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, -1, "cat"); EXPECT_EQ(cfg_2->WithinMaxDepth(PS + "abc"), true); EXPECT_EQ(cfg_2->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f"), true); EXPECT_EQ(cfg_2->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx"), true); EXPECT_EQ(cfg_2->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi"), true); EXPECT_EQ(cfg_2->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi" + PS + "agec" + PS + "egegt"), true); delete cfg_2; CollectionConfig* cfg_3 = new CollectionConfig( PS + "abc" + PS + "de" + PS + "f", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, 3, "cat"); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc"), false); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f"), true); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx"), false); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi"), true); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi" + PS + "age"), true); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi" + PS + "age" + PS + "gege"), true); EXPECT_EQ( cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi" + PS + "ageg" + PS + "agege" + PS + "gae"), false); EXPECT_EQ(cfg_3->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi" + PS + "agege" + PS + "gege" + PS + "gegeg" + PS + "ge"), false); delete cfg_3; // Wildcard. CollectionConfig* cfg_4 = new CollectionConfig( PS + "ab?" + PS + "de" + PS + "*", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, 0, "cat"); EXPECT_EQ(cfg_4->WithinMaxDepth(PS + "abc"), false); EXPECT_EQ(cfg_4->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f"), true); EXPECT_EQ(cfg_4->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "xyz"), true); EXPECT_EQ(cfg_4->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f" + PS + "ghi"), false); delete cfg_4; // To be compatible with old settings. CollectionConfig* cfg_5 = new CollectionConfig(PS + "abc" + PS + "de?" + PS + "f*" + PS + "xyz", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, -1, "cat", ""); EXPECT_EQ(cfg_5->WithinMaxDepth(PS + "abc"), true); EXPECT_EQ(cfg_5->WithinMaxDepth(PS + "abc" + PS + "def" + PS + "fgz"), true); EXPECT_EQ(cfg_5->WithinMaxDepth(PS + "abc" + PS + "def" + PS + "fgz" + PS + "xyz0"), true); EXPECT_EQ(cfg_5->WithinMaxDepth(PS + "abc" + PS + "def" + PS + "fgz" + PS + "xyz"), true); EXPECT_EQ(cfg_5->WithinMaxDepth(PS + "abc" + PS + "deef" + PS + "fgz" + PS + "xyz" + PS + "000"), true); EXPECT_EQ(cfg_5->WithinMaxDepth(PS + "abc" + PS + "deef" + PS + "fgz" + PS + "xyz" + PS + "000" + PS + "111" + PS + "222"), true); delete cfg_5; CollectionConfig* cfg_6 = new CollectionConfig( PS + "abc" + PS + "d?" + PS + "f*", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, 3, "cat"); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc"), false); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de"), false); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "f"), true); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "x"), false); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "xf" + PS + "g"), false); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx" + PS + "ghi"), true); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx" + PS + "ghi" + PS + "age"), true); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx" + PS + "ghi" + PS + "age" + PS + "gege"), true); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx" + PS + "ghi" + PS + "ageg" + PS + "agege" + PS + "gae"), false); EXPECT_EQ(cfg_6->WithinMaxDepth(PS + "abc" + PS + "de" + PS + "fx" + PS + "ghi" + PS + "agege" + PS + "gege" + PS + "gegeg" + PS + "ge"), false); delete cfg_6; // Wildcard on root path, only Windows works. { #if defined(__linux__) CollectionConfig cfg("/*", "x.log", REGEX_LOG, "a", "", "", "", "prj", true, 0, 3, "cat"); EXPECT_TRUE(cfg.WithinMaxDepth("/var")); BOOL_FLAG(enable_root_path_collection) = true; EXPECT_TRUE(cfg.WithinMaxDepth("/var")); #elif defined(_MSC_VER) CollectionConfig cfg("D:\\*", "x.log", REGEX_LOG, "a", "", "prj", true, 0, 3, "cat"); EXPECT_TRUE(!cfg.WithinMaxDepth("D:\\var")); BOOL_FLAG(enable_root_path_collection) = true; EXPECT_TRUE(cfg.WithinMaxDepth("D:\\var")); #endif BOOL_FLAG(enable_root_path_collection) = false; } } void ConfigUpdatorUnittest::TestParseWildcardPath() { CollectionConfig cfg(PS, "*.log", APSARA_LOG, "x", "", "", "", "prj", true, 0, 0, "cat"); std::string pathRoot = ""; #if defined(_MSC_VER) pathRoot = "DriveLetter:"; #endif cfg.mBasePath = pathRoot + PS + "usr" + PS + "?"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL(cfg.mWildcardPaths.size(), 2); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths.size(), 1); APSARA_TEST_EQUAL(cfg.mWildcardPaths[0], pathRoot + PS + "usr"); APSARA_TEST_EQUAL(cfg.mWildcardPaths[1], pathRoot + PS + "usr" + PS + "?"); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths[0], ""); cfg.mBasePath = pathRoot + PS + "home" + PS + "tops" + PS + "?in" + PS + "python" + PS + "*" + PS + "logs"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL_FATAL(cfg.mWildcardPaths.size(), 5); APSARA_TEST_EQUAL_FATAL(cfg.mConstWildcardPaths.size(), 4); APSARA_TEST_EQUAL(cfg.mWildcardPaths[0], pathRoot + PS + "home" + PS + "tops"); APSARA_TEST_EQUAL(cfg.mWildcardPaths[1], pathRoot + PS + "home" + PS + "tops" + PS + "?in"); APSARA_TEST_EQUAL(cfg.mWildcardPaths[2], pathRoot + PS + "home" + PS + "tops" + PS + "?in" + PS + "python"); APSARA_TEST_EQUAL(cfg.mWildcardPaths[3], pathRoot + PS + "home" + PS + "tops" + PS + "?in" + PS + "python" + PS + "*"); APSARA_TEST_EQUAL(cfg.mWildcardPaths[4], pathRoot + PS + "home" + PS + "tops" + PS + "?in" + PS + "python" + PS + "*" + PS + "logs"); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths[0], ""); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths[1], "python"); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths[2], ""); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths[3], "logs"); cfg.mBasePath = pathRoot + PS + "*"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL_FATAL(cfg.mWildcardPaths.size(), 2); APSARA_TEST_EQUAL_FATAL(cfg.mConstWildcardPaths.size(), 1); APSARA_TEST_EQUAL(cfg.mWildcardPaths[0], pathRoot + PS); APSARA_TEST_EQUAL(cfg.mWildcardPaths[1], pathRoot + PS + "*"); APSARA_TEST_EQUAL(cfg.mConstWildcardPaths[0], ""); cfg.mBasePath = "h?me"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL_FATAL(cfg.mWildcardPaths.size(), 0); APSARA_TEST_EQUAL_FATAL(cfg.mConstWildcardPaths.size(), 0); cfg.mBasePath = "*sr"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL_FATAL(cfg.mWildcardPaths.size(), 0); APSARA_TEST_EQUAL_FATAL(cfg.mConstWildcardPaths.size(), 0); cfg.mBasePath = pathRoot + PS; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL_FATAL(cfg.mWildcardPaths.size(), 0); APSARA_TEST_EQUAL_FATAL(cfg.mConstWildcardPaths.size(), 0); cfg.mBasePath = pathRoot + PS + "home" + PS + "admin" + PS + "logs"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL_FATAL(cfg.mWildcardPaths.size(), 0); APSARA_TEST_EQUAL_FATAL(cfg.mConstWildcardPaths.size(), 0); } void ConfigUpdatorUnittest::TestIsWildcardPathMatch() { CollectionConfig cfg(PS, "*.log", APSARA_LOG, "x", "", "", "", "prj", true, 100, 100, "cat"); cfg.mBasePath = PS + "usr" + PS + "?" + PS + "abc" + PS + "*" + PS + "def"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "a" + PS + "abc" + PS + "def" + PS + "def"), true); APSARA_TEST_EQUAL( cfg.IsWildcardPathMatch(PS + "usr" + PS + "a" + PS + "abc" + PS + "def" + PS + "def" + PS + "ghi"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "aa" + PS + "abc" + PS + "def" + PS + "def"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "a" + PS + "abc" + PS + "def" + PS + "adef"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "a" + PS + "abc" + PS + "def"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(""), false); cfg.mBasePath = PS + "usr*"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usrx"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "abc"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usrx" + PS + "abc" + PS + "def" + PS + "ghi"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "us"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS), false); cfg.mBasePath = PS + "usr" + PS + "?"; cfg.ParseWildcardPath(); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "a"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "b" + PS + "cef" + PS + "geg" + PS + "gege" + PS + "gegea" + PS + "egege"), true); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "aa"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "bb" + PS + "cef"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr"), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(""), false); APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "b" + PS + "cef"), true); cfg.mMaxDepth = 0; APSARA_TEST_EQUAL(cfg.IsWildcardPathMatch(PS + "usr" + PS + "b" + PS + "cef"), false); } } // namespace logtail int main(int argc, char** argv) { InitUnittestMain(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }