in core/unittest/sender/SenderUnittest.cpp [1484:1694]
void TestSecondaryStorage() {
LOG_INFO(sLogger, ("TestSecondaryStorage() begin", time(NULL)));
CaseSetUp();
auto sender = Sender::Instance();
gProjectNetEnableIndex = 1000000;
int32_t defaultCheckPeriod = sender->mCheckPeriod;
sender->mCheckPeriod = 1;
INT32_FLAG(buffer_file_alive_interval) = 4 * INT32_FLAG(batch_send_interval) + 5;
DisableNetWork();
int32_t defaultMergeLimit = INT32_FLAG(merge_log_count_limit);
INT32_FLAG(merge_log_count_limit) = 1024 * 1024 * 10;
int32_t defaultHoldSize = INT32_FLAG(max_holded_data_size);
INT32_FLAG(max_holded_data_size) = 20 * 1024 * 1024;
set<string> firstLogSet;
set<string> secondLogSet;
set<string> thirdLogSet;
set<string> forthLogSet;
size_t logCount = 30;
while (firstLogSet.size() < logCount) {
string first = GenerateRandomStr(1, 1024 * 2) + "_$#1";
if (firstLogSet.find(first) == firstLogSet.end())
firstLogSet.insert(first);
}
while (secondLogSet.size() < logCount) {
string second = GenerateRandomStr(1, 1024 * 2) + "_$#2";
if (secondLogSet.find(second) == secondLogSet.end())
secondLogSet.insert(second);
}
while (thirdLogSet.size() < logCount) {
string third = GenerateRandomStr(1, 1024 * 2) + "_$#3";
if (thirdLogSet.find(third) == thirdLogSet.end())
thirdLogSet.insert(third);
}
while (forthLogSet.size() < logCount) {
string forth = GenerateRandomStr(1, 1024 * 2) + "_$#4";
if (forthLogSet.find(forth) == forthLogSet.end())
forthLogSet.insert(forth);
}
FileEncryption::GetInstance()->mKeyMap.clear();
FileEncryption::KeyInfo* keyV1 = new FileEncryption::KeyInfo("73056101345ceb496c5574b8c6b7fc0e", 1);
FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV1->mVersion, keyV1));
FileEncryption::GetInstance()->SetDefaultKey();
APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 1);
// SendFailType equals to 3 means that sending error will not write to secondary.
// Generate logs and wait enough time so that they can arrive sender queue.
gSendFailType = 3;
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write forthLogSet", logCount));
for (set<string>::iterator iter = forthLogSet.begin(); iter != forthLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
sleep(10);
APSARA_TEST_TRUE(sender->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
vector<string> filesToSend;
// send error will be discard, so second buffer is null
sender->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 0);
// SendFailType equals to 2 means that sending error will write to secondary.
gSendFailType = 2; // test all retryable send error
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write firstLogSet", logCount));
for (set<string>::iterator iter = firstLogSet.begin(); iter != firstLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
sleep(5);
APSARA_TEST_TRUE(sender->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
// Data sent failed will be written to secondary, so there is a local file.
filesToSend.clear();
sender->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 1);
LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
FileEncryption::KeyInfo* keyV2 = new FileEncryption::KeyInfo("c134368ce7840a4e217ee6d8c27a7e0f", 2);
FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV2->mVersion, keyV2));
FileEncryption::GetInstance()->SetDefaultKey();
APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 2);
// Refresh buffer file manually.
string bufferFileName = sender->GetBufferFileName();
if (sender->GetBufferFileName() == bufferFileName)
sender->CreateNewFile();
bufferFileName = sender->GetBufferFileName();
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write secondLogSet", logCount));
gSendFailType = 1; // test network error
for (set<string>::iterator iter = secondLogSet.begin(); iter != secondLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
sleep(5);
APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 2);
LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
FileEncryption::KeyInfo* keyV3 = new FileEncryption::KeyInfo("e1b07c24b3340c94945c058db193d8f4", 3);
FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV3->mVersion, keyV3));
FileEncryption::GetInstance()->SetDefaultKey();
APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 3);
if (Sender::Instance()->GetBufferFileName() == bufferFileName)
Sender::Instance()->CreateNewFile();
bufferFileName = Sender::Instance()->GetBufferFileName();
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write thirdLogSet", logCount));
for (set<string>::iterator iter = thirdLogSet.begin(); iter != thirdLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
// move delete key behind sleep
// delete key 2, so second log will be discard
FileEncryption::GetInstance()->mKeyMap.erase(2);
FileEncryption::GetInstance()->SetDefaultKey();
LOG_INFO(sLogger, ("delete key version 2, networkstat", gNetWorkStat));
sleep(5);
APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4); // debug
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
// buffer file with version2 will be deleted
APSARA_TEST_EQUAL(filesToSend.size(), 2);
LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
EnableNetWork();
sleep(INT32_FLAG(buffer_file_alive_interval) + 1);
LOG_INFO(sLogger, ("begin check, gNetWorkStat", gNetWorkStat));
APSARA_TEST_EQUAL(gCounter, int32_t(2 * logCount));
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 0);
size_t firstLogCount = 0;
size_t secondLogCount = 0;
size_t thirdLogCount = 0;
size_t forthLogCount = 0;
APSARA_TEST_EQUAL(gBufferLogGroups.size(), 2);
for (size_t i = 0; i < (size_t)gBufferLogGroups.size(); ++i) {
LogGroup& logGroup = gBufferLogGroups[i];
if (logGroup.category() != "app_log")
continue;
for (size_t j = 0; j < (size_t)logGroup.logs_size(); ++j) {
const Log log = logGroup.logs(j);
string ip = "";
string nothing = "";
for (size_t k = 0; k < (size_t)log.contents_size(); ++k) {
const Log_Content content = log.contents(k);
if (content.key() == "ip")
ip = content.value();
else if (content.key() == "nothing")
nothing = content.value();
}
if (ip != "10.7.241.21")
continue;
LOG_INFO(sLogger, ("nothing", nothing)("substr", nothing.substr(nothing.size() - 4)));
if (nothing.substr(nothing.size() - 4) == "_$#1") {
if (firstLogSet.find(nothing) != firstLogSet.end()) {
firstLogCount++;
firstLogSet.erase(nothing);
}
} else if (nothing.substr(nothing.size() - 4) == "_$#3") {
if (thirdLogSet.find(nothing) != thirdLogSet.end()) {
thirdLogCount++;
thirdLogSet.erase(nothing);
}
} else if (nothing.substr(nothing.size() - 4) == "_$#4") {
if (forthLogSet.find(nothing) != forthLogSet.end()) {
forthLogCount++;
forthLogSet.erase(nothing);
}
} else if (nothing.substr(nothing.size() - 4) == "_$#2") {
if (secondLogSet.find(nothing) != secondLogSet.end()) {
secondLogCount++;
secondLogSet.erase(nothing);
}
}
}
}
APSARA_TEST_EQUAL(firstLogSet.size(), 0);
APSARA_TEST_EQUAL(firstLogCount, logCount);
APSARA_TEST_EQUAL(secondLogSet.size(), logCount);
APSARA_TEST_EQUAL(secondLogCount, 0);
APSARA_TEST_EQUAL(thirdLogSet.size(), 0);
APSARA_TEST_EQUAL(thirdLogCount, logCount);
APSARA_TEST_EQUAL(forthLogSet.size(), logCount);
APSARA_TEST_EQUAL(forthLogCount, 0);
FileEncryption::GetInstance()->mKeyMap.clear();
FileEncryption::GetInstance()->LoadKeyInfo();
FileEncryption::GetInstance()->SetDefaultKey();
delete keyV1;
delete keyV2;
delete keyV3;
INT32_FLAG(merge_log_count_limit) = defaultMergeLimit;
INT32_FLAG(max_holded_data_size) = defaultHoldSize;
Sender::Instance()->mCheckPeriod = defaultCheckPeriod;
gProjectNetEnableIndex = 0;
CaseCleanUp();
LOG_INFO(sLogger, ("TestSecondaryStorage() end", time(NULL)));
}