void TestSecondaryStorage()

in core/unittest/sender/SenderUnittest.cpp [1484:1694]


    void TestSecondaryStorage() {
        LOG_INFO(sLogger, ("TestSecondaryStorage() begin", time(NULL)));
        CaseSetUp();
        auto sender = Sender::Instance();
        gProjectNetEnableIndex = 1000000;
        int32_t defaultCheckPeriod = sender->mCheckPeriod;
        sender->mCheckPeriod = 1;
        INT32_FLAG(buffer_file_alive_interval) = 4 * INT32_FLAG(batch_send_interval) + 5;
        DisableNetWork();
        int32_t defaultMergeLimit = INT32_FLAG(merge_log_count_limit);
        INT32_FLAG(merge_log_count_limit) = 1024 * 1024 * 10;
        int32_t defaultHoldSize = INT32_FLAG(max_holded_data_size);
        INT32_FLAG(max_holded_data_size) = 20 * 1024 * 1024;
        set<string> firstLogSet;
        set<string> secondLogSet;
        set<string> thirdLogSet;
        set<string> forthLogSet;
        size_t logCount = 30;
        while (firstLogSet.size() < logCount) {
            string first = GenerateRandomStr(1, 1024 * 2) + "_$#1";
            if (firstLogSet.find(first) == firstLogSet.end())
                firstLogSet.insert(first);
        }
        while (secondLogSet.size() < logCount) {
            string second = GenerateRandomStr(1, 1024 * 2) + "_$#2";
            if (secondLogSet.find(second) == secondLogSet.end())
                secondLogSet.insert(second);
        }
        while (thirdLogSet.size() < logCount) {
            string third = GenerateRandomStr(1, 1024 * 2) + "_$#3";
            if (thirdLogSet.find(third) == thirdLogSet.end())
                thirdLogSet.insert(third);
        }
        while (forthLogSet.size() < logCount) {
            string forth = GenerateRandomStr(1, 1024 * 2) + "_$#4";
            if (forthLogSet.find(forth) == forthLogSet.end())
                forthLogSet.insert(forth);
        }

        FileEncryption::GetInstance()->mKeyMap.clear();
        FileEncryption::KeyInfo* keyV1 = new FileEncryption::KeyInfo("73056101345ceb496c5574b8c6b7fc0e", 1);
        FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV1->mVersion, keyV1));
        FileEncryption::GetInstance()->SetDefaultKey();
        APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 1);

        // SendFailType equals to 3 means that sending error will not write to secondary.
        // Generate logs and wait enough time so that they can arrive sender queue.
        gSendFailType = 3;
        LOG_INFO(sLogger,
                 ("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write forthLogSet", logCount));
        for (set<string>::iterator iter = forthLogSet.begin(); iter != forthLogSet.end(); ++iter) {
            OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
        }
        printf("Job done\n");
        sleep(10);
        APSARA_TEST_TRUE(sender->FlushOut(5 * 1000)); // this operation will write buffer file
        sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
        vector<string> filesToSend;
        // send error will be discard, so second buffer is null
        sender->LoadFileToSend(time(NULL), filesToSend);
        APSARA_TEST_EQUAL(filesToSend.size(), 0);

        // SendFailType equals to 2 means that sending error will write to secondary.
        gSendFailType = 2; // test all retryable send error
        LOG_INFO(sLogger,
                 ("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write firstLogSet", logCount));
        for (set<string>::iterator iter = firstLogSet.begin(); iter != firstLogSet.end(); ++iter) {
            OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
        }
        printf("Job done\n");
        sleep(5);
        APSARA_TEST_TRUE(sender->FlushOut(5 * 1000)); // this operation will write buffer file
        sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
        // Data sent failed will be written to secondary, so there is a local file.
        filesToSend.clear();
        sender->LoadFileToSend(time(NULL), filesToSend);
        APSARA_TEST_EQUAL(filesToSend.size(), 1);
        LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
        FileEncryption::KeyInfo* keyV2 = new FileEncryption::KeyInfo("c134368ce7840a4e217ee6d8c27a7e0f", 2);
        FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV2->mVersion, keyV2));
        FileEncryption::GetInstance()->SetDefaultKey();
        APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 2);

        // Refresh buffer file manually.
        string bufferFileName = sender->GetBufferFileName();
        if (sender->GetBufferFileName() == bufferFileName)
            sender->CreateNewFile();
        bufferFileName = sender->GetBufferFileName();

        LOG_INFO(sLogger,
                 ("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write secondLogSet", logCount));
        gSendFailType = 1; // test network error
        for (set<string>::iterator iter = secondLogSet.begin(); iter != secondLogSet.end(); ++iter) {
            OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
        }
        printf("Job done\n");
        sleep(5);
        APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
        sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
        filesToSend.clear();
        Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
        APSARA_TEST_EQUAL(filesToSend.size(), 2);
        LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));

        FileEncryption::KeyInfo* keyV3 = new FileEncryption::KeyInfo("e1b07c24b3340c94945c058db193d8f4", 3);
        FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV3->mVersion, keyV3));
        FileEncryption::GetInstance()->SetDefaultKey();
        APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 3);
        if (Sender::Instance()->GetBufferFileName() == bufferFileName)
            Sender::Instance()->CreateNewFile();
        bufferFileName = Sender::Instance()->GetBufferFileName();
        LOG_INFO(sLogger,
                 ("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write thirdLogSet", logCount));
        for (set<string>::iterator iter = thirdLogSet.begin(); iter != thirdLogSet.end(); ++iter) {
            OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
        }
        printf("Job done\n");

        // move delete key behind sleep
        // delete key 2, so second log will be discard
        FileEncryption::GetInstance()->mKeyMap.erase(2);
        FileEncryption::GetInstance()->SetDefaultKey();
        LOG_INFO(sLogger, ("delete key version 2, networkstat", gNetWorkStat));

        sleep(5);
        APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
        sleep(INT32_FLAG(buffer_file_alive_interval) - 4); // debug
        filesToSend.clear();
        Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
        // buffer file with version2 will be deleted
        APSARA_TEST_EQUAL(filesToSend.size(), 2);
        LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));

        EnableNetWork();
        sleep(INT32_FLAG(buffer_file_alive_interval) + 1);
        LOG_INFO(sLogger, ("begin check, gNetWorkStat", gNetWorkStat));

        APSARA_TEST_EQUAL(gCounter, int32_t(2 * logCount));
        filesToSend.clear();
        Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
        APSARA_TEST_EQUAL(filesToSend.size(), 0);

        size_t firstLogCount = 0;
        size_t secondLogCount = 0;
        size_t thirdLogCount = 0;
        size_t forthLogCount = 0;
        APSARA_TEST_EQUAL(gBufferLogGroups.size(), 2);
        for (size_t i = 0; i < (size_t)gBufferLogGroups.size(); ++i) {
            LogGroup& logGroup = gBufferLogGroups[i];
            if (logGroup.category() != "app_log")
                continue;
            for (size_t j = 0; j < (size_t)logGroup.logs_size(); ++j) {
                const Log log = logGroup.logs(j);
                string ip = "";
                string nothing = "";
                for (size_t k = 0; k < (size_t)log.contents_size(); ++k) {
                    const Log_Content content = log.contents(k);
                    if (content.key() == "ip")
                        ip = content.value();
                    else if (content.key() == "nothing")
                        nothing = content.value();
                }
                if (ip != "10.7.241.21")
                    continue;
                LOG_INFO(sLogger, ("nothing", nothing)("substr", nothing.substr(nothing.size() - 4)));
                if (nothing.substr(nothing.size() - 4) == "_$#1") {
                    if (firstLogSet.find(nothing) != firstLogSet.end()) {
                        firstLogCount++;
                        firstLogSet.erase(nothing);
                    }
                } else if (nothing.substr(nothing.size() - 4) == "_$#3") {
                    if (thirdLogSet.find(nothing) != thirdLogSet.end()) {
                        thirdLogCount++;
                        thirdLogSet.erase(nothing);
                    }
                } else if (nothing.substr(nothing.size() - 4) == "_$#4") {
                    if (forthLogSet.find(nothing) != forthLogSet.end()) {
                        forthLogCount++;
                        forthLogSet.erase(nothing);
                    }
                } else if (nothing.substr(nothing.size() - 4) == "_$#2") {
                    if (secondLogSet.find(nothing) != secondLogSet.end()) {
                        secondLogCount++;
                        secondLogSet.erase(nothing);
                    }
                }
            }
        }

        APSARA_TEST_EQUAL(firstLogSet.size(), 0);
        APSARA_TEST_EQUAL(firstLogCount, logCount);
        APSARA_TEST_EQUAL(secondLogSet.size(), logCount);
        APSARA_TEST_EQUAL(secondLogCount, 0);
        APSARA_TEST_EQUAL(thirdLogSet.size(), 0);
        APSARA_TEST_EQUAL(thirdLogCount, logCount);
        APSARA_TEST_EQUAL(forthLogSet.size(), logCount);
        APSARA_TEST_EQUAL(forthLogCount, 0);

        FileEncryption::GetInstance()->mKeyMap.clear();
        FileEncryption::GetInstance()->LoadKeyInfo();
        FileEncryption::GetInstance()->SetDefaultKey();
        delete keyV1;
        delete keyV2;
        delete keyV3;
        INT32_FLAG(merge_log_count_limit) = defaultMergeLimit;
        INT32_FLAG(max_holded_data_size) = defaultHoldSize;
        Sender::Instance()->mCheckPeriod = defaultCheckPeriod;
        gProjectNetEnableIndex = 0;
        CaseCleanUp();
        LOG_INFO(sLogger, ("TestSecondaryStorage() end", time(NULL)));
    }