in core/unittest/sender/SenderUnittest.cpp [671:791]
static void MockAsyncSendInner(const std::string& projectName,
const std::string& logstore,
const std::string& logData,
SEND_DATA_TYPE dataType,
int32_t rawSize,
sls_logs::SlsCompressType compressType,
SendClosure* sendClosure) {
LOG_INFO(sLogger, ("MockAsyncSend, projectName", projectName)("logstore", logstore)("dataType", dataType));
vector<LogGroup> logGroupVec;
Sender::ParseLogGroupFromString(logData, dataType, rawSize, compressType, logGroupVec);
if (gDisableIpFlag && gClient != NULL && gDisabledIp.size() > 0) {
if (gClient->GetRawSlsHost().find(gDisabledIp) != std::string::npos) {
auto sr = new sdk::PostLogStoreLogsResponse;
sr->statusCode = 500;
sr->requestId = "";
sendClosure->OnFail(sr, sdk::LOGE_REQUEST_ERROR, "timeout to connect network");
return;
}
}
bool projectDisabled = true;
for (vector<LogGroup>::iterator iter = logGroupVec.begin(); iter != logGroupVec.end(); ++iter) {
// Record LogGroup information for debug.
{
LogGroup& logGroup = *iter;
std::string logCase;
if (logGroup.logs_size() > 0) {
auto& log = logGroup.logs(0);
if (log.contents_size() >= 3) {
logCase = log.contents(2).value();
}
}
LOG_INFO(sLogger,
("LogGroupDebugInfo",
logCase)("Project", projectName)("Logstore", logstore)("LogCount", logGroup.logs_size()));
}
if (iter->logs_size() > 0) {
if (gNetWorkStat && iter->category() == "app_log") {
PTScopedLock lock(gBufferLogGroupsLock);
gBufferLogGroups.push_back(*iter);
}
if (gNetWorkStat && (projectName.find("_proj") != string::npos)) /* "1000000_proj" */
{
int prjIndex = atoi(projectName.substr(0, 7).c_str()) - 1000000;
if (prjIndex <= gProjectNetEnableIndex) {
projectDisabled = false;
gRecvLogGroupLock.lock();
gRcvLogGroup = *iter;
gCounter += gRcvLogGroup.logs_size();
gMessageSize += gRcvLogGroup.ByteSize();
gRecvLogGroupLock.unlock();
} else {
// printf("Reject %s %d\n", projectName.c_str(), gProjectNetEnableIndex);
++gAsynProjectSendFailCount;
}
}
if (gNetWorkStat && projectName == STRING_FLAG(profile_project_name)
&& iter->category() == "logtail_status_profile") {
gStatusCount++;
gStatusLogGroup = *iter;
}
}
}
// can't put this code block in for, if have multi loggroup, the sendClosure will be destroyed multi times.
if (gNetWorkStat && !projectDisabled) {
// printf("[#MockAsyncSend] success %s %s %d.\n", projectName.c_str(), logstore.c_str(), rawSize);
sdk::PostLogStoreLogsResponse* sr = new sdk::PostLogStoreLogsResponse;
sr->statusCode = 200;
sr->requestId = "mock_request_id";
sendClosure->OnSuccess(sr);
} else {
sdk::PostLogStoreLogsResponse* sr = new sdk::PostLogStoreLogsResponse;
sr->statusCode = 500;
sr->requestId = "mock_request_id";
// printf("[#MockAsyncSend] fail %s %s %d.\n", projectName.c_str(), logstore.c_str(), rawSize);
if (gSendFailType == 1)
sendClosure->OnFail(sr, sdk::LOGE_REQUEST_ERROR, "timeout to connect network");
else if (gSendFailType == 2) {
int randInt = rand() % 4;
if (randInt == 0)
sendClosure->OnFail(sr, sdk::LOGE_REQUEST_ERROR, "Can not connect to server.");
else if (randInt == 1)
sendClosure->OnFail(sr, sdk::LOGE_WRITE_QUOTA_EXCEED, "project write quota exceed.");
else if (randInt == 2)
sendClosure->OnFail(sr, sdk::LOGE_SERVER_BUSY, "connect to server timeout.");
else if (randInt == 3)
sendClosure->OnFail(sr, sdk::LOGE_INTERNAL_SERVER_ERROR, "connect to server timeout.");
} else if (gSendFailType == 3) {
int randInt = rand() % 5;
if (randInt == 0)
sendClosure->OnFail(sr, sdk::LOGE_UNAUTHORIZED, "LOGE_UNAUTHORIZED.");
else if (randInt == 1)
sendClosure->OnFail(sr, sdk::LOGE_BAD_RESPONSE, "LOGE_BAD_RESPONSE.");
else if (randInt == 2)
sendClosure->OnFail(sr, sdk::LOGE_CATEGORY_NOT_EXIST, "LOGE_CATEGORY_NOT_EXIST.");
else if (randInt == 3)
sendClosure->OnFail(sr, sdk::LOGE_PROJECT_NOT_EXIST, "LOGE_PROJECT_NOT_EXIST.");
else if (randInt == 4)
sendClosure->OnFail(sr, sdk::LOGE_TOPIC_NOT_EXIST, "LOGE_TOPIC_NOT_EXIST.");
} else if (gSendFailType == 4) {
int randInt = rand() % 2;
if (randInt == 0) {
sendClosure->OnFail(sr, sdk::LOGE_WRITE_QUOTA_EXCEED, "LOGE_WRITE_QUOTA_EXCEED");
} else if (randInt == 1) {
sendClosure->OnFail(sr, sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED, "LOGE_SHARD_WRITE_QUOTA_EXCEED");
}
} else if (gSendFailType == 5) {
int randInt = rand() % 2;
if (randInt == 0) {
sendClosure->OnFail(sr, sdk::LOGE_SERVER_BUSY, "LOGE_WRITE_QUOTA_EXCEED");
} else if (randInt == 1) {
sendClosure->OnFail(sr, sdk::LOGE_INTERNAL_SERVER_ERROR, "LOGE_SHARD_WRITE_QUOTA_EXCEED");
}
}
}
}