// core/unittest/sender/SenderUnittest.cpp
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <assert.h>
#include "json/json.h"
#include "app_config/AppConfig.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event/Event.h"
#include "file_server/event_handler/EventHandler.h"
#include "file_server/reader/LogFileReader.h"
#include "monitor/Monitor.h"
#include "sender/Sender.h"
#include "unittest/Unittest.h"
#if defined(__linux__)
#include <unistd.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fstream>
#include <iostream>
#include <queue>
#include <set>
#include <string>
#include <typeinfo>
#include <vector>
#include "boost/regex.hpp"
#include "checkpoint/CheckpointManagerV2.h"
#include "common/FileEncryption.h"
#include "common/FileSystemUtil.h"
#include "common/Lock.h"
#include "common/LogFileCollectOffsetIndicator.h"
#include "common/MemoryBarrier.h"
#include "common/StringTools.h"
#include "common/Thread.h"
#include "common/WaitObject.h"
#include "constants/Constants.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "monitor/AlarmManager.h"
#include "monitor/LogIntegrity.h"
#include "protobuf/sls/metric.pb.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "runner/ProcessorRunner.h"
#include "sdk/Client.h"
#include "sdk/Common.h"
#include "sdk/Exception.h"
using namespace std;
using namespace sls_logs;
DECLARE_FLAG_INT32(buffer_file_alive_interval);
DECLARE_FLAG_STRING(profile_project_name);
DECLARE_FLAG_BOOL(enable_mock_send);
DECLARE_FLAG_INT32(max_holded_data_size);
DECLARE_FLAG_INT32(merge_log_count_limit);
DECLARE_FLAG_INT32(first_read_endure_bytes);
DECLARE_FLAG_STRING(ilogtail_config);
DECLARE_FLAG_STRING(user_log_config);
DECLARE_FLAG_STRING(logtail_profile_snapshot);
DECLARE_FLAG_INT32(buffer_check_period);
DECLARE_FLAG_INT32(monitor_interval);
DECLARE_FLAG_INT32(max_buffer_num);
DECLARE_FLAG_INT32(sls_host_update_interval);
DECLARE_FLAG_INT32(logtail_alarm_interval);
DECLARE_FLAG_INT32(max_client_send_error_count);
DECLARE_FLAG_INT32(client_disable_send_retry_interval);
DECLARE_FLAG_INT32(client_disable_send_retry_interval_max);
DECLARE_FLAG_INT32(max_client_quota_exceed_count);
DECLARE_FLAG_INT32(client_quota_send_concurrency_min);
DECLARE_FLAG_INT32(client_quota_send_retry_interval);
DECLARE_FLAG_INT32(client_quota_send_retry_interval_max);
DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_INT32(send_switch_real_ip_interval);
DECLARE_FLAG_BOOL(default_global_mark_offset_flag);
DECLARE_FLAG_BOOL(default_global_fuse_mode);
DECLARE_FLAG_INT32(file_eliminate_interval);
DECLARE_FLAG_INT32(dirfile_check_interval_ms);
DECLARE_FLAG_INT32(send_request_concurrency);
DECLARE_FLAG_INT32(test_unavailable_endpoint_interval);
DECLARE_FLAG_STRING(logtail_send_address);
DECLARE_FLAG_INT32(batch_send_interval);
namespace logtail {
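// Shared test state. The g-prefixed globals below act as knobs for the mock send
// callbacks (network status, failure type, per-project availability) and as
// counters that the cases assert on; most of them are reset in CaseSetUp().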
string gRootDir = "";
int gCaseID = 0;
bool gSetTimeFlag = false;
int32_t gStartIp = 1;
int gSendFailType = 1; // 1:network error; 2:all error can write secondary; 3:all error will not write secondary
bool gLogIntegrityTestFlag = false;
bool gGlobalMarkOffsetTestFlag = false;
bool gEnableExactlyOnce = false;
const size_t kConcurrency = 8;
// warning: if you want to modify these cases, pay attention to the order
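// Build one fake access-log line of the form
// "<ip> - - [dd/Mon/yyyy:HH:MM:SS +0800] <content or filler> <seq>\n".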
void getLogContent(char* buffer, time_t logTime, string content = "", int32_t seq = 0) {
char timeBuffer[50];
struct tm timeInfo;
#if defined(__linux__)
localtime_r(&logTime, &timeInfo);
#elif defined(_MSC_VER)
localtime_s(&timeInfo, &logTime);
#endif
char buffer1[] = "10.7.241.21"; // followed by e.g. " - - [05/Mar/2012:15:10:59 +0800] "
char buffer2[]
= "abcdefghijklmnopqrsputskjueiguwdhruwldirudsjhdklguejsldiuuwjskldgsksjdkdjfksjsdkfjsksdjfksjdkfuujss ";
strftime(timeBuffer, sizeof(timeBuffer), " - - [%d/%b/%Y:%R:%S +0800] ", &timeInfo);
if (content == "")
sprintf(buffer, "%s%s%s%d\n", buffer1, timeBuffer, buffer2, seq);
else
sprintf(buffer, "%s%s%s%d\n", buffer1, timeBuffer, (content + " ").c_str(), seq);
}
// Write {logNum} logs to {filename}
// filename: /{workpath}/{jobname}.log
unsigned OneJob(int logNum,
string path,
string jobname,
bool jobOrNot,
time_t logTime,
string content = "",
int32_t seq = 0,
bool fixedTime = false,
int projectID = 0) {
// $1: number of logs; $2: path + name; $3: whether this is a job (.log) or not (.xlog)
char fileExt[32];
if (jobOrNot) {
if (projectID == 0) {
strcpy(fileExt, ".log");
} else {
sprintf(fileExt, ".log%d", projectID);
}
} else {
strcpy(fileExt, ".xlog");
}
auto out = FileAppendOpen((path + PATH_SEPARATOR + jobname + fileExt).c_str(), "ab");
if (!out) {
return 0;
}
const static unsigned BUFFER_SIZE = 1024 * 20;
char* buffer;
unsigned lines = 0;
buffer = new char[BUFFER_SIZE + 1024];
unsigned size = 0;
for (int i = 0; i < logNum; ++i) {
size = 0;
memset(buffer, 0, BUFFER_SIZE + 1024);
if (fixedTime)
getLogContent(buffer, logTime, content, seq);
else
getLogContent(buffer, logTime + i, content, seq);
size += strlen(buffer);
++lines;
fwrite(buffer, 1, size, out);
}
delete[] buffer;
fclose(out);
return lines;
}
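// Generate a random string of printable ASCII characters (codes 33-126) whose
// length lies in [minLength, maxLength].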
string GenerateRandomStr(int32_t minLength, int32_t maxLength) {
int32_t length = (rand() % (maxLength - minLength + 1)) + minLength;
string randomStr = "";
for (int32_t i = 0; i < length; ++i) {
// ascii: 33 - 126
int temp = (rand() % (126 - 33 + 1)) + 33;
randomStr += (char)temp;
}
return randomStr;
}
string MergeVectorString(vector<string>& input) {
string output;
for (size_t i = 0; i < input.size(); i++) {
if (i != 0)
output.append(" | ");
output.append(input[i]);
}
return output;
}
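// Write user_log_config.json containing {projectNum} common_reg_log configs for
// projects named "<1000000+i>_proj" (category "app_log"); the optional arguments
// toggle filters, aliuid, default endpoint, topic format, send-rate limits and,
// via gEnableExactlyOnce, exactly-once concurrency. A secondary (local storage)
// section is always appended.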
void WriteConfigJson(bool isFilter = false,
bool hasAliuid = false,
bool hasEndpoint = false,
int projectNum = 1,
bool hasTopic = false,
int32_t maxSendRateBytes = -1,
int32_t expireTime = 0) {
Json::Value rootJson;
//"slb_aliyun"
for (int i = 0; i < projectNum; ++i) {
Json::Value commonreg_com;
char projectName[32];
sprintf(projectName, "%d_proj", 1000000 + i);
commonreg_com["project_name"] = Json::Value(projectName);
commonreg_com["category"] = Json::Value("app_log");
commonreg_com["log_type"] = Json::Value("common_reg_log");
commonreg_com["log_path"] = Json::Value(gRootDir);
commonreg_com["max_send_rate"] = Json::Value(maxSendRateBytes);
commonreg_com["send_rate_expire"] = Json::Value(expireTime);
if (i == 0) {
#if defined(__linux__)
commonreg_com["file_pattern"] = Json::Value("*.[Ll][Oo][Gg]");
#elif defined(_MSC_VER)
commonreg_com["file_pattern"] = Json::Value("*.log");
#endif
} else {
char filePattern[32];
#if defined(__linux__)
sprintf(filePattern, "*.[Ll][Oo][Gg]%d", i);
#elif defined(_MSC_VER)
sprintf(filePattern, "*.log%d", i);
#endif
commonreg_com["file_pattern"] = Json::Value(filePattern);
commonreg_com["region"] = Json::Value(projectName);
}
commonreg_com["enable"] = Json::Value(true);
commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S");
commonreg_com["local_storage"] = Json::Value(true);
if (hasEndpoint) {
commonreg_com["defaultEndpoint"] = Json::Value("cn-yungu-test-intranet.log.aliyuntest.com");
}
if (hasAliuid) {
char aliuid[32];
sprintf(aliuid, "123456789%d", i);
commonreg_com["aliuid"] = Json::Value(aliuid);
}
Json::Value regs;
regs.append(Json::Value("([\\d\\.]+) \\S+ \\S+ \\[(\\S+) \\S+\\] (\\S+) (\\d+)"));
Json::Value keys;
keys.append(Json::Value("ip,time,nothing,seq"));
commonreg_com["regex"] = regs;
commonreg_com["keys"] = keys;
if (isFilter) {
Json::Value filter_keys;
filter_keys.append(Json::Value("nothing"));
filter_keys.append(Json::Value("seq"));
Json::Value filter_regs;
filter_regs.append(Json::Value("filter.*"));
filter_regs.append(Json::Value("5"));
commonreg_com["filter_keys"] = filter_keys;
commonreg_com["filter_regs"] = filter_regs;
}
if (hasTopic) {
commonreg_com["topic_format"] = Json::Value(".*/(.*)");
}
if (gEnableExactlyOnce) {
Json::Value advJSON;
advJSON["exactly_once_concurrency"] = kConcurrency;
commonreg_com["advanced"] = advJSON;
}
commonreg_com["preserve"] = Json::Value(true);
if (i == 0) {
rootJson["commonreg.com"] = commonreg_com;
} else {
char title[32];
sprintf(title, "commomreg.com_%d", i);
rootJson[title] = commonreg_com;
}
}
Json::Value secondary;
secondary["max_flow_byte_per_sec"] = Json::Value("10240000");
secondary["max_num_of_file"] = Json::Value("10");
secondary["enable_secondady"] = Json::Value(true);
Json::Value metrics;
metrics["metrics"] = rootJson;
metrics["local_storage"] = secondary;
ofstream fout("user_log_config.json");
fout << metrics << endl;
fout.close();
}
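// Like WriteConfigJson(), but with a whole-line regex ("(.*)" -> "content") and,
// for projects with index >= 4, a customized_fields.data_integrity section.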
void WriteIntegrityConfigJson(int projectNum = 1) {
Json::Value rootJson;
//"slb_aliyun"
for (int i = 0; i < projectNum; ++i) {
Json::Value commonreg_com;
char projectName[32];
sprintf(projectName, "%d_proj", 1000000 + i);
commonreg_com["project_name"] = Json::Value(projectName);
commonreg_com["category"] = Json::Value("app_log");
commonreg_com["log_type"] = Json::Value("common_reg_log");
commonreg_com["log_path"] = Json::Value(gRootDir);
commonreg_com["max_send_rate"] = Json::Value(-1);
commonreg_com["send_rate_expire"] = Json::Value(0);
if (i == 0) {
#if defined(__linux__)
commonreg_com["file_pattern"] = Json::Value("*.[Ll][Oo][Gg]");
#elif defined(_MSC_VER)
commonreg_com["file_pattern"] = Json::Value("*.log");
#endif
commonreg_com["region"] = Json::Value(projectName); // not default region here
} else {
char filePattern[32];
#if defined(__linux__)
sprintf(filePattern, "*.[Ll][Oo][Gg]%d", i);
#elif defined(_MSC_VER)
sprintf(filePattern, "*.log%d", i);
#endif
commonreg_com["file_pattern"] = Json::Value(filePattern);
commonreg_com["region"] = Json::Value(projectName);
}
commonreg_com["enable"] = Json::Value(true);
commonreg_com["timeformat"] = Json::Value("%d/%b/%Y:%H:%M:%S");
commonreg_com["local_storage"] = Json::Value(true);
Json::Value regs;
regs.append(Json::Value("(.*)"));
Json::Value keys;
keys.append(Json::Value("content"));
commonreg_com["regex"] = regs;
commonreg_com["keys"] = keys;
commonreg_com["preserve"] = Json::Value(true);
if (i >= 4) {
Json::Value customizedFields;
Json::Value dataIntegrity;
//([0-9]{4})-(0[0-9]{1}|1[0-2])-(0[0-9]{1}|[12][0-9]{1}|3[01])
//(0[0-9]{1}|1[0-9]{1}|2[0-3]):[0-5][0-9]{1}:([0-5][0-9]{1})
dataIntegrity["log_time_reg"]
= Json::Value("(0[0-9]{1}|[12][0-9]{1}|3[01])/([A-Za-z]{3})/"
"([0-9]{4}):(0[0-9]{1}|1[0-9]{1}|2[0-3]):[0-5][0-9]{1}:([0-5][0-9]{1})");
dataIntegrity["logstore"] = Json::Value("test-integrity-logstore");
dataIntegrity["project_name"] = Json::Value(projectName);
dataIntegrity["switch"] = Json::Value(true);
dataIntegrity["time_pos"] = Json::Value(-1);
customizedFields["data_integrity"] = dataIntegrity;
commonreg_com["customized_fields"] = customizedFields;
}
if (i == 0) {
rootJson["commonreg.com"] = commonreg_com;
} else {
char title[32];
sprintf(title, "commomreg.com_%d", i);
rootJson[title] = commonreg_com;
}
}
Json::Value secondary;
secondary["max_flow_byte_per_sec"] = Json::Value("10240000");
secondary["max_num_of_file"] = Json::Value("10");
secondary["enable_secondady"] = Json::Value(true);
Json::Value metrics;
metrics["metrics"] = rootJson;
metrics["local_storage"] = secondary;
ofstream fout("user_log_config.json");
fout << metrics << endl;
fout.close();
}
bool gNetWorkStat;
bool gTestNetWorkStat = false;
int32_t gProjectNetEnableIndex = 0;
int32_t gAsynProjectSendFailCount = 0;
int32_t gSynProjectSendFailCount = 0;
int32_t gCounter; // send_success log num
int32_t gMessageSize;
LogGroup gRcvLogGroup; // the latest send_success log group
PTMutex gRecvLogGroupLock;
int32_t gStatusCount;
LogGroup gStatusLogGroup;
PTMutex gBufferLogGroupsLock;
vector<LogGroup> gBufferLogGroups; // send_success log group
std::mutex gMockExactlyOnceSendLock;
std::vector<RangeCheckpointPtr> gRangeCheckpoints;
std::set<std::string> gBlockedHashKeySet;
std::unique_ptr<std::thread> gDispatchThreadId;
ThreadPtr gRealIpSendThread;
int32_t gAsynMockLatencyMs = 0;
int32_t gSynMockLatencyMs = 0;
bool gSendThreadRunFlag = true;
class SenderUnittest;
bool gDisableIpFlag = true;
sdk::Client* gClient = NULL;
string gDisabledIp;
WaitObject mWait;
PTMutex mQueueLock;
struct AsyncRequest;
std::queue<AsyncRequest*> mAsynRequestQueue;
volatile bool mSignalFlag = false;
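// Dispatcher thread entry: register the file handlers and run the event dispatch loop.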
void RunningDispatcher() {
LOG_INFO(sLogger, ("RunningDispatcher", "begin"));
ConfigManager::GetInstance()->RegisterHandlers();
EventDispatcher::GetInstance()->Dispatch();
}
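// One queued asynchronous send request: produced by MockAsyncSend() and consumed
// by MockAsyncSendThread().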
struct AsyncRequest {
const std::string mProjectName;
const std::string mLogstore;
const std::string mLogData;
SEND_DATA_TYPE mDataType;
int32_t mRawSize;
SlsCompressType mCompressType;
SendClosure* mSendClosure;
int64_t mBeginTime;
int32_t mSendIndex;
AsyncRequest(const std::string& projectName,
const std::string& logstore,
const std::string& logData,
SEND_DATA_TYPE dataType,
int32_t rawSize,
SlsCompressType compressType,
SendClosure* sendClosure)
: mProjectName(projectName),
mLogstore(logstore),
mLogData(logData),
mDataType(dataType),
mRawSize(rawSize),
mCompressType(compressType),
mSendClosure(sendClosure) {
static int32_t s_sendIndex = 0;
mSendIndex = s_sendIndex++;
mBeginTime = GetCurrentTimeInMilliSeconds();
}
~AsyncRequest() {
// printf("#### %d %d ms \n", mSendIndex, (int)(GetNowTime64() - mBeginTime));
}
};
static decltype(CheckpointManagerV2::GetInstance()) sCptM = nullptr;
static decltype(ExactlyOnceQueueManager::GetInstance()) sQueueM = nullptr;
static decltype(EventDispatcher::GetInstance()) sEventDispatcher = nullptr;
class SenderUnittest : public ::testing::Test {
static decltype(ProcessorRunner::GetInstance()->GetQueue().mLogstoreQueueMap)* sProcessQueueMap;
static decltype(Sender::Instance()->GetQueue().mLogstoreSenderQueueMap)* sSenderQueueMap;
void clearGlobalResource() {
sCptM->rebuild();
sQueueM->clear();
sProcessQueueMap->clear();
sSenderQueueMap->clear();
}
protected:
static void StopMockSendThread() {
gSendThreadRunFlag = false;
{
WaitObject::Lock lock(mWait);
ReadWriteBarrier();
mSignalFlag = true;
mWait.signal();
}
sleep(1);
}
static void ClearRequestQueue() {
PTScopedLock lock(mQueueLock);
while (!mAsynRequestQueue.empty()) {
delete mAsynRequestQueue.front();
mAsynRequestQueue.pop();
}
}
static vector<AsyncRequest*> GetRequest() {
vector<AsyncRequest*> reqVec;
WaitObject::Lock lock(mWait);
ReadWriteBarrier();
while (!mSignalFlag) {
mWait.wait(lock, 1000000);
}
mSignalFlag = false;
PTScopedLock mutexLock(mQueueLock);
// printf("Wait a request %d.\n", (int)mAsynRequestQueue.size());
while (!mAsynRequestQueue.empty()) {
reqVec.push_back(mAsynRequestQueue.front());
mAsynRequestQueue.pop();
}
return reqVec;
}
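// Worker loop started in SetUpTestCase(): drains queued requests and feeds them to
// MockAsyncSendInner(), optionally adding an artificial latency (gSynMockLatencyMs).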
static void MockAsyncSendThread() {
// printf("Thread Start.\n");
while (gSendThreadRunFlag) {
vector<AsyncRequest*> reqVec = GetRequest();
for (size_t i = 0; i < reqVec.size(); ++i) {
AsyncRequest* pReq = reqVec[i];
if (gSynMockLatencyMs > 0) {
usleep(gSynMockLatencyMs * 1000);
}
// printf("Send Async Inner \n");
MockAsyncSendInner(pReq->mProjectName,
pReq->mLogstore,
pReq->mLogData,
pReq->mDataType,
pReq->mRawSize,
pReq->mCompressType,
pReq->mSendClosure);
delete pReq;
}
}
}
static sdk::GetRealIpResponse MockGetRealIp(const std::string& projectName, const std::string& logstore) {
static char ipStr[32];
PTScopedLock lock(gRecvLogGroupLock);
sdk::GetRealIpResponse rsp;
sprintf(ipStr, "10.123.32.%d", ++gStartIp);
rsp.realIp = ipStr;
return rsp;
}
static sdk::GetRealIpResponse MockGetEmptyRealIp(const std::string& projectName, const std::string& logstore) {
PTScopedLock lock(gRecvLogGroupLock);
sdk::GetRealIpResponse rsp;
rsp.realIp.clear();
return rsp;
}
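// Synchronous send mock: parses the serialized log groups, records successful
// deliveries in the global counters, and throws an sdk::LOGException chosen by
// gSendFailType when the network is down or the project is disabled.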
static void MockSyncSend(const std::string& projectName,
const std::string& logstore,
const std::string& logData,
SEND_DATA_TYPE dataType,
int32_t rawSize,
sls_logs::SlsCompressType compressType) {
if (projectName == string("logtail-test-network-project")) {
// printf("test network %d.\n", gTestNetWorkStat ? 1 : 0);
if (gTestNetWorkStat) {
return;
}
throw sdk::LOGException(sdk::LOGE_REQUEST_ERROR, "Can not connect to server.");
}
LOG_INFO(sLogger, ("MockSyncSend, projectName", projectName)("logstore", logstore)("dataType", dataType));
vector<LogGroup> logGroupVec;
Sender::ParseLogGroupFromString(logData, dataType, rawSize, compressType, logGroupVec);
for (vector<LogGroup>::iterator iter = logGroupVec.begin(); iter != logGroupVec.end(); ++iter) {
bool projectDisabled = true;
if (iter->logs_size() > 0) {
if (gNetWorkStat && iter->category() == "app_log") {
PTScopedLock lock(gBufferLogGroupsLock);
gBufferLogGroups.push_back(*iter);
}
if (gNetWorkStat && (projectName.find("_proj") != string::npos)) /* "1000000_proj" */
{
int prjIndex = atoi(projectName.substr(0, 7).c_str()) - 1000000;
if (prjIndex <= gProjectNetEnableIndex) {
projectDisabled = false;
gRecvLogGroupLock.lock();
gRcvLogGroup = *iter;
gCounter += gRcvLogGroup.logs_size();
gMessageSize += gRcvLogGroup.ByteSize();
gRecvLogGroupLock.unlock();
LOG_INFO(
sLogger,
("gRcvLogGroup.ByteSize()", gRcvLogGroup.ByteSize())("logData.size()", logData.size()));
} else {
++gSynProjectSendFailCount;
}
}
if (gNetWorkStat && projectName == STRING_FLAG(profile_project_name)
&& iter->category() == "logtail_status_profile") {
gStatusCount++;
gStatusLogGroup = *iter;
}
}
if (!gNetWorkStat || projectDisabled) {
// printf("[@MockSyncSend] fail %s %s %d.\n", projectName.c_str(), logstore.c_str(), rawSize);
if (gSendFailType == 1)
throw sdk::LOGException(sdk::LOGE_REQUEST_ERROR, "Can not connect to server.");
else if (gSendFailType == 2) {
int randInt = rand() % 4;
if (randInt == 0)
throw sdk::LOGException(sdk::LOGE_REQUEST_ERROR, "Can not connect to server.");
else if (randInt == 1)
throw sdk::LOGException(sdk::LOGE_WRITE_QUOTA_EXCEED, "project write quota exceed.");
else if (randInt == 2)
throw sdk::LOGException(sdk::LOGE_SERVER_BUSY, "connect to server timeout.");
else if (randInt == 3)
throw sdk::LOGException(sdk::LOGE_INTERNAL_SERVER_ERROR, "connect to server timeout.");
} else if (gSendFailType == 3) {
int randInt = rand() % 5;
if (randInt == 0)
throw sdk::LOGException(sdk::LOGE_UNAUTHORIZED, "LOGE_UNAUTHORIZED.");
else if (randInt == 1)
throw sdk::LOGException(sdk::LOGE_BAD_RESPONSE, "LOGE_BAD_RESPONSE.");
else if (randInt == 2)
throw sdk::LOGException(sdk::LOGE_CATEGORY_NOT_EXIST, "LOGE_CATEGORY_NOT_EXIST.");
else if (randInt == 3)
throw sdk::LOGException(sdk::LOGE_PROJECT_NOT_EXIST, "LOGE_PROJECT_NOT_EXIST.");
else if (randInt == 4)
throw sdk::LOGException(sdk::LOGE_TOPIC_NOT_EXIST, "LOGE_TOPIC_NOT_EXIST.");
} else if (gSendFailType == 4) {
int randInt = rand() % 2;
if (randInt == 0) {
throw sdk::LOGException(sdk::LOGE_WRITE_QUOTA_EXCEED, "LOGE_WRITE_QUOTA_EXCEED");
} else if (randInt == 1) {
throw sdk::LOGException(sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED, "LOGE_SHARD_WRITE_QUOTA_EXCEED");
}
} else if (gSendFailType == 5) {
int randInt = rand() % 2;
if (randInt == 0) {
throw sdk::LOGException(sdk::LOGE_SERVER_BUSY, "LOGE_WRITE_QUOTA_EXCEED");
} else if (randInt == 1) {
throw sdk::LOGException(sdk::LOGE_INTERNAL_SERVER_ERROR, "LOGE_SHARD_WRITE_QUOTA_EXCEED");
}
}
} else {
// printf("[@MockSyncSend] success %s %s %d.\n", projectName.c_str(), logstore.c_str(), rawSize);
}
}
}
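// Asynchronous send mock (producer side): enqueue the request and wake the mock
// send thread.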
static void MockAsyncSend(const std::string& projectName,
const std::string& logstore,
const std::string& logData,
SEND_DATA_TYPE dataType,
int32_t rawSize,
sls_logs::SlsCompressType compressType,
SendClosure* sendClosure) {
// printf("Insert request.\n");
// gSenderTest->MockAsyncSendInner(projectName, logstore, logData, dataType, rawSize, sendClosure);
// return;
AsyncRequest* pReq
= new AsyncRequest(projectName, logstore, logData, dataType, rawSize, compressType, sendClosure);
{
PTScopedLock lock(mQueueLock);
mAsynRequestQueue.push(pReq);
}
{
WaitObject::Lock lock(mWait);
ReadWriteBarrier();
mSignalFlag = true;
mWait.signal();
}
}
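// Asynchronous send mock (consumer side): parses the log groups, updates the
// global counters, then completes the closure exactly once via OnSuccess() or
// OnFail() according to gNetWorkStat and gSendFailType.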
static void MockAsyncSendInner(const std::string& projectName,
const std::string& logstore,
const std::string& logData,
SEND_DATA_TYPE dataType,
int32_t rawSize,
sls_logs::SlsCompressType compressType,
SendClosure* sendClosure) {
LOG_INFO(sLogger, ("MockAsyncSend, projectName", projectName)("logstore", logstore)("dataType", dataType));
vector<LogGroup> logGroupVec;
Sender::ParseLogGroupFromString(logData, dataType, rawSize, compressType, logGroupVec);
if (gDisableIpFlag && gClient != NULL && gDisabledIp.size() > 0) {
if (gClient->GetRawSlsHost().find(gDisabledIp) != std::string::npos) {
auto sr = new sdk::PostLogStoreLogsResponse;
sr->statusCode = 500;
sr->requestId = "";
sendClosure->OnFail(sr, sdk::LOGE_REQUEST_ERROR, "timeout to connect network");
return;
}
}
bool projectDisabled = true;
for (vector<LogGroup>::iterator iter = logGroupVec.begin(); iter != logGroupVec.end(); ++iter) {
// Record LogGroup information for debug.
{
LogGroup& logGroup = *iter;
std::string logCase;
if (logGroup.logs_size() > 0) {
auto& log = logGroup.logs(0);
if (log.contents_size() >= 3) {
logCase = log.contents(2).value();
}
}
LOG_INFO(sLogger,
("LogGroupDebugInfo",
logCase)("Project", projectName)("Logstore", logstore)("LogCount", logGroup.logs_size()));
}
if (iter->logs_size() > 0) {
if (gNetWorkStat && iter->category() == "app_log") {
PTScopedLock lock(gBufferLogGroupsLock);
gBufferLogGroups.push_back(*iter);
}
if (gNetWorkStat && (projectName.find("_proj") != string::npos)) /* "1000000_proj" */
{
int prjIndex = atoi(projectName.substr(0, 7).c_str()) - 1000000;
if (prjIndex <= gProjectNetEnableIndex) {
projectDisabled = false;
gRecvLogGroupLock.lock();
gRcvLogGroup = *iter;
gCounter += gRcvLogGroup.logs_size();
gMessageSize += gRcvLogGroup.ByteSize();
gRecvLogGroupLock.unlock();
} else {
// printf("Reject %s %d\n", projectName.c_str(), gProjectNetEnableIndex);
++gAsynProjectSendFailCount;
}
}
if (gNetWorkStat && projectName == STRING_FLAG(profile_project_name)
&& iter->category() == "logtail_status_profile") {
gStatusCount++;
gStatusLogGroup = *iter;
}
}
}
// This block must stay outside the loop: with multiple log groups, the sendClosure would otherwise be completed and destroyed multiple times.
if (gNetWorkStat && !projectDisabled) {
// printf("[#MockAsyncSend] success %s %s %d.\n", projectName.c_str(), logstore.c_str(), rawSize);
sdk::PostLogStoreLogsResponse* sr = new sdk::PostLogStoreLogsResponse;
sr->statusCode = 200;
sr->requestId = "mock_request_id";
sendClosure->OnSuccess(sr);
} else {
sdk::PostLogStoreLogsResponse* sr = new sdk::PostLogStoreLogsResponse;
sr->statusCode = 500;
sr->requestId = "mock_request_id";
// printf("[#MockAsyncSend] fail %s %s %d.\n", projectName.c_str(), logstore.c_str(), rawSize);
if (gSendFailType == 1)
sendClosure->OnFail(sr, sdk::LOGE_REQUEST_ERROR, "timeout to connect network");
else if (gSendFailType == 2) {
int randInt = rand() % 4;
if (randInt == 0)
sendClosure->OnFail(sr, sdk::LOGE_REQUEST_ERROR, "Can not connect to server.");
else if (randInt == 1)
sendClosure->OnFail(sr, sdk::LOGE_WRITE_QUOTA_EXCEED, "project write quota exceed.");
else if (randInt == 2)
sendClosure->OnFail(sr, sdk::LOGE_SERVER_BUSY, "connect to server timeout.");
else if (randInt == 3)
sendClosure->OnFail(sr, sdk::LOGE_INTERNAL_SERVER_ERROR, "connect to server timeout.");
} else if (gSendFailType == 3) {
int randInt = rand() % 5;
if (randInt == 0)
sendClosure->OnFail(sr, sdk::LOGE_UNAUTHORIZED, "LOGE_UNAUTHORIZED.");
else if (randInt == 1)
sendClosure->OnFail(sr, sdk::LOGE_BAD_RESPONSE, "LOGE_BAD_RESPONSE.");
else if (randInt == 2)
sendClosure->OnFail(sr, sdk::LOGE_CATEGORY_NOT_EXIST, "LOGE_CATEGORY_NOT_EXIST.");
else if (randInt == 3)
sendClosure->OnFail(sr, sdk::LOGE_PROJECT_NOT_EXIST, "LOGE_PROJECT_NOT_EXIST.");
else if (randInt == 4)
sendClosure->OnFail(sr, sdk::LOGE_TOPIC_NOT_EXIST, "LOGE_TOPIC_NOT_EXIST.");
} else if (gSendFailType == 4) {
int randInt = rand() % 2;
if (randInt == 0) {
sendClosure->OnFail(sr, sdk::LOGE_WRITE_QUOTA_EXCEED, "LOGE_WRITE_QUOTA_EXCEED");
} else if (randInt == 1) {
sendClosure->OnFail(sr, sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED, "LOGE_SHARD_WRITE_QUOTA_EXCEED");
}
} else if (gSendFailType == 5) {
int randInt = rand() % 2;
if (randInt == 0) {
sendClosure->OnFail(sr, sdk::LOGE_SERVER_BUSY, "LOGE_WRITE_QUOTA_EXCEED");
} else if (randInt == 1) {
sendClosure->OnFail(sr, sdk::LOGE_INTERNAL_SERVER_ERROR, "LOGE_SHARD_WRITE_QUOTA_EXCEED");
}
}
}
}
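// Integrity send hook: only projects whose index digit is >= 4 (the ones
// configured with data_integrity in WriteIntegrityConfigJson) notify LogIntegrity.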
static void MockIntegritySend(LoggroupTimeValue* data) {
const std::string& project = data->mProjectName;
if (project[6] - '0' >= 4)
LogIntegrity::GetInstance()->Notify(data, true);
}
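// Rewrite user_log_config.json with a single config that enables mark_offset and
// disables fuse_mode for the MarkOffsetTest directory.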
void MockUserConfigJsonForMarkOffset() {
// Remove the old user_log_config.json.
bfs::remove_all(STRING_FLAG(user_log_config));
// construct json
Json::Value root;
Json::Value metrics;
Json::Value commonreg_com;
commonreg_com["project_name"] = Json::Value("1000000_proj");
commonreg_com["category"] = Json::Value("app_log");
commonreg_com["log_path"] = Json::Value(gRootDir + "MarkOffsetTest");
commonreg_com["file_pattern"] = Json::Value("*.log");
commonreg_com["log_type"] = Json::Value("common_reg_log");
commonreg_com["max_send_rate"] = Json::Value(-1);
commonreg_com["send_rate_expire"] = Json::Value(0);
commonreg_com["timeformat"] = Json::Value("%Y-%m-%d %H:%M:%S");
Json::Value customized_fields;
customized_fields["mark_offset"] = Json::Value(true);
customized_fields["fuse_mode"] = Json::Value(false);
commonreg_com["customized_fields"] = customized_fields;
commonreg_com["enable"] = Json::Value(true);
commonreg_com["preserve"] = Json::Value(true);
commonreg_com["max_depth"] = Json::Value(10);
Json::Value regs;
regs.append(Json::Value("(.*)"));
Json::Value keys;
keys.append(Json::Value("content"));
commonreg_com["regex"] = regs;
commonreg_com["keys"] = keys;
commonreg_com["version"] = Json::Value(1);
metrics["commonreg.com"] = commonreg_com;
root["metrics"] = metrics;
// write json file
std::ofstream file(STRING_FLAG(user_log_config).c_str());
file << root.toStyledString();
file.close();
}
static void EnableNetWork() { gNetWorkStat = true; }
static void DisableNetWork() { gNetWorkStat = false; }
public:
static void SetUpTestCase() // void Setup()
{
sLogger->set_level(spdlog::level::trace);
printf("Test case setup.\n");
srand(time(NULL));
Sender::Instance()->AddEndpointEntry(STRING_FLAG(default_region_name),
STRING_FLAG(logtail_send_address),
SLSClientManager::EndpointSourceType::LOCAL);
STRING_FLAG(profile_project_name) = "sls-admin";
INT32_FLAG(sls_host_update_interval) = 1;
INT32_FLAG(logtail_alarm_interval) = 600;
BOOL_FLAG(enable_mock_send) = true;
gRootDir = GetProcessExecutionDir();
if (PATH_SEPARATOR[0] == gRootDir.at(gRootDir.size() - 1))
gRootDir.resize(gRootDir.size() - 1);
gRootDir += PATH_SEPARATOR + "SenderUnittest";
bfs::remove_all(gRootDir);
auto const sysConfDir = gRootDir + PATH_SEPARATOR + ".ilogtail" + PATH_SEPARATOR;
bfs::create_directories(sysConfDir);
AppConfig::GetInstance()->SetLoongcollectorConfDir(sysConfDir);
sCptM = CheckpointManagerV2::GetInstance();
sQueueM = ExactlyOnceQueueManager::GetInstance();
sEventDispatcher = EventDispatcher::GetInstance();
sProcessQueueMap = &(ProcessorRunner::GetInstance()->GetQueue().mLogstoreQueueMap);
sSenderQueueMap = &(Sender::Instance()->GetQueue().mLogstoreSenderQueueMap);
new Thread(&SenderUnittest::MockAsyncSendThread);
}
static void TearDownTestCase() // void CleanUp()
{
StopMockSendThread();
try {
bfs::remove_all(gRootDir);
} catch (...) {
}
}
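// Per-case setup: rewrite the config file (normal, integrity or mark-offset
// variant), recreate the working directory and checkpoint database, reload
// AppConfig/ConfigManager, install the mock send hooks, drop any leftover buffer
// files and start the dispatcher thread.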
void CaseSetUp(bool isFilter = false,
bool hasAliuid = false,
bool hasEndpoint = false,
int projectNum = 1,
bool hasTopic = false,
int32_t maxSendRateBytes = -1,
int32_t expireTime = 0,
int32_t clientDisableRetry = 0) {
// Detect and enable container mode (if the special local file exists).
if (bfs::exists("LogtailContainerModeTest")) {
cout << "replace with container config" << endl;
ReplaceWithContainerModeConfig();
}
ClearRequestQueue();
gAsynMockLatencyMs = 0;
gSynMockLatencyMs = 0;
INT32_FLAG(client_disable_send_retry_interval) = clientDisableRetry;
INT32_FLAG(buffer_check_period) = 2;
INT32_FLAG(buffer_file_alive_interval) = 3;
gProjectNetEnableIndex = 0;
gAsynProjectSendFailCount = 0;
gSynProjectSendFailCount = 0;
gSendFailType = 1;
gTestNetWorkStat = false;
if (gLogIntegrityTestFlag)
WriteIntegrityConfigJson(projectNum);
else if (gGlobalMarkOffsetTestFlag)
MockUserConfigJsonForMarkOffset();
else
WriteConfigJson(isFilter, hasAliuid, hasEndpoint, projectNum, hasTopic, maxSendRateBytes, expireTime);
{
PTScopedLock lock(gBufferLogGroupsLock);
gBufferLogGroups.clear();
}
bfs::remove("loongcollector_config.json");
{
fsutil::Dir dir(gRootDir);
dir.Open();
fsutil::Entry entry;
while ((entry = dir.ReadNext(false))) {
auto fullPath = gRootDir + PATH_SEPARATOR + entry.Name();
auto targetPath = gRootDir + PATH_SEPARATOR + ".." + PATH_SEPARATOR + entry.Name();
bfs::rename(fullPath, targetPath);
bfs::remove_all(targetPath);
}
}
sCptM->close();
bfs::remove_all(gRootDir);
bfs::create_directories(AppConfig::GetInstance()->GetLoongcollectorConfDir());
sCptM->open();
if (gEnableExactlyOnce) {
clearGlobalResource();
}
AppConfig::GetInstance()->LoadAppConfig(STRING_FLAG(ilogtail_config));
bool ret = ConfigManager::GetInstance()->LoadConfig(STRING_FLAG(user_log_config));
assert(ret);
ret = Sender::Instance()->Init();
assert(ret);
if (gLogIntegrityTestFlag) {
Sender::Instance()->MockIntegritySend = MockIntegritySend;
} else if (gEnableExactlyOnce) {
gRangeCheckpoints.clear();
gBlockedHashKeySet.clear();
Sender::Instance()->MockIntegritySend = MockExactlyOnceSend;
} else {
Sender::Instance()->MockIntegritySend = NULL;
}
Sender::Instance()->MockAsyncSend = MockAsyncSend;
Sender::Instance()->MockSyncSend = MockSyncSend;
Sender::Instance()->mStopRealIpThread = false;
gCounter = 0;
gMessageSize = 0;
vector<string> filesToSend;
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
for (size_t i = 0; i < filesToSend.size(); ++i) {
remove((Sender::Instance()->mBufferFilePath + filesToSend[i]).c_str());
}
gDispatchThreadId.reset(new std::thread(RunningDispatcher));
sleep(1);
}
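// Rewrite the ilogtail config plus a mount description file so that the case runs
// in container mode with mocked mount points.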
void ReplaceWithContainerModeConfig() {
Json::Value logtailConfig;
logtailConfig["container_mode"] = Json::Value(true);
logtailConfig["working_ip"] = Json::Value("1.2.3.4");
logtailConfig["working_hostname"] = Json::Value("sls-zc-test");
logtailConfig["container_mount_path"] = Json::Value("./container_mount_test.json");
ofstream fout(STRING_FLAG(ilogtail_config).c_str());
fout << logtailConfig.toStyledString() << endl;
fout.close();
Json::Value mountConfig;
mountConfig["version"] = Json::Value("0.1.0");
mountConfig["container_name"] = Json::Value("logtail-docker");
mountConfig["container_id"] = Json::Value("abcdef1234567890");
mountConfig["host_path"] = Json::Value("/");
Json::Value mount1;
mount1["destination"] = "/";
mount1["source"] = "";
Json::Value mount2;
mount2["destination"] = "/home/admin/logs";
mount2["source"] = "/home/admin/t4/docker/logs";
Json::Value mount3;
mount3["destination"] = "/app_2";
mount3["source"] = "/yyyy";
Json::Value mount4;
mount4["destination"] = "/app_2/xxx";
mount4["source"] = "/xxx";
Json::Value mountArray;
mountArray.append(mount1);
mountArray.append(mount2);
mountArray.append(mount3);
mountArray.append(mount4);
mountConfig["container_mount"] = mountArray;
ofstream foutMount("./container_mount_test.json");
foutMount << mountConfig.toStyledString() << endl;
foutMount.close();
}
template <class T>
void ClearMap(T& m) {
for (auto& iter : m) {
delete iter.second;
}
m.clear();
}
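// Per-case teardown: stop input/dispatch/processing, remove the sender, clear the
// cached real-ip/endpoint/client maps and polling caches, join the dispatcher
// thread, close the checkpoint database and delete the working directory.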
void CaseCleanUp() {
LogInput::GetInstance()->CleanEnviroments();
sleep(1);
EventDispatcher::GetInstance()->CleanEnviroments();
ProcessorRunner::GetInstance()->CleanEnviroments();
Sender::Instance()->RemoveSender();
if (gRealIpSendThread) {
Sender::Instance()->mStopRealIpThread = true;
gRealIpSendThread.reset();
PTScopedLock lock(Sender::Instance()->mRegionRealIpLock);
ClearMap(Sender::Instance()->mRegionRealIpMap);
}
{
PTScopedLock lock(Sender::Instance()->mRegionEndpointEntryMapLock);
ClearMap(Sender::Instance()->mRegionEndpointEntryMap);
}
{
PTScopedLock lock(Sender::Instance()->mSendClientLock);
ClearMap(Sender::Instance()->mSendClientMap);
}
gRecvLogGroupLock.lock();
gRcvLogGroup.Clear();
gRecvLogGroupLock.unlock();
ConfigManager::GetInstance()->CleanEnviroments();
delete ConfigManager::GetInstance()->mSharedHandler;
ConfigManager::GetInstance()->mSharedHandler = NULL;
LogIntegrity::GetInstance()->ClearForTest();
ClearRequestQueue();
{
PTScopedLock lock(gBufferLogGroupsLock);
gBufferLogGroups.clear();
}
gDispatchThreadId->join();
gDispatchThreadId = nullptr;
sCptM->close();
bfs::remove_all(gRootDir);
PollingDirFile::GetInstance()->ClearCache();
PollingModify::GetInstance()->ClearCache();
gEnableExactlyOnce = false;
gLogIntegrityTestFlag = false;
gGlobalMarkOffsetTestFlag = false;
}
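// Look up the sender queue info for project "<1000000+projectId>_proj" and
// logstore "app_log"; the test fails if the queue does not exist.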
LogstoreSenderInfo* GetSenderInfo(int projectId) {
char projectName[32];
sprintf(projectName, "%d_proj", 1000000 + projectId);
string logstore("app_log");
LogstoreFeedBackKey key = GenerateLogstoreFeedBackKey(string(projectName), logstore);
LogstoreSenderInfo* pInfo = Sender::Instance()->mSenderQueue.GetSenderInfo(key);
if (pInfo == NULL) {
APSARA_TEST_TRUE_DESC(false, string(projectName) + " " + logstore + " send queue does not exist.");
return NULL;
}
return pInfo;
}
void CheckSendClientStatus(int projectId, bool canSend) {
LogstoreSenderInfo* pInfo = GetSenderInfo(projectId);
if (pInfo == NULL) {
return;
}
APSARA_TEST_EQUAL_DESC(pInfo->CanSend(time(NULL)), canSend, string("projectid : ") + ToString(projectId));
// printf("EQUAL %d %d\n", projectId, canSend ? 1 : 0);
}
const int PROJECT_NUM = 8;
const int32_t ROUND_NUM = 10;
// 100 logs per round so that log times are separated into different minutes,
// and there will therefore be two log groups.
const int32_t LOG_COUNT_PER_PROJECT = 100;
const int32_t TOTAL_LOG_COUNT_PER_PROJECT = ROUND_NUM * LOG_COUNT_PER_PROJECT;
const int32_t LOG_SEQ_NO = 5;
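// Scenario: only half of the projects are reachable at first, so their logs are
// delivered while the rest accumulate failures; after mLastNetworkErrorTime is
// reset, the blocked half recovers and every queued log is delivered.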
void TestMultiUserSeperationAndRetryIntervalRecovery() {
LOG_INFO(sLogger, ("TestMultiUserSeperationAndRetryIntervalRecovery() begin", time(NULL)));
EnableNetWork();
CaseSetUp(false, true, true, PROJECT_NUM, false, -1, 0, 900);
int32_t defaultSendErrorCount = INT32_FLAG(max_client_send_error_count);
INT32_FLAG(max_client_send_error_count) = 5;
Sender::Instance()->MockTestEndpoint = MockSyncSend;
gProjectNetEnableIndex = PROJECT_NUM / 2 - 1;
// Generate 1000 logs for each project.
for (int round = 0; round < ROUND_NUM; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
OneJob(LOG_COUNT_PER_PROJECT,
gRootDir,
"Job",
true,
time(NULL),
"TestMultiUserSeperation",
LOG_SEQ_NO,
false,
prjIndex);
}
usleep(200 * 1000);
}
sleep(20);
// 4 enabled projects (half) can send logs, so:
// - 4000 logs are received.
// - at least 4 projects have send failures.
// - only 4 enabled projects' clients are normal (CanSend).
APSARA_TEST_EQUAL(gCounter, PROJECT_NUM / 2 * TOTAL_LOG_COUNT_PER_PROJECT);
APSARA_TEST_TRUE(gAsynProjectSendFailCount >= PROJECT_NUM / 2);
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, prjIndex <= gProjectNetEnableIndex);
}
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestMultiUserSeperation");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(LOG_SEQ_NO));
gRecvLogGroupLock.unlock();
// Enable the network of the other 4 projects and try to recover by resetting the retry interval
// to give the sender a chance to resend (in LogstoreSenderInfo::CanSend).
gAsynProjectSendFailCount = 0;
gSynProjectSendFailCount = 0;
gCounter = 0;
gProjectNetEnableIndex = PROJECT_NUM;
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
LogstoreSenderInfo* info = GetSenderInfo(prjIndex);
if (info != NULL) {
info->mLastNetworkErrorTime = 0;
}
}
// Generate more logs (1000 per project).
for (int round = 0; round < ROUND_NUM; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
OneJob(LOG_COUNT_PER_PROJECT,
gRootDir,
"Job",
true,
time(NULL),
"TestMultiUserSeperationSecond",
LOG_SEQ_NO,
false,
prjIndex);
}
usleep(100 * 1000);
}
sleep(20);
// All projects can send logs, so:
// - 4000 logs in round 1 and 8000 logs in round 2 are received.
// - no project failure.
// - all clients are normal.
APSARA_TEST_EQUAL(gCounter, TOTAL_LOG_COUNT_PER_PROJECT * (PROJECT_NUM / 2 + PROJECT_NUM));
APSARA_TEST_EQUAL(gAsynProjectSendFailCount, 0);
APSARA_TEST_EQUAL(gSynProjectSendFailCount, 0);
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, true);
}
// Case cleanup.
INT32_FLAG(max_client_send_error_count) = defaultSendErrorCount;
gProjectNetEnableIndex = 0;
CaseCleanUp();
Sender::Instance()->MockTestEndpoint = NULL;
LOG_INFO(sLogger, ("TestMultiUserSeperationAndRetryIntervalRecovery() end", time(NULL)));
}
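// Same scenario as above, but recovery is driven by the endpoint probing path
// (MockTestEndpoint with a shortened test_unavailable_endpoint_interval) instead
// of resetting the retry interval directly.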
void TestMultiUserSeperationAndTestNetWorkRecovery() {
LOG_INFO(sLogger, ("TestMultiUserSeperationAndTestNetWorkRecovery() begin", time(NULL)));
EnableNetWork();
CaseSetUp(false, true, true, PROJECT_NUM, false, -1, 0, 900);
int32_t defaultSendErrorCount = INT32_FLAG(max_client_send_error_count);
INT32_FLAG(max_client_send_error_count) = 5;
int32_t defaultTestEndpointInterval = INT32_FLAG(test_unavailable_endpoint_interval);
INT32_FLAG(test_unavailable_endpoint_interval) = 5; // Quick recover for disabled projects.
Sender::Instance()->MockTestEndpoint = MockSyncSend;
gProjectNetEnableIndex = PROJECT_NUM / 2 - 1;
// Generate 1000 logs for each project.
for (int round = 0; round < ROUND_NUM; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
OneJob(LOG_COUNT_PER_PROJECT,
gRootDir,
"Job",
true,
time(NULL),
"TestMultiUserSeperation",
LOG_SEQ_NO,
false,
prjIndex);
}
usleep(200 * 1000);
}
sleep(20);
// 4 enabled projects (half) can send logs, so:
// - 4000 logs are received.
// - at least 4 projects have send failures.
// - only 4 enabled projects' clients are normal (CanSend).
APSARA_TEST_EQUAL(gCounter, PROJECT_NUM / 2 * TOTAL_LOG_COUNT_PER_PROJECT);
APSARA_TEST_TRUE(gAsynProjectSendFailCount >= PROJECT_NUM / 2);
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, prjIndex <= gProjectNetEnableIndex);
}
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestMultiUserSeperation");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(LOG_SEQ_NO));
gRecvLogGroupLock.unlock();
// Enable the network of the other 4 projects, try to recover by TestEndpoint.
gAsynProjectSendFailCount = 0;
gSynProjectSendFailCount = 0;
gCounter = 0;
gProjectNetEnableIndex = PROJECT_NUM;
gTestNetWorkStat = true;
sleep(INT32_FLAG(test_unavailable_endpoint_interval));
for (int32_t idx = 0; idx < INT32_FLAG(test_unavailable_endpoint_interval) * 2; ++idx) {
if (gCounter == TOTAL_LOG_COUNT_PER_PROJECT * PROJECT_NUM / 2) {
break;
}
sleep(1);
}
// Generate more logs (1000 per project).
for (int round = 0; round < ROUND_NUM; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
OneJob(LOG_COUNT_PER_PROJECT,
gRootDir,
"Job",
true,
time(NULL),
"TestMultiUserSeperationSecond",
LOG_SEQ_NO,
false,
prjIndex);
}
usleep(100 * 1000);
}
sleep(20);
// All projects can send logs, so:
// - 4000 logs in round 1 and 8000 logs in round 2 are received.
// - no project failure.
// - all clients are normal.
APSARA_TEST_EQUAL(gCounter, TOTAL_LOG_COUNT_PER_PROJECT * (PROJECT_NUM / 2 + PROJECT_NUM));
APSARA_TEST_EQUAL(gAsynProjectSendFailCount, 0);
APSARA_TEST_EQUAL(gSynProjectSendFailCount, 0);
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, true);
}
// Case cleanup.
Sender::Instance()->MockTestEndpoint = NULL;
INT32_FLAG(max_client_send_error_count) = defaultSendErrorCount;
INT32_FLAG(test_unavailable_endpoint_interval) = defaultTestEndpointInterval;
gProjectNetEnableIndex = 0;
CaseCleanUp();
Sender::Instance()->MockTestEndpoint = NULL;
LOG_INFO(sLogger, ("TestMultiUserSeperationAndTestNetWorkRecovery() end", time(NULL)));
}
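// Quota variant: the blocked half of the projects hits quota-exceed errors
// (gSendFailType = 4) and recovers after mLastNetworkErrorTime and
// mLastQuotaExceedTime are cleared.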
void TestMultiUserSeperationAndRetryQuotaRecovery() {
LOG_INFO(sLogger, ("TestMultiUserSeperationAndRetryQuotaRecovery() begin", time(NULL)));
EnableNetWork();
CaseSetUp(false, true, true, PROJECT_NUM);
auto bakClientQuotaSendRetryInterval = INT32_FLAG(client_quota_send_retry_interval);
INT32_FLAG(client_quota_send_retry_interval) = 900;
int32_t defaultQuotaErrorCount = INT32_FLAG(max_client_quota_exceed_count);
INT32_FLAG(max_client_quota_exceed_count) = 5;
int32_t seqNo = 5;
// First make the network OK for all projects; this sets up the quota retry interval.
gProjectNetEnableIndex = PROJECT_NUM;
for (int round = 0; round < 1; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
// Let the log times spread across different minutes so there will be two packets.
OneJob(10, gRootDir, "Job", true, time(NULL), "TestMultiUserSeperation", seqNo, false, prjIndex);
}
usleep(200 * 1000);
}
WaitForFileBeenRead();
// printf("[###] %d %d\n", gCounter, gAsynProjectSendFailCount);
sleep(10);
APSARA_TEST_EQUAL(gCounter, 10 * PROJECT_NUM);
gCounter = 0;
Sender::Instance()->MockTestEndpoint = MockSyncSend;
gProjectNetEnableIndex = PROJECT_NUM / 2 - 1;
// quota exceed alarm
gSendFailType = 4;
for (int round = 0; round < 10; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
// Use 100 logs so the times are separated into different minutes and there will be two packets.
OneJob(100, gRootDir, "Job", true, time(NULL), "TestMultiUserSeperation", seqNo, false, prjIndex);
}
usleep(200 * 1000);
}
WaitForFileBeenRead();
// printf("[###] %d %d\n", gCounter, gAsynProjectSendFailCount);
sleep(20);
// printf("[###] %d %d\n", gCounter, gAsynProjectSendFailCount);
APSARA_TEST_EQUAL(gCounter, PROJECT_NUM / 2 * 1000);
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, prjIndex <= gProjectNetEnableIndex);
}
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestMultiUserSeperation");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(seqNo));
APSARA_TEST_TRUE(gAsynProjectSendFailCount >= PROJECT_NUM / 2);
gRecvLogGroupLock.unlock();
// APSARA_TEST_TRUE(gSynProjectSendFailCount > 0);
// try to recover the connection
// set lastNetworkErrorTime 0 to give sender a chance to send again.
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
LogstoreSenderInfo* pInfo = GetSenderInfo(prjIndex);
if (pInfo != NULL) {
pInfo->mLastNetworkErrorTime = 0;
pInfo->mLastQuotaExceedTime = 0;
}
}
gAsynProjectSendFailCount = 0;
gSynProjectSendFailCount = 0;
gProjectNetEnableIndex = PROJECT_NUM;
gCounter = 0;
for (int round = 0; round < 10; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
OneJob(100, gRootDir, "Job", true, time(NULL), "TestMultiUserSeperationSecond", seqNo, false, prjIndex);
}
usleep(100 * 1000);
}
// printf("[###]\n");
WaitForFileBeenRead();
sleep(20);
// printf("[###]\n");
// 1000 new logs per project plus the 1000 previously blocked logs for half of the projects: 1500 * PROJECT_NUM in total.
APSARA_TEST_EQUAL(gCounter, 1500 * PROJECT_NUM);
APSARA_TEST_TRUE(gAsynProjectSendFailCount == 0);
APSARA_TEST_TRUE(gSynProjectSendFailCount == 0);
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, true);
}
INT32_FLAG(max_client_quota_exceed_count) = defaultQuotaErrorCount;
gProjectNetEnableIndex = 0;
CaseCleanUp();
INT32_FLAG(client_quota_send_retry_interval) = bakClientQuotaSendRetryInterval;
Sender::Instance()->MockTestEndpoint = NULL;
LOG_INFO(sLogger, ("TestMultiUserSeperationAndRetryQuotaRecovery() end", time(NULL)));
}
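// Discardable errors (gSendFailType = 3) versus retryable server errors
// (gSendFailType = 5): in both cases the send clients must stay sendable.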
void TestMultiUserSeperationAndDiscardFailAndOtherFail() {
LOG_INFO(sLogger, ("TestMultiUserSeperationAndDiscardFailAndOtherFail() begin", time(NULL)));
EnableNetWork();
CaseSetUp(false, true, true, PROJECT_NUM);
INT32_FLAG(client_disable_send_retry_interval) = 900;
int32_t defaultSendErrorCount = INT32_FLAG(max_client_send_error_count);
INT32_FLAG(max_client_send_error_count) = 5;
Sender::Instance()->MockTestEndpoint = MockSyncSend;
gProjectNetEnableIndex = PROJECT_NUM / 2 - 1;
// discard error
gSendFailType = 3;
int32_t seqNo = 5;
for (int round = 0; round < ROUND_NUM; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
// Use 100 logs so the times are separated into different minutes and there will be two packets.
OneJob(LOG_COUNT_PER_PROJECT,
gRootDir,
"Job",
true,
time(NULL),
"TestMultiUserSeperation",
seqNo,
false,
prjIndex);
}
usleep(200 * 1000);
}
// printf("[###] %d %d\n", gCounter, gAsynProjectSendFailCount);
WaitForFileBeenRead();
while (Sender::Instance()->mSenderQueue.IsEmpty() == false) {
LOG_INFO(sLogger, ("sender queue not empty", "try again"));
sleep(1);
}
#if defined(_MSC_VER)
// This is not very stable on Windows, so we have to wait for a long time.
// TODO: figure out why.
sleep(40);
#endif
// printf("[###] %d %d\n", gCounter, gAsynProjectSendFailCount);
// Half of the data will be discarded.
APSARA_TEST_EQUAL(gCounter, PROJECT_NUM / 2 * TOTAL_LOG_COUNT_PER_PROJECT);
// Discardable errors do not change the send client status.
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, true);
}
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestMultiUserSeperation");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(seqNo));
APSARA_TEST_TRUE(gAsynProjectSendFailCount >= PROJECT_NUM / 2);
gRecvLogGroupLock.unlock();
// APSARA_TEST_TRUE(gSynProjectSendFailCount > 0);
// server busy error
gSendFailType = 5;
gAsynProjectSendFailCount = 0;
gSynProjectSendFailCount = 0;
gCounter = 0;
for (int round = 0; round < ROUND_NUM; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
OneJob(LOG_COUNT_PER_PROJECT,
gRootDir,
"Job",
true,
time(NULL),
"TestMultiUserSeperationSecond",
seqNo,
false,
prjIndex);
}
usleep(100 * 1000);
}
// printf("[###]\n");
WaitForFileBeenRead();
sleep(10);
// printf("[###]\n");
// Server errors keep the sends of the disabled half failing, so only half of the logs are counted as sent.
APSARA_TEST_EQUAL(gCounter, TOTAL_LOG_COUNT_PER_PROJECT * PROJECT_NUM / 2);
// server error will not change send client status
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
CheckSendClientStatus(prjIndex, true);
}
gProjectNetEnableIndex = PROJECT_NUM;
sleep(10);
INT32_FLAG(max_client_send_error_count) = defaultSendErrorCount;
gProjectNetEnableIndex = 0;
CaseCleanUp();
Sender::Instance()->MockTestEndpoint = NULL;
LOG_INFO(sLogger, ("TestMultiUserSeperationAndDiscardFailAndOtherFail() end", time(NULL)));
}
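// Exercise the on-disk secondary buffer: write four log sets under different
// failure types and encryption key versions, then verify which sets survive once
// the network is re-enabled.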
void TestSecondaryStorage() {
LOG_INFO(sLogger, ("TestSecondaryStorage() begin", time(NULL)));
CaseSetUp();
auto sender = Sender::Instance();
gProjectNetEnableIndex = 1000000;
int32_t defaultCheckPeriod = sender->mCheckPeriod;
sender->mCheckPeriod = 1;
INT32_FLAG(buffer_file_alive_interval) = 4 * INT32_FLAG(batch_send_interval) + 5;
DisableNetWork();
int32_t defaultMergeLimit = INT32_FLAG(merge_log_count_limit);
INT32_FLAG(merge_log_count_limit) = 1024 * 1024 * 10;
int32_t defaultHoldSize = INT32_FLAG(max_holded_data_size);
INT32_FLAG(max_holded_data_size) = 20 * 1024 * 1024;
set<string> firstLogSet;
set<string> secondLogSet;
set<string> thirdLogSet;
set<string> forthLogSet;
size_t logCount = 30;
while (firstLogSet.size() < logCount) {
string first = GenerateRandomStr(1, 1024 * 2) + "_$#1";
if (firstLogSet.find(first) == firstLogSet.end())
firstLogSet.insert(first);
}
while (secondLogSet.size() < logCount) {
string second = GenerateRandomStr(1, 1024 * 2) + "_$#2";
if (secondLogSet.find(second) == secondLogSet.end())
secondLogSet.insert(second);
}
while (thirdLogSet.size() < logCount) {
string third = GenerateRandomStr(1, 1024 * 2) + "_$#3";
if (thirdLogSet.find(third) == thirdLogSet.end())
thirdLogSet.insert(third);
}
while (forthLogSet.size() < logCount) {
string forth = GenerateRandomStr(1, 1024 * 2) + "_$#4";
if (forthLogSet.find(forth) == forthLogSet.end())
forthLogSet.insert(forth);
}
FileEncryption::GetInstance()->mKeyMap.clear();
FileEncryption::KeyInfo* keyV1 = new FileEncryption::KeyInfo("73056101345ceb496c5574b8c6b7fc0e", 1);
FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV1->mVersion, keyV1));
FileEncryption::GetInstance()->SetDefaultKey();
APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 1);
// gSendFailType == 3 means the send errors are not retryable, so nothing is written to the secondary buffer.
// Generate logs and wait long enough for them to arrive at the sender queue.
gSendFailType = 3;
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write forthLogSet", logCount));
for (set<string>::iterator iter = forthLogSet.begin(); iter != forthLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
sleep(10);
APSARA_TEST_TRUE(sender->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
vector<string> filesToSend;
// The send errors are discardable, so no secondary buffer file is produced.
sender->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 0);
// gSendFailType == 2 means the send errors are retryable, so the data is written to the secondary buffer.
gSendFailType = 2; // test all retryable send error
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write firstLogSet", logCount));
for (set<string>::iterator iter = firstLogSet.begin(); iter != firstLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
sleep(5);
APSARA_TEST_TRUE(sender->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
// Data that failed to send is written to the secondary buffer, so there is one local file.
filesToSend.clear();
sender->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 1);
LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
FileEncryption::KeyInfo* keyV2 = new FileEncryption::KeyInfo("c134368ce7840a4e217ee6d8c27a7e0f", 2);
FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV2->mVersion, keyV2));
FileEncryption::GetInstance()->SetDefaultKey();
APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 2);
// Refresh buffer file manually.
string bufferFileName = sender->GetBufferFileName();
if (sender->GetBufferFileName() == bufferFileName)
sender->CreateNewFile();
bufferFileName = sender->GetBufferFileName();
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write secondLogSet", logCount));
gSendFailType = 1; // test network error
for (set<string>::iterator iter = secondLogSet.begin(); iter != secondLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
sleep(5);
APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4);
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 2);
LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
FileEncryption::KeyInfo* keyV3 = new FileEncryption::KeyInfo("e1b07c24b3340c94945c058db193d8f4", 3);
FileEncryption::GetInstance()->mKeyMap.insert(pair<int32_t, FileEncryption::KeyInfo*>(keyV3->mVersion, keyV3));
FileEncryption::GetInstance()->SetDefaultKey();
APSARA_TEST_EQUAL(FileEncryption::GetInstance()->GetDefaultKeyVersion(), 3);
if (Sender::Instance()->GetBufferFileName() == bufferFileName)
Sender::Instance()->CreateNewFile();
bufferFileName = Sender::Instance()->GetBufferFileName();
LOG_INFO(sLogger,
("gNetWorkStat", gNetWorkStat)("gSendFailType", gSendFailType)("write thirdLogSet", logCount));
for (set<string>::iterator iter = thirdLogSet.begin(); iter != thirdLogSet.end(); ++iter) {
OneJob(1, gRootDir, "Job", true, time(NULL), *iter);
}
printf("Job done\n");
// Note: the key deletion was moved here, after the writes.
// Delete key 2, so the second log set will be discarded.
FileEncryption::GetInstance()->mKeyMap.erase(2);
FileEncryption::GetInstance()->SetDefaultKey();
LOG_INFO(sLogger, ("delete key version 2, networkstat", gNetWorkStat));
sleep(5);
APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
sleep(INT32_FLAG(buffer_file_alive_interval) - 4); // debug
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
// The buffer file encrypted with key version 2 will be deleted.
APSARA_TEST_EQUAL(filesToSend.size(), 2);
LOG_INFO(sLogger, ("bufferFileList", MergeVectorString(filesToSend)));
EnableNetWork();
sleep(INT32_FLAG(buffer_file_alive_interval) + 1);
LOG_INFO(sLogger, ("begin check, gNetWorkStat", gNetWorkStat));
APSARA_TEST_EQUAL(gCounter, int32_t(2 * logCount));
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 0);
size_t firstLogCount = 0;
size_t secondLogCount = 0;
size_t thirdLogCount = 0;
size_t forthLogCount = 0;
APSARA_TEST_EQUAL(gBufferLogGroups.size(), 2);
for (size_t i = 0; i < (size_t)gBufferLogGroups.size(); ++i) {
LogGroup& logGroup = gBufferLogGroups[i];
if (logGroup.category() != "app_log")
continue;
for (size_t j = 0; j < (size_t)logGroup.logs_size(); ++j) {
const Log log = logGroup.logs(j);
string ip = "";
string nothing = "";
for (size_t k = 0; k < (size_t)log.contents_size(); ++k) {
const Log_Content content = log.contents(k);
if (content.key() == "ip")
ip = content.value();
else if (content.key() == "nothing")
nothing = content.value();
}
if (ip != "10.7.241.21")
continue;
LOG_INFO(sLogger, ("nothing", nothing)("substr", nothing.substr(nothing.size() - 4)));
if (nothing.substr(nothing.size() - 4) == "_$#1") {
if (firstLogSet.find(nothing) != firstLogSet.end()) {
firstLogCount++;
firstLogSet.erase(nothing);
}
} else if (nothing.substr(nothing.size() - 4) == "_$#3") {
if (thirdLogSet.find(nothing) != thirdLogSet.end()) {
thirdLogCount++;
thirdLogSet.erase(nothing);
}
} else if (nothing.substr(nothing.size() - 4) == "_$#4") {
if (forthLogSet.find(nothing) != forthLogSet.end()) {
forthLogCount++;
forthLogSet.erase(nothing);
}
} else if (nothing.substr(nothing.size() - 4) == "_$#2") {
if (secondLogSet.find(nothing) != secondLogSet.end()) {
secondLogCount++;
secondLogSet.erase(nothing);
}
}
}
}
APSARA_TEST_EQUAL(firstLogSet.size(), 0);
APSARA_TEST_EQUAL(firstLogCount, logCount);
APSARA_TEST_EQUAL(secondLogSet.size(), logCount);
APSARA_TEST_EQUAL(secondLogCount, 0);
APSARA_TEST_EQUAL(thirdLogSet.size(), 0);
APSARA_TEST_EQUAL(thirdLogCount, logCount);
APSARA_TEST_EQUAL(forthLogSet.size(), logCount);
APSARA_TEST_EQUAL(forthLogCount, 0);
FileEncryption::GetInstance()->mKeyMap.clear();
FileEncryption::GetInstance()->LoadKeyInfo();
FileEncryption::GetInstance()->SetDefaultKey();
delete keyV1;
delete keyV2;
delete keyV3;
INT32_FLAG(merge_log_count_limit) = defaultMergeLimit;
INT32_FLAG(max_holded_data_size) = defaultHoldSize;
Sender::Instance()->mCheckPeriod = defaultCheckPeriod;
gProjectNetEnableIndex = 0;
CaseCleanUp();
LOG_INFO(sLogger, ("TestSecondaryStorage() end", time(NULL)));
}
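// Round-trip random payloads through FileEncryption and check that the ciphertext
// length stays within one block of the plaintext length.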
void TestEncryptAndDecrypt() {
LOG_INFO(sLogger, ("TestEncryptAndDecrypt() begin", time(NULL)));
for (size_t i = 0; i < 100; ++i) {
string data = GenerateRandomStr(1024 * 16, 1024 * 64);
char* des;
int32_t srcLength = (int32_t)data.size();
int32_t encryptionLength;
FileEncryption::GetInstance()->Encrypt(data.c_str(), srcLength, des, encryptionLength);
string encryption = string(des, encryptionLength);
delete[] des;
char* src = new char[srcLength];
FileEncryption::GetInstance()->Decrypt(encryption.c_str(), encryptionLength, src, srcLength, 1);
string decryption = string(src, srcLength);
delete[] src;
APSARA_TEST_TRUE(data == decryption);
APSARA_TEST_TRUE(encryption != decryption);
int32_t blockBytes = FileEncryption::GetInstance()->mKeyMap[1]->mBlockBytes;
APSARA_TEST_TRUE_DESC((encryptionLength >= srcLength && encryptionLength < (srcLength + blockBytes)),
"encryption.size:" + ToString(encryptionLength) + ";decryption.size:"
+ ToString(srcLength) + ";blockBytes:" + ToString(blockBytes));
}
LOG_INFO(sLogger, ("TestEncryptAndDecrypt() end", time(NULL)));
}
// Wait several seconds to make sure test log files have been read.
static void WaitForFileBeenRead() {
#if defined(_MSC_VER)
// Because event-based discovery is not available on Windows, only polling is used,
// so wait for the polling interval plus some processing time (2s here).
logtail::sleep(INT32_FLAG(dirfile_check_interval_ms) / 1000 + 2);
#endif
}
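// Verify ilogtail_discard_old_data: logs older than ilogtail_discard_interval are
// dropped only when the flag is on, while timestamps before 1970 are always
// dropped.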
void TestDiscardOldData() {
LOG_INFO(sLogger, ("TestDiscardOldData() begin", time(NULL)));
CaseSetUp();
EnableNetWork();
BOOL_FLAG(ilogtail_discard_old_data) = true;
// case#1
gCounter = 0;
OneJob(10, gRootDir, "Job", true, time(NULL));
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 10);
// case#2
gCounter = 0;
OneJob(10, gRootDir, "Job", true, time(NULL) - INT32_FLAG(ilogtail_discard_interval) - 100);
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 0);
// case#3
gCounter = 0;
time_t timeBefore1970 = -100000;
#if defined(__linux__)
OneJob(10, gRootDir, "Job", true, timeBefore1970);
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 0);
#endif
// case#4
gCounter = 0;
OneJob(1, gRootDir, "Job", true, time(NULL));
#if defined(__linux__)
OneJob(1, gRootDir, "Job", true, timeBefore1970);
#endif
OneJob(1, gRootDir, "Job", true, time(NULL));
OneJob(1, gRootDir, "Job", true, time(NULL) - INT32_FLAG(ilogtail_discard_interval) - 100);
OneJob(1, gRootDir, "Job", true, time(NULL));
#if defined(__linux__)
OneJob(1, gRootDir, "Job", true, timeBefore1970);
#endif
OneJob(1, gRootDir, "Job", true, time(NULL));
OneJob(1, gRootDir, "Job", true, time(NULL) - INT32_FLAG(ilogtail_discard_interval) - 100);
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 4);
BOOL_FLAG(ilogtail_discard_old_data) = false;
// case#5
gCounter = 0;
OneJob(10, gRootDir, "Job", true, time(NULL) - INT32_FLAG(ilogtail_discard_interval) - 100);
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 10);
// case#6
gCounter = 0;
#if defined(__linux__)
OneJob(10, gRootDir, "Job", true, timeBefore1970);
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 0);
#endif
// case#7
gCounter = 0;
OneJob(1, gRootDir, "Job", true, time(NULL));
#if defined(__linux__)
OneJob(1, gRootDir, "Job", true, timeBefore1970);
#endif
OneJob(1, gRootDir, "Job", true, time(NULL));
OneJob(1, gRootDir, "Job", true, time(NULL) - INT32_FLAG(ilogtail_discard_interval) - 100);
OneJob(1, gRootDir, "Job", true, time(NULL));
#if defined(__linux__)
OneJob(1, gRootDir, "Job", true, timeBefore1970);
#endif
OneJob(1, gRootDir, "Job", true, time(NULL));
OneJob(1, gRootDir, "Job", true, time(NULL) - INT32_FLAG(ilogtail_discard_interval) - 100);
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 6);
CaseCleanUp();
LOG_INFO(sLogger, ("TestDiscardOldData() end", time(NULL)));
}
void TestConnect() {
LOG_INFO(sLogger, ("TestConnect() begin", time(NULL)));
CaseSetUp();
EnableNetWork();
OneJob(1, gRootDir, "Job", true, time(NULL));
WaitForFileBeenRead();
sleep(2 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 1);
CaseCleanUp();
LOG_INFO(sLogger, ("TestConnect() end", time(NULL)));
}
void TestDisConnect() {
LOG_INFO(sLogger, ("TestDisConnect() begin", time(NULL)));
CaseSetUp();
LogGroup logGroup;
logGroup.set_category("app_log");
Log* logPtr = logGroup.add_logs();
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key("ip");
contentPtr->set_value("10.7.241.21");
contentPtr = logPtr->add_contents();
contentPtr->set_key("time");
contentPtr->set_value(ToString(time(NULL)));
contentPtr = logPtr->add_contents();
contentPtr->set_key("nothing");
contentPtr->set_value(ToString(
"abcdefghijklmnopqrsputskjueiguwdhruwldirudsjhdklguejsldiuuwjskldgsksjdkdjfksjsdkfjsksdjfksjdkfuujss"));
contentPtr = logPtr->add_contents();
contentPtr->set_key("seq");
contentPtr->set_value(ToString("0"));
DisableNetWork();
OneJob(1, gRootDir, "Job", true, time(NULL));
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) * 2 + 2);
vector<string> filesToSend;
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(gCounter, 0);
// APSARA_TEST_EQUAL(filesToSend.size(), 1);
EnableNetWork();
sleep(INT32_FLAG(batch_send_interval) * 3 + 1);
gRecvLogGroupLock.lock();
APSARA_TEST_EQUAL(gRcvLogGroup.category(), logGroup.category());
APSARA_TEST_EQUAL(gRcvLogGroup.logs_size(), logGroup.logs_size());
for (int i = 0; i < gRcvLogGroup.logs_size() && i < logGroup.logs_size(); i++) {
const Log& log1 = gRcvLogGroup.logs(i);
const Log& log2 = logGroup.logs(i);
APSARA_TEST_EQUAL_FATAL(log1.contents_size(), log2.contents_size());
for (int j = 0; j < log1.contents_size(); j++) {
const Log_Content& content1 = log1.contents(j);
const Log_Content& content2 = log2.contents(j);
APSARA_TEST_EQUAL(content1.key(), content2.key());
APSARA_TEST_TRUE_DESC(content1.key() == "time" || content1.value() == content2.value(),
content1.key() + content1.value() + content2.key() + content2.value());
}
}
gRecvLogGroupLock.unlock();
APSARA_TEST_EQUAL(gCounter, 1);
filesToSend.clear();
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 0);
CaseCleanUp();
LOG_INFO(sLogger, ("TestDisConnect() end", time(NULL)));
}
void TestChangeStat() {
LOG_INFO(sLogger, ("TestChangeStat() begin", time(NULL)));
CaseSetUp();
DisableNetWork();
int32_t num = 10;
for (int32_t i = 0; i < num; ++i) {
OneJob(1, gRootDir, "job", true, time(NULL));
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) * 2 + 1);
if (i % 2 == 0)
EnableNetWork();
else
DisableNetWork();
}
EnableNetWork();
sleep(INT32_FLAG(batch_send_interval) * 4 + 1);
APSARA_TEST_EQUAL(gCounter, num);
vector<string> filesToSend;
Sender::Instance()->LoadFileToSend(time(NULL), filesToSend);
APSARA_TEST_EQUAL(filesToSend.size(), 0);
CaseCleanUp();
LOG_INFO(sLogger, ("TestChangeStat() end", time(NULL)));
}
void TestFlowControl() {
LOG_INFO(sLogger, ("TestFlowControl() begin", time(NULL)));
CaseSetUp();
DisableNetWork();
AppConfig::GetInstance()->mBytePerSec = 1 * 1024 * 1024;
int32_t MAX_BUFFER_FILE_SIZE = 50 * 1024 * 1024;
while (true) {
// If a fixed time is not used, the logs may span many 10-second windows.
// When those windows fall in different minutes,
// ((value->mLogGroup).logs(0).time() / 60 != (*(mutableLogPtr + logIdx))->time() / 60)
// is true, so many small packages are produced and sending can take a very
// long time (possibly up to an hour).
OneJob(10, gRootDir, "job", true, time(NULL), "", 0, true);
ifstream fin((gRootDir + PATH_SEPARATOR + "job.log").c_str());
fin.seekg(0, ios::end);
int32_t pos = fin.tellg();
if (pos > MAX_BUFFER_FILE_SIZE) {
fin.close();
break;
}
fin.close();
}
WaitForFileBeenRead();
sleep(3 * INT32_FLAG(batch_send_interval) + 1);
// vector<string> filesToSend;
// Sender::Instance() -> LoadFileToSend(time(NULL),filesToSend);
// LOG_INFO(sLogger,("buffer file num",filesToSend.size()));
// APSARA_TEST_TRUE_FATAL(filesToSend.size() > 1);
int32_t defaultMaxBps = AppConfig::GetInstance()->mMaxBytePerSec;
AppConfig::GetInstance()->mMaxBytePerSec = 2 * 1024 * 1024;
EnableNetWork();
Sender::Instance()->TestEndpoint(STRING_FLAG(default_region_name), "");
//{
// PTScopedLock lock(Sender::Instance()->mBufferWait);
// Sender::Instance()->mBufferWait.signal();
//}
uint64_t start = GetCurrentTimeInMicroSeconds();
sleep(20);
while (true) {
sleep(1);
Sender* pSender = Sender::Instance();
Aggregator* pAgg = Aggregator::GetInstance();
if (ProcessorRunner::GetInstance()->mLogFeedbackQueue.IsEmpty() && pAgg->IsMergeMapEmpty()
&& pSender->IsBatchMapEmpty() && pSender->GetSendingCount() == 0 && pSender->IsSecondaryBufferEmpty()) {
break;
}
// pSender->mSenderQueue.PrintStatus();
// filesToSend.clear();
// Sender::Instance() -> LoadFileToSend(time(NULL),filesToSend);
// if(filesToSend.size() == 0)
// break;
}
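// The measured throughput should track the configured flow-control limit: speed is
// gMessageSize (presumably the total payload bytes accumulated by the mock sender)
// divided by the elapsed time, and its ratio to the max bytes/sec is expected to stay
// roughly within [0.9, 1.3].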
double interval = (GetCurrentTimeInMicroSeconds() - start) / 1000000.0;
double speed = gMessageSize * 1.0 / interval;
double diffRatio = speed / AppConfig::GetInstance()->GetMaxBytePerSec();
APSARA_TEST_TRUE_DESC(diffRatio >= 0.9 && diffRatio <= 1.3, diffRatio);
LOG_INFO(sLogger, ("actual speed", speed)("diffRatio", diffRatio));
AppConfig::GetInstance()->mMaxBytePerSec = defaultMaxBps;
if (interval < 40.) {
sleep((uint32_t)(40. - interval));
}
CaseCleanUp();
LOG_INFO(sLogger, ("TestFlowControl() end", time(NULL)));
}
void TestLogstoreFlowControl() {
LOG_INFO(sLogger, ("TestLogstoreFlowControl() begin", time(NULL)));
CaseSetUp(false, false, false, 1, false, 2 * 1024 * 1024, 0);
DisableNetWork();
int32_t MAX_BUFFER_FILE_SIZE = 50 * 1024 * 1024;
while (true) {
// If a fixed time is not used, the logs may span many 10-second windows.
// When those windows fall in different minutes,
// ((value->mLogGroup).logs(0).time() / 60 != (*(mutableLogPtr + logIdx))->time() / 60)
// is true, so many small packages are produced and sending can take a very
// long time (possibly up to an hour).
OneJob(10, gRootDir, "job", true, time(NULL), "", 0, true);
ifstream fin((gRootDir + PATH_SEPARATOR + "job.log").c_str());
fin.seekg(0, ios::end);
int32_t pos = fin.tellg();
if (pos > MAX_BUFFER_FILE_SIZE) {
fin.close();
break;
}
fin.close();
}
WaitForFileBeenRead();
sleep(3 * INT32_FLAG(batch_send_interval) + 1);
// vector<string> filesToSend;
// Sender::Instance() -> LoadFileToSend(time(NULL),filesToSend);
// LOG_INFO(sLogger,("buffer file num",filesToSend.size()));
// APSARA_TEST_TRUE_FATAL(filesToSend.size() > 1);
EnableNetWork();
Sender::Instance()->TestEndpoint(STRING_FLAG(default_region_name), "");
//{
// PTScopedLock lock(Sender::Instance()->mBufferWait);
// Sender::Instance()->mBufferWait.signal();
//}
uint64_t start = GetCurrentTimeInMicroSeconds();
sleep(20);
while (true) {
sleep(1);
Sender* pSender = Sender::Instance();
Aggregator* pAgg = Aggregator::GetInstance();
if (ProcessorRunner::GetInstance()->mLogFeedbackQueue.IsEmpty() && pAgg->IsMergeMapEmpty()
&& pSender->IsBatchMapEmpty() && pSender->GetSendingCount() == 0 && pSender->IsSecondaryBufferEmpty()) {
break;
}
// pSender->mSenderQueue.PrintStatus();
// filesToSend.clear();
// Sender::Instance() -> LoadFileToSend(time(NULL),filesToSend);
// if(filesToSend.size() == 0)
// break;
}
double interval = (GetCurrentTimeInMicroSeconds() - start) / 1000000.0;
double speed = gMessageSize * 1.0 / interval;
double diffRatio = speed / (2 * 1024 * 1024);
APSARA_TEST_TRUE_DESC(diffRatio >= 0.8 && diffRatio <= 1.3, diffRatio);
LOG_INFO(sLogger, ("actual speed", speed)("diffRatio", diffRatio));
if (interval < 40.) {
sleep((uint32_t)(40. - interval));
}
CaseCleanUp();
LOG_INFO(sLogger, ("TestLogstoreFlowControl() end", time(NULL)));
}
void TestLogstoreFlowControlPause() {
LOG_INFO(sLogger, ("TestLogstoreFlowControlPause() begin", time(NULL)));
CaseSetUp(false, false, false, 1, false, 0, 0);
EnableNetWork();
for (int i = 0; i < 100; ++i) {
OneJob(10, gRootDir, "job", true, time(NULL), "", 0, true);
}
WaitForFileBeenRead();
sleep(3 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 0);
string project = "1000000_proj";
string logstore = "app_log";
LogstoreFeedBackKey key = GenerateLogstoreFeedBackKey(project, logstore);
Sender::Instance()->SetLogstoreFlowControl(key, -1, 0);
sleep(3 * INT32_FLAG(batch_send_interval) + 1);
APSARA_TEST_EQUAL(gCounter, 1000);
CaseCleanUp();
LOG_INFO(sLogger, ("TestLogstoreFlowControlPause() end", time(NULL)));
}
void TestLogstoreFlowControlExpire() {
LOG_INFO(sLogger, ("TestLogstoreFlowControlExpire() begin", time(NULL)));
CaseSetUp(false, false, false, 1, false, 0, time(NULL) + 20);
EnableNetWork();
for (int i = 0; i < 100; ++i) {
OneJob(10, gRootDir, "job", true, time(NULL), "", 0, true);
}
sleep(10);
APSARA_TEST_EQUAL(gCounter, 0);
sleep(20);
APSARA_TEST_EQUAL(gCounter, 1000);
CaseCleanUp();
LOG_INFO(sLogger, ("TestLogstoreFlowControlExpire() end", time(NULL)));
}
char GetnoneUTF8Char() {
char noneUTF8Chars[] = {char(0xff), char(0xc0), char(0xe0), char(0xf0)};
static int index = 0;
index++;
index = index % 4;
return noneUTF8Chars[index];
}
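// GenerateNoneUTF8Char below embeds the invalid lead bytes returned by GetnoneUTF8Char
// (0xff, 0xc0, 0xe0, 0xf0) at varying positions in log keys and values, presumably to
// exercise the sender's handling of non-UTF-8 content.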
void GenerateNoneUTF8Char(LogGroup& logGroup) {
auto now = GetCurrentLogtailTime();
for (int i = 0; i < 10; ++i) {
Log* logPtr = logGroup.add_logs();
SetLogTime(logPtr, now.tv_sec);
for (int j = 0; j < 10; ++j) {
Log_Content* contentPtr = logPtr->add_contents();
if (j == i) {
contentPtr->set_key("key" + ToString(j));
contentPtr->set_value(string(i, 'v') + string(i + 1, GetnoneUTF8Char()) + string(i, 'v'));
} else if (j == (i + 1)) {
contentPtr->set_key(string(i, 'v') + string(i + 1, GetnoneUTF8Char()) + string(i, 'v'));
contentPtr->set_value("value" + ToString(j));
} else if (j == (i + 2)) {
contentPtr->set_key("key" + ToString(j));
contentPtr->set_value(string(i, 'v') + string(i + 1, GetnoneUTF8Char()));
} else if (j == (i + 3)) {
contentPtr->set_key("key" + ToString(j));
contentPtr->set_value(string(i + 1, GetnoneUTF8Char()) + string(i, 'v'));
} else {
contentPtr->set_key("key" + ToString(j));
contentPtr->set_value("value" + ToString(j));
}
}
}
}
void TestMergeByCount() {
LOG_INFO(sLogger, ("TestMergeByCount() begin", time(NULL)));
int32_t defaultMergeLimit = INT32_FLAG(merge_log_count_limit);
INT32_FLAG(merge_log_count_limit) = rand() % 100 + 1;
LOG_INFO(sLogger, ("merge_log_count_limit", INT32_FLAG(merge_log_count_limit)));
CaseSetUp();
EnableNetWork();
for (size_t caseId = 0; caseId < 3; ++caseId) {
gBufferLogGroups.clear();
size_t totalSendLogNum = 0;
for (size_t j = 0; j < 10; j++) {
size_t randSendLogNum = rand() % INT32_FLAG(merge_log_count_limit);
totalSendLogNum += randSendLogNum;
OneJob(randSendLogNum, gRootDir, "Job", true, time(NULL), "");
LOG_INFO(sLogger, ("send loop", j)("totalSendLogNum", totalSendLogNum)("current size", randSendLogNum));
}
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) + 3);
size_t totalRecieveLogNum = 0;
for (size_t j = 0; j < (size_t)gBufferLogGroups.size(); ++j) {
LogGroup& logGroup = gBufferLogGroups[j];
APSARA_TEST_TRUE_DESC(logGroup.logs_size() <= INT32_FLAG(merge_log_count_limit), logGroup.logs_size());
totalRecieveLogNum += logGroup.logs_size();
LOG_INFO(
sLogger,
("recv loop", j)("totalRecieveLogNum", totalRecieveLogNum)("current size", logGroup.logs_size())(
"merge_log_count_limit", INT32_FLAG(merge_log_count_limit)));
}
APSARA_TEST_EQUAL(totalSendLogNum, totalRecieveLogNum);
}
CaseCleanUp();
INT32_FLAG(merge_log_count_limit) = defaultMergeLimit;
LOG_INFO(sLogger, ("TestMergeByCount() end", time(NULL)));
}
void TestFilterRule() {
LOG_INFO(sLogger, ("TestFilterRule() begin", time(NULL)));
CaseSetUp(true);
EnableNetWork();
OneJob(1, gRootDir, "Job", true, time(NULL), "filter", 23234); // not match
OneJob(1, gRootDir, "Job", true, time(NULL), "testfilter", 5); // not match
OneJob(1, gRootDir, "Job", true, time(NULL), "filtersdfas", 5); // match
OneJob(1, gRootDir, "Job", true, time(NULL), "sdfsds", 5); // not match
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) + 3);
APSARA_TEST_EQUAL(gCounter, 1);
for (size_t i = 0; i < gBufferLogGroups.size(); i++) {
LogGroup& loggroup = gBufferLogGroups[i];
for (int32_t j = 0; j < loggroup.logs_size(); j++) {
const Log& log = loggroup.logs(j);
for (int k = 0; k < log.contents_size(); k++) {
const Log_Content& content = log.contents(k);
if (content.key() == "nothing") {
APSARA_TEST_EQUAL(content.value(), "filtersdfas");
}
if (content.key() == "seq") {
APSARA_TEST_EQUAL(content.value(), "5");
}
}
}
}
CaseCleanUp();
LOG_INFO(sLogger, ("TestFilterRule() end", time(NULL)));
}
void TestMonitor() {
LOG_INFO(sLogger, ("TestMonitor() begin", time(NULL)));
int32_t defaultMonitorInterval = INT32_FLAG(monitor_interval);
INT32_FLAG(monitor_interval) = 3;
CaseSetUp();
LogtailMonitor::GetInstance()->Init();
gStatusLogGroup.Clear();
gStatusCount = 0;
EnableNetWork();
sleep(INT32_FLAG(monitor_interval) * 5 + 1);
APSARA_TEST_TRUE_DESC(gStatusCount >= 2, gStatusCount);
APSARA_TEST_EQUAL(gStatusLogGroup.category(), "logtail_status_profile");
const Log& log = gStatusLogGroup.logs(0);
int idx = 0;
APSARA_TEST_EQUAL(log.contents(idx++).key(), "cpu");
#if defined(__linux__)
APSARA_TEST_EQUAL(log.contents(idx++).key(), "os_cpu");
#endif
APSARA_TEST_EQUAL(log.contents(idx++).key(), "mem");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "version");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "uuid");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "user_defined_id");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "aliuids");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "projects");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "instance_id");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "syslog_open");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "ip");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "hostname");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "os");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "os_detail");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "user");
#if defined(__linux__)
APSARA_TEST_EQUAL(log.contents(idx++).key(), "load");
#endif
APSARA_TEST_EQUAL(log.contents(idx++).key(), "metric_json");
APSARA_TEST_EQUAL(log.contents(idx++).key(), "status");
LogtailMonitor::GetInstance()->RemoveMonitor();
CaseCleanUp();
INT32_FLAG(monitor_interval) = defaultMonitorInterval;
LOG_INFO(sLogger, ("TestMonitor() end", time(NULL)));
}
void TestFlushOut() {
LOG_INFO(sLogger, ("TestFlushOut() begin", time(NULL)));
AppConfig::GetInstance()->mMaxBufferNum = 500;
EnableNetWork();
CaseSetUp();
int32_t seqNo = 5;
for (int round = 0; round < 10; ++round) {
OneJob(100, gRootDir, "Job", true, time(NULL), "testflushout", seqNo);
usleep(100 * 1000);
}
WaitForFileBeenRead();
LOG_INFO(sLogger, ("Flush out", "begin"));
APSARA_TEST_TRUE(Sender::Instance()->FlushOut(5 * 1000)); // this operation will write buffer file
LOG_INFO(sLogger, ("Flush out", "end"));
sleep(2);
{
WaitObject::Lock lock(Sender::Instance()->mBufferWait);
Sender::Instance()->mBufferWait.signal();
}
sleep(10);
APSARA_TEST_EQUAL(gCounter, 1000);
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "testflushout");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(seqNo));
gRecvLogGroupLock.unlock();
CaseCleanUp();
AppConfig::GetInstance()->mMaxBufferNum = INT32_FLAG(max_buffer_num);
LOG_INFO(sLogger, ("TestFlushOut() end", time(NULL)));
}
void TestMergeByMinute() {
LOG_INFO(sLogger, ("TestMergeByMinute() begin", time(NULL)));
CaseSetUp();
EnableNetWork();
for (size_t caseId = 0; caseId < 3; ++caseId) {
{
PTScopedLock lock(gBufferLogGroupsLock);
gBufferLogGroups.clear();
}
int32_t curTime = (time(NULL) / 60) * 60;
int32_t logTimes[10] = {
curTime - 300, // new loggroup
curTime - 150, // new loggroup
curTime - 121,
curTime - 120, // new loggroup
curTime - 60, // new loggroup
curTime - 59,
curTime - 3,
curTime, // new loggroup
curTime + 59,
curTime + 60 // new loggroup
};
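// Logs are expected to be merged by minute: the ten timestamps above fall into six
// distinct minute windows, which is why mergeLogNums below has six entries
// ({0}, {1,2}, {3}, {4,5,6}, {7,8}, {9}).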
int32_t logNums[10] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
size_t totalSendLogNum = 0;
LOG_INFO(sLogger, ("caseId", caseId));
for (size_t j = 0; j < 10; j++) {
logNums[j] = (rand() % (INT32_FLAG(merge_log_count_limit) / 50)) + 1;
OneJob(logNums[j], gRootDir, "Job", true, logTimes[j], "", 0, true);
LOG_INFO(sLogger, ("logTime", logTimes[j])("logNum", logNums[j]));
totalSendLogNum += logNums[j];
}
int32_t mergeLogNums[6] = {logNums[0],
logNums[1] + logNums[2],
logNums[3],
logNums[4] + logNums[5] + logNums[6],
logNums[7] + logNums[8],
logNums[9]};
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) * 2 + 1);
size_t totalRecieveLogNum = 0;
map<int32_t, int32_t> recvLogCountMap; //<timestamp, count>
gBufferLogGroupsLock.lock();
APSARA_TEST_EQUAL_DESC(gBufferLogGroups.size(), 6, string("caseId:") + ToString(caseId));
if (gBufferLogGroups.size() == 6) {
for (size_t idx = 0; idx < gBufferLogGroups.size(); ++idx) {
const LogGroup& logGroup = gBufferLogGroups[idx];
APSARA_TEST_EQUAL_DESC(logGroup.logs_size(),
mergeLogNums[idx],
string("caseId:") + ToString(caseId) + ", idx:" + ToString(idx));
totalRecieveLogNum += logGroup.logs_size();
for (int32_t logIdx = 0; logIdx < logGroup.logs_size(); ++logIdx) {
int32_t logtime = logGroup.logs(logIdx).time();
if (recvLogCountMap.find(logtime) == recvLogCountMap.end())
recvLogCountMap[logtime] = 1;
else
recvLogCountMap[logtime] += 1;
}
}
}
gBufferLogGroupsLock.unlock();
for (map<int32_t, int32_t>::iterator iter = recvLogCountMap.begin(); iter != recvLogCountMap.end(); ++iter)
LOG_INFO(sLogger, ("recvLogCountMap, time", iter->first)("count", iter->second));
APSARA_TEST_EQUAL_DESC(totalSendLogNum, totalRecieveLogNum, string("caseId:") + ToString(caseId));
for (int32_t idx = 0; idx < 10; ++idx) {
if (recvLogCountMap.find(logTimes[idx]) != recvLogCountMap.end())
APSARA_TEST_EQUAL_DESC(recvLogCountMap[logTimes[idx]],
logNums[idx],
string("caseId:") + ToString(caseId) + ", idx:" + ToString(idx));
else
APSARA_TEST_TRUE_DESC(recvLogCountMap.find(logTimes[idx]) != recvLogCountMap.end(),
string("caseId:") + ToString(caseId) + ", idx:" + ToString(idx));
}
}
CaseCleanUp();
LOG_INFO(sLogger, ("TestMergeByMinute() end", time(NULL)));
}
void TestRealIpSend() {
LOG_INFO(sLogger, ("TestRealIpSend() begin", time(NULL)));
CaseSetUp(false, true, true);
EnableNetWork();
// enable real ip
BOOL_FLAG(send_prefer_real_ip) = true;
INT32_FLAG(send_switch_real_ip_interval) = 10;
Sender::Instance()->MockGetRealIp = SenderUnittest::MockGetRealIp;
// start real ip update thread
gRealIpSendThread = CreateThread(std::bind(&Sender::RealIpUpdateThread, Sender::Instance()));
string aliuid = "1234567890";
string region = AppConfig::GetInstance()->GetDefaultRegion();
LOG_INFO(sLogger, (aliuid, region));
sleep(1);
LOG_INFO(sLogger, ("start generate log", ""));
int32_t seqNo = 5;
for (int round = 0; round < 10; ++round) {
OneJob(100, gRootDir, "Job", true, time(NULL), "TestRealIpSend", seqNo);
usleep(100 * 1000);
}
sleep(10);
// check data
APSARA_TEST_EQUAL(gCounter, 1000);
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestRealIpSend");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(seqNo));
gRecvLogGroupLock.unlock();
// check send host
auto client = Sender::Instance()->GetSendClient(region, aliuid);
APSARA_TEST_TRUE(client->GetRawSlsHostFlag());
APSARA_TEST_EQUAL(client->GetRawSlsHost().substr(0, strlen("10.123.32.")), string("10.123.32."));
CaseCleanUp();
// reset flag
BOOL_FLAG(send_prefer_real_ip) = false;
LOG_INFO(sLogger, ("TestRealIpSend() end", time(NULL)));
}
void TestEmptyRealIp() {
LOG_INFO(sLogger, ("TestEmptyRealIp() begin", time(NULL)));
CaseSetUp(false, true, false);
EnableNetWork();
// enable real ip
BOOL_FLAG(send_prefer_real_ip) = true;
INT32_FLAG(send_switch_real_ip_interval) = 10;
Sender::Instance()->MockGetRealIp = SenderUnittest::MockGetEmptyRealIp;
// start real ip update thread
gRealIpSendThread = CreateThread(std::bind(&Sender::RealIpUpdateThread, Sender::Instance()));
string aliuid = "1234567890";
string region = AppConfig::GetInstance()->GetDefaultRegion();
LOG_INFO(sLogger, (aliuid, region));
sleep(1);
LOG_INFO(sLogger, ("start generate log", ""));
int32_t seqNo = 5;
for (int round = 0; round < 10; ++round) {
OneJob(100, gRootDir, "Job", true, time(NULL), "TestRealIpSend", seqNo);
usleep(100 * 1000);
}
sleep(10);
// check data
APSARA_TEST_EQUAL(gCounter, 1000);
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestRealIpSend");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(seqNo));
gRecvLogGroupLock.unlock();
// check send host
sdk::Client* client = Sender::Instance()->GetSendClient(region, aliuid);
APSARA_TEST_EQUAL(client->GetRawSlsHostFlag(), false);
CaseCleanUp();
// reset flag
BOOL_FLAG(send_prefer_real_ip) = false;
LOG_INFO(sLogger, ("TestEmptyRealIp() end", time(NULL)));
}
void TestRealIpSendFailAndRecover() {
LOG_INFO(sLogger, ("TestRealIpSendFailAndRecover() begin", time(NULL)));
CaseSetUp(false, true, true);
DisableNetWork();
gSendFailType = 1;
// enable real ip
BOOL_FLAG(send_prefer_real_ip) = true;
INT32_FLAG(send_switch_real_ip_interval) = 60;
Sender::Instance()->MockGetRealIp = SenderUnittest::MockGetRealIp;
// start real ip update thread
gStartIp = 0;
gRealIpSendThread = CreateThread(std::bind(&Sender::RealIpUpdateThread, Sender::Instance()));
string aliuid = "1234567890";
string region = AppConfig::GetInstance()->GetDefaultRegion();
LOG_INFO(sLogger, (aliuid, region));
sleep(1);
LOG_INFO(sLogger, ("start generate log", ""));
int32_t seqNo = 5;
for (int round = 0; round < 10; ++round) {
OneJob(100, gRootDir, "Job", true, time(NULL), "TestRealIpSendFailAndRecover", seqNo);
usleep(10 * 1000);
}
LOG_INFO(sLogger, ("generate end", ""));
sleep(10);
APSARA_TEST_EQUAL(gCounter, 0);
auto client = Sender::Instance()->GetSendClient(region, aliuid);
APSARA_TEST_TRUE(client->GetRawSlsHostFlag());
APSARA_TEST_TRUE_DESC(gStartIp >= 2, gStartIp);
LOG_INFO(sLogger, ("enable network", ""));
EnableNetWork();
sleep(8);
// check data
APSARA_TEST_EQUAL(gCounter, 1000);
gRecvLogGroupLock.lock();
const Log& log = gRcvLogGroup.logs(0);
APSARA_TEST_EQUAL(log.contents(2).value(), "TestRealIpSendFailAndRecover");
APSARA_TEST_EQUAL(log.contents(3).value(), ToString(seqNo));
gRecvLogGroupLock.unlock();
// make sure real ip is enabled
int moreLogNum = 0;
for (int i = 0; i < 10; ++i) {
if (client->GetRawSlsHostFlag() != true) {
LOG_INFO(sLogger, ("write log to switch real ip", ""));
moreLogNum += 10;
OneJob(10, gRootDir, "Job", true, time(NULL), "TestRealIpSendFailAndRecover", seqNo);
sleep(1);
} else
break;
}
// check send host
APSARA_TEST_TRUE(client->GetRawSlsHostFlag());
APSARA_TEST_EQUAL(client->GetRawSlsHost().substr(0, strlen("10.123.32.")), string("10.123.32."));
int32_t lastTime = time(NULL);
INT32_FLAG(send_switch_real_ip_interval) = 1000;
sleep(5);
string lastIp = client->GetRawSlsHost();
gClient = client;
gDisabledIp = "10.123.32.";
gDisableIpFlag = true;
LOG_INFO(sLogger, ("disable ip", lastIp));
for (int round = 0; round < 10; ++round) {
OneJob(100, gRootDir, "Job", true, time(NULL), "TestRealIpSendFailAndRecover", seqNo);
usleep(1000 * 1000);
}
WaitForFileBeenRead();
sleep(5);
LOG_INFO(sLogger, ("check ip switch", lastIp));
Sender::Instance()->mRegionRealIpLock.lock();
APSARA_TEST_NOT_EQUAL(lastIp, Sender::Instance()->mRegionRealIpMap[region]->mRealIp);
APSARA_TEST_TRUE(Sender::Instance()->mRegionRealIpMap[region]->mLastUpdateTime >= lastTime);
Sender::Instance()->mRegionRealIpLock.unlock();
gDisableIpFlag = false;
sleep(10);
APSARA_TEST_EQUAL(gCounter, 2000 + moreLogNum);
sleep(3);
gDisableIpFlag = false;
gClient = NULL;
INT32_FLAG(send_switch_real_ip_interval) = 10;
CaseCleanUp();
// reset flag
BOOL_FLAG(send_prefer_real_ip) = false;
LOG_INFO(sLogger, ("TestRealIpSendFailAndRecover() end", time(NULL)));
}
// TestRegionConcurreny tests whether send concurrency control at the region level works.
// 1. Set send_request_concurrency to 30 and create 4 projects.
// 2. Generate some data and then assert the amount of received data.
// 3. Disable the network and generate more data, then assert the send concurrency of each region.
// 4. Enable the network but only for one project, generate lots of data for the other three
// projects, and then assert that data for the enabled project can still be received.
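// With 6 regions in total (see the note inside the function) and send_request_concurrency
// set to 30, each tested region is expected to get 30 / 6 = 5 concurrent send slots, which
// is what the EXPECT_EQ on mConcurrency verifies.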
void TestRegionConcurreny() {
LOG_INFO(sLogger, ("TestRegionConcurreny() begin", time(NULL)));
// There are three built-in regions, and project 0 will use the __default__ region, so
// the number of regions is 6 (3+3).
LOG_INFO(sLogger,
("Set send_request_concurrency to 30 and "
"create 4 projects",
time(NULL)));
auto bakSendRequestConcurrency = INT32_FLAG(send_request_concurrency);
INT32_FLAG(send_request_concurrency) = 30;
CaseSetUp(false, false, true, 4);
EnableNetWork();
gProjectNetEnableIndex = 3;
LOG_INFO(sLogger,
("Generate some data and "
"assert the number of received data",
""));
for (int prjIndex = 0; prjIndex < 4; ++prjIndex) {
OneJob(60 * 10, gRootDir, "Job", true, time(NULL), "TestRegionConcurreny", 0, false, prjIndex);
}
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) * 2 + 2);
sleep(10);
auto counter = gCounter;
EXPECT_EQ(counter, 60 * 10 * 4);
LOG_INFO(sLogger,
("Disable network and generate more data, "
"assert the send concurrency of each region",
""));
sleep(1);
gSendFailType = 1;
DisableNetWork();
sleep(1);
for (int prjIndex = 0; prjIndex < 4; ++prjIndex) {
OneJob(60 * 10, gRootDir, "Job", true, time(NULL), "TestRegionConcurreny", 0, false, prjIndex);
}
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) * 2 + 2);
EXPECT_EQ(gCounter, counter);
{
PTScopedLock lock(Sender::Instance()->mRegionEndpointEntryMapLock);
auto& regionMap = Sender::Instance()->mRegionEndpointEntryMap;
LOG_INFO(sLogger, ("Access region map", regionMap.size()));
for (auto iter = regionMap.begin(); iter != regionMap.end(); ++iter) {
auto& regionName = iter->first;
LOG_INFO(sLogger, (regionName, iter->second->mConcurrency));
// Only tests __default__ and 100000[1-3]_proj.
int32_t projectIndex = -1;
if (0 == regionName.find("__default"))
projectIndex = 0;
else if (0 == regionName.find("1000")) {
StringTo(regionName.substr(std::string("100000").length(), 1), projectIndex);
}
if (0 <= projectIndex && projectIndex <= 3) {
EXPECT_EQ(iter->second->mConcurrency, 30 / regionMap.size());
}
}
}
LOG_INFO(sLogger,
("Enable network but only enable one project, "
"generate lots of data of another three projects, and then "
"assert that data to the enabled project can be received on time.",
""));
sleep(1);
gProjectNetEnableIndex = 0;
EnableNetWork();
sleep(INT32_FLAG(batch_send_interval) * 2 + 2);
counter += 60 * 10;
EXPECT_EQ(gCounter, counter);
for (int prjIndex = 1; prjIndex < 4; ++prjIndex) {
OneJob(60 * 300, gRootDir, "Job", true, time(NULL), "TestRegionConcurreny", 0, false, prjIndex);
}
OneJob(60 * 100, gRootDir, "Job", true, time(NULL), "TestRegionConcurreny", 0, false, 0);
WaitForFileBeenRead();
sleep(INT32_FLAG(batch_send_interval) * 2 + 2);
counter += 60 * 100;
EXPECT_EQ(gCounter, counter);
INT32_FLAG(send_request_concurrency) = bakSendRequestConcurrency;
CaseCleanUp();
LOG_INFO(sLogger, ("TestRegionConcurreny() end", time(NULL)));
}
void TestTooOldFilesIntegrity() {
LOG_INFO(sLogger, ("TestTooOldFilesIntegrity() begin", time(NULL)));
const int PROJECT_NUM = 8;
EnableNetWork();
gLogIntegrityTestFlag = true;
auto bakFileEliminateInterval = INT32_FLAG(file_eliminate_interval);
CaseSetUp(false, true, true, PROJECT_NUM, false, -1, 0, 900);
INT32_FLAG(file_eliminate_interval) = 3;
for (int round = 0; round < 10; ++round) {
for (int prjIndex = 0; prjIndex < PROJECT_NUM; ++prjIndex) {
// use 100 logs so that timestamps spread across different minutes and two packets are produced
OneJob(100, gRootDir, "Job", true, time(NULL), "TestTooOldFilesIntegrity", 0, false, prjIndex);
}
usleep(200 * 1000);
}
WaitForFileBeenRead();
// wait 10s
sleep(10);
// test
const LogIntegrity::RegionOutDatedFileMap& regionOutDatedFileMap
= LogIntegrity::GetInstance()->mRegionOutDatedFileMap;
APSARA_TEST_EQUAL(regionOutDatedFileMap.size(), 4);
for (LogIntegrity::RegionOutDatedFileMap::const_iterator regionIter = regionOutDatedFileMap.begin();
regionIter != regionOutDatedFileMap.end();
++regionIter) {
// get seq
const std::string& getRegion = regionIter->first;
int seq = getRegion[6] - '0';
char region[16] = {0};
char projectName[16] = {0};
char logFileName[16] = {0};
snprintf(region, sizeof(region), "%d_proj", 1000000 + seq);
snprintf(projectName, sizeof(projectName), "%d_proj", 1000000 + seq);
if (seq == 0)
strcpy(logFileName, "Job.log");
else
snprintf(logFileName, sizeof(logFileName), "Job.log%d", seq);
const LogIntegrity::OutDatedFileMap* outDatedFileMap = regionIter->second;
APSARA_TEST_EQUAL(regionIter->first, std::string(region));
APSARA_TEST_EQUAL(outDatedFileMap->size(), (size_t)1);
for (LogIntegrity::OutDatedFileMap::const_iterator fileIter = outDatedFileMap->begin();
fileIter != outDatedFileMap->end();
++fileIter) {
const OutDatedFile* outDatedFile = fileIter->second;
APSARA_TEST_EQUAL(outDatedFile->mRegion, std::string(region));
APSARA_TEST_EQUAL(outDatedFile->mProjectName, std::string(projectName));
APSARA_TEST_EQUAL(outDatedFile->mLogstore, "app_log");
APSARA_TEST_EQUAL(outDatedFile->mFilename, gRootDir + PATH_SEPARATOR + std::string(logFileName));
APSARA_TEST_EQUAL(outDatedFile->mIntegrityProject, std::string(projectName));
APSARA_TEST_EQUAL(outDatedFile->mIntegrityLogstore, "test-integrity-logstore");
}
}
gLogIntegrityTestFlag = false;
CaseCleanUp();
INT32_FLAG(file_eliminate_interval) = bakFileEliminateInterval;
LOG_INFO(sLogger, ("TestTooOldFilesIntegrity() end", time(NULL)));
}
void TestGlobalMarkOffset() {
LOG_INFO(sLogger, ("TestGlobalMarkOffset() begin", time(NULL)));
// prepare
auto bakDefaultGlobalMarkOffsetFlag = BOOL_FLAG(default_global_mark_offset_flag);
auto bakDefaultGlobalFuseMode = BOOL_FLAG(default_global_fuse_mode);
BOOL_FLAG(default_global_mark_offset_flag) = true;
BOOL_FLAG(default_global_fuse_mode) = false;
gGlobalMarkOffsetTestFlag = true;
std::string dir = gRootDir + "MarkOffsetTest";
bfs::create_directories(dir);
auto& PS = PATH_SEPARATOR;
// test
EnableNetWork();
CaseSetUp(false, true, true, 1, false, -1, 0, 900);
auto PrintMapInfo = []() {
auto inst = LogFileCollectOffsetIndicator::GetInstance();
LOG_INFO(sLogger, ("mLogFileOffsetInfoMap", ""));
for (auto& i : inst->mLogFileOffsetInfoMap) {
LOG_INFO(sLogger, (i.first.ToString(), i.second->mLogFileInfo.mFilename));
}
LOG_INFO(sLogger, ("mLogFileOffsetInfoMap", "done"));
LOG_INFO(sLogger, ("mLogFileCollectProgressMap", ""));
for (auto& i : inst->mLogFileCollectProgressMap) {
LOG_INFO(sLogger, (i.first.ToString(), i.second.IsValid()));
}
LOG_INFO(sLogger, ("mLogFileCollectProgressMap", "done"));
};
LOG_INFO(sLogger, ("write a.log", ""));
for (int round = 0; round < 10; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(200 * 1000);
}
WaitForFileBeenRead();
sleep(5);
LOG_INFO(sLogger, ("print map info", "after writing a.log"));
PrintMapInfo();
LOG_INFO(sLogger, ("check map status", "1"));
const LogFileCollectProgressMap& m1 = LogFileCollectOffsetIndicator::GetInstance()->mLogFileCollectProgressMap;
APSARA_TEST_EQUAL(m1.size(), 1);
fsutil::PathStat buf_a;
bool statRet_a = fsutil::PathStat::stat(std::string(dir + PS + "a.log").c_str(), buf_a);
APSARA_TEST_TRUE_DESC(statRet_a, "a.log must exist");
const LogFileInfo& info_a = m1.begin()->first;
const LogFileCollectProgress& progress_a = m1.begin()->second;
APSARA_TEST_EQUAL(info_a.mFilename, dir + PS + "a.log");
APSARA_TEST_EQUAL(progress_a.mFileSize, buf_a.GetFileSize());
APSARA_TEST_EQUAL(progress_a.mFileLastPos, progress_a.mFileSize);
APSARA_TEST_EQUAL(progress_a.mFileReadPos, progress_a.mFileSize);
APSARA_TEST_TRUE(progress_a.IsFinished());
LOG_INFO(sLogger, ("write b.log", ""));
for (int round = 0; round < 20; ++round) {
OneJob(100, dir, "b", true, time(NULL));
usleep(200 * 1000);
}
WaitForFileBeenRead();
sleep(11);
LOG_INFO(sLogger, ("print map info", "after writing b.log"));
PrintMapInfo();
LOG_INFO(sLogger, ("rotate b.log and write new b.log", ""));
bfs::rename(std::string(dir + PS + "b.log").c_str(), std::string(dir + PS + "b.log.1").c_str());
for (int round = 0; round < 10; ++round) {
OneJob(100, dir, "b", true, time(NULL));
usleep(200 * 1000);
}
WaitForFileBeenRead();
sleep(11);
LOG_INFO(sLogger, ("print map info", "after rotating b.log"));
PrintMapInfo();
LOG_INFO(sLogger, ("check map status", "2"));
const LogFileCollectProgressMap& m2 = LogFileCollectOffsetIndicator::GetInstance()->mLogFileCollectProgressMap;
APSARA_TEST_EQUAL(m2.size(), 2);
fsutil::PathStat buf_b;
bool statRet_b = fsutil::PathStat::stat(std::string(dir + PS + "b.log").c_str(), buf_b);
APSARA_TEST_TRUE_DESC(statRet_b, "b.log must exist");
for (LogFileCollectProgressMap::const_iterator iter = m2.begin(); iter != m2.end(); ++iter) {
const LogFileInfo& info_b = iter->first;
const LogFileCollectProgress& progress_b = iter->second;
if (info_b.mFilename == dir + PS + "b.log") {
APSARA_TEST_EQUAL(info_b.mFilename, dir + PS + "b.log");
APSARA_TEST_EQUAL(progress_b.mFileSize, buf_b.GetFileSize());
APSARA_TEST_EQUAL(progress_b.mFileLastPos, progress_b.mFileSize);
APSARA_TEST_EQUAL(progress_b.mFileReadPos, progress_b.mFileSize);
APSARA_TEST_TRUE(progress_b.IsFinished());
}
}
// write file, c.log, rotate, then delete c.log
LOG_INFO(sLogger, ("write c.log", ""));
for (int round = 0; round < 20; ++round) {
OneJob(100, dir, "c", true, time(NULL));
usleep(200 * 1000);
}
WaitForFileBeenRead();
sleep(11);
LOG_INFO(sLogger, ("print map info", "after writing c.log"));
PrintMapInfo();
LOG_INFO(sLogger, ("rotate c.log and write new c.log", ""));
bfs::rename(std::string(dir + PS + "c.log").c_str(), std::string(dir + PS + "c.log.1").c_str());
for (int round = 0; round < 10; ++round) {
OneJob(100, dir, "c", true, time(NULL));
usleep(200 * 1000);
}
WaitForFileBeenRead();
sleep(11);
LOG_INFO(sLogger, ("print map info", "after rotating c.log"));
PrintMapInfo();
LOG_INFO(sLogger, ("check map status", "3"));
const LogFileCollectProgressMap& m3 = LogFileCollectOffsetIndicator::GetInstance()->mLogFileCollectProgressMap;
APSARA_TEST_EQUAL(m3.size(), 3);
LOG_INFO(sLogger, ("delete c.log", ""));
bfs::remove(std::string(dir + PS + "c.log").c_str());
sleep(11);
LOG_INFO(sLogger, ("print map info", "after deleting c.log"));
PrintMapInfo();
LOG_INFO(sLogger, ("check map status", "4"));
const LogFileCollectProgressMap& m4 = LogFileCollectOffsetIndicator::GetInstance()->mLogFileCollectProgressMap;
APSARA_TEST_EQUAL(m4.size(), 2);
bool found = false;
for (LogFileCollectProgressMap::const_iterator iter = m2.begin(); iter != m2.end(); ++iter) {
const LogFileInfo& info_c = iter->first;
if (info_c.mFilename == dir + PS + "c.log") {
found = true;
}
}
APSARA_TEST_TRUE(!found);
// case clean up
CaseCleanUp();
bfs::remove_all(dir);
gGlobalMarkOffsetTestFlag = false;
BOOL_FLAG(default_global_fuse_mode) = bakDefaultGlobalFuseMode;
BOOL_FLAG(default_global_mark_offset_flag) = bakDefaultGlobalMarkOffsetFlag;
LOG_INFO(sLogger, ("TestGlobalMarkOffset() end", time(NULL)));
}
static void MockExactlyOnceSend(LoggroupTimeValue* data);
void TestExactlyOnceDataSendSequence();
void TestExactlyOncePartialBlockConcurrentSend();
void TestExactlyOnceCompleteBlockConcurrentSend();
};
APSARA_UNIT_TEST_CASE(SenderUnittest, TestSecondaryStorage, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestEncryptAndDecrypt, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestDiscardOldData, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestConnect, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestDisConnect, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestChangeStat, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestFlowControl, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMergeByCount, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestFlushOut, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMonitor, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestFilterRule, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestDumpSnapshot, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMergeByMinute, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMultiUserSeperationAndDiscardFailAndOtherFail, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMultiUserSeperationAndRetryQuotaRecovery, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMultiUserSeperationAndTestNetWorkRecovery, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestMultiUserSeperationAndRetryIntervalRecovery, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestLogstoreFlowControl, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestLogstoreFlowControlPause, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestLogstoreFlowControlExpire, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestTooOldFilesIntegrity, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestGlobalMarkOffset, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestRealIpSend, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestEmptyRealIp, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestRealIpSendFailAndRecover, gCaseID);
APSARA_UNIT_TEST_CASE(SenderUnittest, TestRegionConcurreny, gCaseID);
UNIT_TEST_CASE(SenderUnittest, TestExactlyOnceDataSendSequence);
UNIT_TEST_CASE(SenderUnittest, TestExactlyOncePartialBlockConcurrentSend);
UNIT_TEST_CASE(SenderUnittest, TestExactlyOnceCompleteBlockConcurrentSend);
decltype(SenderUnittest::sProcessQueueMap) SenderUnittest::sProcessQueueMap = nullptr;
decltype(SenderUnittest::sSenderQueueMap) SenderUnittest::sSenderQueueMap = nullptr;
// Record checkpoint and forward to MockAsyncSend.
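// Checkpoints whose hash key is in gBlockedHashKeySet are rejected with a mocked 403
// (LOGE_WRITE_QUOTA_EXCEED) response, simulating a quota-limited shard; everything else
// is recorded in gRangeCheckpoints and passed on to MockAsyncSend.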
void SenderUnittest::MockExactlyOnceSend(LoggroupTimeValue* data) {
auto& cpt = data->mLogGroupContext.mExactlyOnceCheckpoint;
if (!cpt) {
return;
}
auto closure = new SendClosure;
closure->mDataPtr = data;
{
auto hashKey = cpt->data.hash_key();
std::lock_guard<std::mutex> lock(gMockExactlyOnceSendLock);
if (gBlockedHashKeySet.find(hashKey) != gBlockedHashKeySet.end()) {
LOG_INFO(sLogger, ("hash key is blocked", hashKey)("checkpoint", cpt->data.DebugString()));
auto res = new sdk::PostLogStoreLogsResponse;
res->statusCode = 403;
res->requestId = "RequestIDInMockExactlyOnceSend";
closure->OnFail(res, sdk::LOGE_WRITE_QUOTA_EXCEED, "reject by exactly once send");
return;
}
}
gRangeCheckpoints.push_back(RangeCheckpointPtr(new RangeCheckpoint(*(cpt.get()))));
LOG_INFO(sLogger, ("checkpoint key", cpt->key)("checkpoint", cpt->data.DebugString()));
MockAsyncSend(data->mProjectName,
data->mLogstore,
data->mLogData,
data->mDataType,
data->mRawSize,
data->mLogGroupContext.mCompressType,
closure);
}
// Test that data sequences are generated in order:
// - Incremental sequence ID.
// - Incremental checkpoint index.
//
// Procedure: generate N logs continuously and use MockExactlyOnceSend to capture the
// checkpoints; after all N logs are sent, verify that their order is preserved.
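// Ordering is verified per hash key: the sequence IDs recorded for each key are copied,
// sorted, and compared against the recorded order, so any out-of-order send would make
// the EXPECT_EQ on the sorted vector fail.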
void SenderUnittest::TestExactlyOnceDataSendSequence() {
LOG_INFO(sLogger, ("TestExactlyOnceDataSendSequence() begin", time(NULL)));
gEnableExactlyOnce = true;
EnableNetWork();
CaseSetUp(false, true, true, 1, false, -1, 0, 900);
std::string dir = gRootDir + PATH_SEPARATOR + "ExactlyOnceDataSendSequence";
bfs::create_directories(dir);
LOG_INFO(sLogger, ("write a.log", dir));
const size_t kRound = 3;
for (size_t round = 0; round < kConcurrency * kRound; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(200 * 1000);
}
WaitForFileBeenRead();
sleep(5);
LOG_INFO(sLogger, ("test sequence", ""));
ModifyHandler* handler = nullptr;
{
auto& pathMap = sEventDispatcher->mPathWdMap;
auto iter = pathMap.find(dir);
ASSERT_TRUE(iter != pathMap.end());
auto dirHandler = static_cast<CreateModifyHandler*>(sEventDispatcher->mWdDirInfoMap[iter->second]->mHandler);
EXPECT_EQ(dirHandler->mModifyHandlerPtrMap.size(), 1);
handler = dirHandler->mModifyHandlerPtrMap.begin()->second;
}
auto& readerMap = handler->mNameReaderMap;
EXPECT_EQ(readerMap.size(), 1);
auto& readerArray = readerMap.begin()->second;
EXPECT_EQ(readerArray.size(), 1);
auto& reader = readerArray.front();
auto& eo = reader->mEOOption;
EXPECT_TRUE(eo);
std::set<std::string> hashKeySet;
for (auto& cpt : eo->rangeCheckpointPtrs) {
hashKeySet.insert(cpt->data.hash_key());
}
EXPECT_EQ(hashKeySet.size(), kConcurrency);
EXPECT_GE(gRangeCheckpoints.size(), kConcurrency);
std::unordered_map<std::string, std::vector<uint64_t> > hashKeySeqIDs;
for (auto& cpt : gRangeCheckpoints) {
hashKeySeqIDs[cpt->data.hash_key()].push_back(cpt->data.sequence_id());
}
for (auto& iter : hashKeySeqIDs) {
auto sortedSequenceIDs = iter.second;
std::sort(sortedSequenceIDs.begin(), sortedSequenceIDs.end());
EXPECT_EQ(iter.second, sortedSequenceIDs);
}
CaseCleanUp();
gEnableExactlyOnce = false;
LOG_INFO(sLogger, ("TestExactlyOnceDataSendSequence() end", time(NULL)));
}
// Test that data reading blocks as expected when sending is blocked.
// The reader is blocked only when all concurrency slots are blocked.
//
// Procedure: generate N logs continuously and use MockExactlyOnceSend to capture the
// checkpoints; after all N logs are sent, block the specified concurrency slot in MockExactlyOnceSend.
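// Expected behaviour per round: with one hash key blocked, the remaining kConcurrency - 1
// keys keep sending; once the key is unblocked, its backlog is flushed; afterwards all
// kConcurrency keys are observed again.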
void SenderUnittest::TestExactlyOncePartialBlockConcurrentSend() {
LOG_INFO(sLogger, ("TestExactlyOncePartialBlockConcurrentSend() begin", time(NULL)));
auto bakClientQuotaSendRetryIntervalMax = INT32_FLAG(client_quota_send_retry_interval_max);
auto bakClientQuotaSendConcurrencyMin = INT32_FLAG(client_quota_send_concurrency_min);
auto bakClientQuotaSendRetryInterval = INT32_FLAG(client_quota_send_retry_interval);
INT32_FLAG(client_quota_send_retry_interval_max) = 1;
INT32_FLAG(client_quota_send_concurrency_min) = kConcurrency;
INT32_FLAG(client_quota_send_retry_interval) = 1;
gEnableExactlyOnce = true;
EnableNetWork();
CaseSetUp(false, true, true, 1, false, -1, 0, 900);
std::string dir = gRootDir + PATH_SEPARATOR + "ExactlyOncePartialBlockConcurrentSend";
bfs::create_directories(dir);
auto globalRangeCheckpoints2HashKeySet = [](std::vector<std::string>* hashKeys = nullptr) {
std::set<std::string> hashKeySet;
for (auto& cpt : gRangeCheckpoints) {
hashKeySet.insert(cpt->data.hash_key());
if (hashKeys) {
hashKeys->push_back(cpt->data.hash_key());
}
}
return hashKeySet;
};
LOG_INFO(sLogger, ("send round 1 data", "collect checkpoint keys"));
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
std::vector<std::string> hashKeys;
{
// All concurrency slots have been used.
auto hashKeySet = globalRangeCheckpoints2HashKeySet(&hashKeys);
EXPECT_EQ(hashKeySet.size(), kConcurrency);
gRangeCheckpoints.clear();
}
const auto blockedIndex = kConcurrency / 2;
const auto blockedHashKey = hashKeys[blockedIndex];
LOG_INFO(sLogger, ("block hash key", blockedHashKey)("send round 2 data", ""));
gBlockedHashKeySet.insert(blockedHashKey);
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
{
// All concurrency slots except the blocked one have been used.
auto hashKeySet = globalRangeCheckpoints2HashKeySet();
EXPECT_EQ(hashKeySet.size(), kConcurrency - 1);
EXPECT_TRUE(hashKeySet.find(blockedHashKey) == hashKeySet.end());
gRangeCheckpoints.clear();
}
LOG_INFO(sLogger, ("unblock hash key", blockedHashKey));
gBlockedHashKeySet.clear();
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
{
// The blocked data can be sent now.
auto hashKeySet = globalRangeCheckpoints2HashKeySet();
EXPECT_GE(hashKeySet.size(), 1);
EXPECT_TRUE(hashKeySet.find(blockedHashKey) != hashKeySet.end());
gRangeCheckpoints.clear();
}
LOG_INFO(sLogger, ("send round 4 data", "recovery"));
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
{
// All concurrency slots have been used.
auto hashKeySet = globalRangeCheckpoints2HashKeySet();
EXPECT_EQ(hashKeySet.size(), kConcurrency);
gRangeCheckpoints.clear();
}
CaseCleanUp();
gEnableExactlyOnce = false;
INT32_FLAG(client_quota_send_retry_interval_max) = bakClientQuotaSendRetryIntervalMax;
INT32_FLAG(client_quota_send_concurrency_min) = bakClientQuotaSendConcurrencyMin;
INT32_FLAG(client_quota_send_retry_interval) = bakClientQuotaSendRetryInterval;
LOG_INFO(sLogger, ("TestExactlyOncePartialBlockConcurrentSend() end", time(NULL)));
}
// Test that data reading blocks as expected when sending is blocked.
// The reader is blocked only when all concurrency slots are blocked.
//
// Procedure: generate N logs continuously and use MockExactlyOnceSend to capture the
// checkpoints; after all N logs are sent, block all concurrency slots in MockExactlyOnceSend.
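// Expected behaviour per round: with every hash key blocked, no checkpoint is recorded;
// unblocking a single key lets the pending data drain through it; unblocking the rest
// restores the full set of kConcurrency keys.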
void SenderUnittest::TestExactlyOnceCompleteBlockConcurrentSend() {
LOG_INFO(sLogger, ("TestExactlyOnceCompleteBlockConcurrentSend() begin", time(NULL)));
auto bakClientQuotaSendRetryIntervalMax = INT32_FLAG(client_quota_send_retry_interval_max);
auto bakClientQuotaSendConcurrencyMin = INT32_FLAG(client_quota_send_concurrency_min);
auto bakClientQuotaSendRetryInterval = INT32_FLAG(client_quota_send_retry_interval);
INT32_FLAG(client_quota_send_retry_interval_max) = 1;
INT32_FLAG(client_quota_send_concurrency_min) = kConcurrency;
INT32_FLAG(client_quota_send_retry_interval) = 1;
gEnableExactlyOnce = true;
EnableNetWork();
CaseSetUp(false, true, true, 1, false, -1, 0, 900);
std::string dir = gRootDir + PATH_SEPARATOR + "ExactlyOnceCompleteBlockConcurrentSend";
bfs::create_directories(dir);
auto globalRangeCheckpoints2HashKeySet = [](std::vector<std::string>* hashKeys = nullptr) {
std::set<std::string> hashKeySet;
for (auto& cpt : gRangeCheckpoints) {
hashKeySet.insert(cpt->data.hash_key());
if (hashKeys) {
hashKeys->push_back(cpt->data.hash_key());
}
}
return hashKeySet;
};
LOG_INFO(sLogger, ("send round 1 data", "collect checkpoint keys"));
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
std::vector<std::string> hashKeys;
{
// All concurrency slots have been used.
auto hashKeySet = globalRangeCheckpoints2HashKeySet(&hashKeys);
EXPECT_EQ(hashKeySet.size(), kConcurrency);
gRangeCheckpoints.clear();
}
LOG_INFO(sLogger, ("block all hash keys", "send round 2 data"));
gBlockedHashKeySet.insert(hashKeys.begin(), hashKeys.end());
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
{
// No data can be sent.
auto hashKeySet = globalRangeCheckpoints2HashKeySet();
EXPECT_EQ(hashKeySet.size(), 0);
EXPECT_EQ(gRangeCheckpoints.size(), 0);
}
const auto unlockedIndex = kConcurrency / 2;
const auto unlockedHashKey = hashKeys[unlockedIndex];
LOG_INFO(sLogger, ("unblock one hash key", unlockedHashKey));
{
std::lock_guard<std::mutex> lock(gMockExactlyOnceSendLock);
gBlockedHashKeySet.erase(unlockedHashKey);
}
sleep(INT32_FLAG(client_quota_send_retry_interval_max) * 2);
{
// All pending data is sent through the unblocked concurrency slot.
auto hashKeySet = globalRangeCheckpoints2HashKeySet();
EXPECT_GE(hashKeySet.size(), 1);
EXPECT_TRUE(hashKeySet.find(unlockedHashKey) != hashKeySet.end());
gRangeCheckpoints.clear();
}
LOG_INFO(sLogger, ("unblocked all hash keys", "send round 3 data"));
gBlockedHashKeySet.clear();
for (size_t round = 0; round < kConcurrency * 2; ++round) {
OneJob(100, dir, "a", true, time(NULL));
usleep(100 * 1000);
}
WaitForFileBeenRead();
sleep(5);
{
// All concurrency slots have been used.
auto hashKeySet = globalRangeCheckpoints2HashKeySet();
EXPECT_EQ(hashKeySet.size(), kConcurrency);
gRangeCheckpoints.clear();
}
CaseCleanUp();
gEnableExactlyOnce = false;
INT32_FLAG(client_quota_send_retry_interval_max) = bakClientQuotaSendRetryIntervalMax;
INT32_FLAG(client_quota_send_concurrency_min) = bakClientQuotaSendConcurrencyMin;
INT32_FLAG(client_quota_send_retry_interval) = bakClientQuotaSendRetryInterval;
LOG_INFO(sLogger, ("TestExactlyOnceCompleteBlockConcurrentSend() end", time(NULL)));
}
} // namespace logtail
int main(int argc, char** argv) {
logtail::Logger::Instance().InitGlobalLoggers();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}