in core/unittest/flusher/FlusherSLSUnittest.cpp [1429:1762]
// Exercises FlusherSLS::Send() for three flusher setups:
//   1. exactly-once enabled (a replayed group and a non-replayed group),
//   2. a normal flusher configured with ShardHashKeys (no group batch),
//   3. a normal flusher without ShardHashKeys (with group batch, producing a package list).
// In each case the resulting sender queue item is fetched from the matching queue manager,
// its metadata is checked, and its payload is decompressed and parsed back into
// sls_logs::LogGroup so the serialized content can be validated.
void FlusherSLSUnittest::TestSend() {
    {
        // exactly once enabled
        // create flusher
        Json::Value configJson, optionalGoPipeline;
        string configStr, errorMsg;
        configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"Aliuid": "123456789"
}
)";
        ParseJsonTable(configStr, configJson, errorMsg);
        FlusherSLS flusher;
        // NOTE(review): this local context shadows the `ctx` used by the later sections, which
        // is declared outside this function (presumably a fixture member). Only this section
        // runs with the exactly-once flag set.
        CollectionPipelineContext ctx;
        ctx.SetConfigName("test_config");
        ctx.SetExactlyOnceFlag(true);
        flusher.SetContext(ctx);
        flusher.SetMetricsRecordRef(FlusherSLS::sName, "1");
        flusher.Init(configJson, optionalGoPipeline);
        // create exactly once queue
        // Two checkpoints with hash keys "hash_key_0" / "hash_key_1", both at sequence id 0.
        vector<RangeCheckpointPtr> checkpoints;
        for (size_t i = 0; i < 2; ++i) {
            auto cpt = make_shared<RangeCheckpoint>();
            cpt->index = i;
            cpt->data.set_hash_key("hash_key_" + ToString(i));
            cpt->data.set_sequence_id(0);
            checkpoints.emplace_back(cpt);
        }
        QueueKey eooKey = QueueKeyManager::GetInstance()->GetKey("eoo");
        ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(
            eooKey, ProcessQueueManager::sMaxPriority, flusher.GetContext(), checkpoints);
        {
            // replayed group
            // The group carries a fully populated checkpoint (index, hash key, sequence id),
            // and the queue item is expected to keep exactly that checkpoint.
            PipelineEventGroup group(make_shared<SourceBuffer>());
            group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
            group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
            group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
            group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
            auto cpt = make_shared<RangeCheckpoint>();
            cpt->index = 1;
            cpt->fbKey = eooKey;
            cpt->data.set_hash_key("hash_key_1");
            cpt->data.set_sequence_id(0);
            cpt->data.set_read_offset(0);
            cpt->data.set_read_length(10);
            group.SetExactlyOnceCheckpoint(cpt);
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(res, 80);
            APSARA_TEST_EQUAL(1U, res.size());
            if (res.empty()) {
                // The size check above has already flagged the problem; indexing res[0]
                // on an empty vector would be undefined behavior, so stop here.
                return;
            }
            auto item = static_cast<SLSSenderQueueItem*>(res[0]);
            APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType);
            APSARA_TEST_FALSE(item->mBufferOrNot);
            APSARA_TEST_EQUAL(&flusher, item->mFlusher);
            APSARA_TEST_EQUAL(eooKey, item->mQueueKey);
            APSARA_TEST_EQUAL("hash_key_1", item->mShardHashKey);
            APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);
            APSARA_TEST_EQUAL(cpt, item->mExactlyOnceCheckpoint);
            // Decompress the payload with an LZ4 compressor and parse it back into a LogGroup.
            auto compressor
                = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
            string output, errorMsg;
            output.resize(item->mRawSize);
            APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());
            ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(eooKey, item);
        }
        {
            // non-replay group
            // The group's checkpoint has no index/hash key of its own; the queue item is
            // expected to end up with checkpoints[0] ("hash_key_0") instead.
            // Presumably SetMinCnt(1) makes the batcher flush after a single event - TODO confirm.
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1);
            PipelineEventGroup group(make_shared<SourceBuffer>());
            group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
            group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
            group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
            group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
            auto cpt = make_shared<RangeCheckpoint>();
            cpt->fbKey = eooKey;
            cpt->data.set_read_offset(0);
            cpt->data.set_read_length(10);
            group.SetExactlyOnceCheckpoint(cpt);
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(res, 80);
            APSARA_TEST_EQUAL(1U, res.size());
            if (res.empty()) {
                // Already flagged by the size check; avoid indexing an empty vector.
                return;
            }
            auto item = static_cast<SLSSenderQueueItem*>(res[0]);
            APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType);
            APSARA_TEST_FALSE(item->mBufferOrNot);
            APSARA_TEST_EQUAL(&flusher, item->mFlusher);
            APSARA_TEST_EQUAL(eooKey, item->mQueueKey);
            APSARA_TEST_EQUAL("hash_key_0", item->mShardHashKey);
            APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);
            APSARA_TEST_EQUAL(checkpoints[0], item->mExactlyOnceCheckpoint);
            auto compressor
                = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
            string output, errorMsg;
            output.resize(item->mRawSize);
            APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());
            ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(eooKey, item);
        }
    }
    {
        // normal flusher, without group batch
        Json::Value configJson, optionalGoPipeline;
        string configStr, errorMsg;
        configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"Aliuid": "123456789",
"ShardHashKeys": [
"tag_key"
]
}
)";
        ParseJsonTable(configStr, configJson, errorMsg);
        FlusherSLS flusher;
        flusher.SetContext(ctx);
        flusher.SetMetricsRecordRef(FlusherSLS::sName, "1");
        flusher.Init(configJson, optionalGoPipeline);
        {
            // empty group
            // Send() is expected to succeed without putting anything into the sender queue.
            PipelineEventGroup group(make_shared<SourceBuffer>());
            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
            APSARA_TEST_TRUE(res.empty());
        }
        {
            // group
            // The shard hash key of the item is expected to be the MD5 of the "tag_key" tag value.
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1);
            PipelineEventGroup group(make_shared<SourceBuffer>());
            group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
            group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
            group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
            group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
            group.SetTag(string("tag_key"), string("tag_value"));
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
            APSARA_TEST_EQUAL(1U, res.size());
            if (res.empty()) {
                // Already flagged by the size check; avoid indexing an empty vector.
                return;
            }
            auto item = static_cast<SLSSenderQueueItem*>(res[0]);
            APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType);
            APSARA_TEST_TRUE(item->mBufferOrNot);
            APSARA_TEST_EQUAL(&flusher, item->mFlusher);
            APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey);
            APSARA_TEST_EQUAL(CalcMD5("tag_value"), item->mShardHashKey);
            APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);
            auto compressor
                = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
            string output, errorMsg;
            output.resize(item->mRawSize);
            APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            APSARA_TEST_EQUAL(2, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL("tag_key", logGroup.logtags(1).key());
            APSARA_TEST_EQUAL("tag_value", logGroup.logtags(1).value());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());
            SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000);
        }
        {
            // oversized group
            // The size limit flag is lowered to 1 so that Send() is expected to reject the group.
            // The previous flag value is saved and restored afterwards instead of assuming
            // a particular default, so this section leaves the global flag as it found it.
            const auto savedMaxSendLogGroupSize = INT32_FLAG(max_send_log_group_size);
            INT32_FLAG(max_send_log_group_size) = 1;
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1);
            PipelineEventGroup group(make_shared<SourceBuffer>());
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
            APSARA_TEST_FALSE(flusher.Send(std::move(group)));
            INT32_FLAG(max_send_log_group_size) = savedMaxSendLogGroupSize;
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000);
        }
    }
    {
        // normal flusher, with group batch
        Json::Value configJson, optionalGoPipeline;
        string configStr, errorMsg;
        configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"Aliuid": "123456789"
}
)";
        ParseJsonTable(configStr, configJson, errorMsg);
        FlusherSLS flusher;
        flusher.SetContext(ctx);
        flusher.SetMetricsRecordRef(FlusherSLS::sName, "1");
        flusher.Init(configJson, optionalGoPipeline);
        PipelineEventGroup group(make_shared<SourceBuffer>());
        group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
        group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
        group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
        group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
        {
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
        }
        {
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567990);
            e->SetContent(string("content_key"), string("content_value"));
        }
        // The group-level threshold is set to the size of the group while it holds two events.
        flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(group.DataSize());
        // flush the above two events from group item by the following event
        {
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234568990);
            e->SetContent(string("content_key"), string("content_value"));
        }
        APSARA_TEST_TRUE(flusher.Send(std::move(group)));
        vector<SenderQueueItem*> res;
        SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
        APSARA_TEST_EQUAL(1U, res.size());
        if (res.empty()) {
            // Already flagged by the size check; avoid indexing an empty vector.
            return;
        }
        auto item = static_cast<SLSSenderQueueItem*>(res[0]);
        APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP_LIST, item->mType);
        APSARA_TEST_TRUE(item->mBufferOrNot);
        APSARA_TEST_EQUAL(&flusher, item->mFlusher);
        APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey);
        APSARA_TEST_EQUAL("", item->mShardHashKey);
        APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);
        auto compressor
            = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
        // Here the item payload is parsed directly as a package list; each package is
        // decompressed individually below.
        sls_logs::SlsLogPackageList packageList;
        APSARA_TEST_TRUE(packageList.ParseFromString(item->mData));
        APSARA_TEST_EQUAL(2, packageList.packages_size());
        uint32_t rawSize = 0;
        // Walk the packages that are actually present (the expected count of 2 is checked
        // above), so an unexpected count cannot cause an out-of-range packages(i) access.
        for (int i = 0; i < packageList.packages_size(); ++i) {
            APSARA_TEST_EQUAL(sls_logs::SlsCompressType::SLS_CMP_LZ4, packageList.packages(i).compress_type());
            string output, errorMsg;
            rawSize += packageList.packages(i).uncompress_size();
            output.resize(packageList.packages(i).uncompress_size());
            APSARA_TEST_TRUE(compressor->UnCompress(packageList.packages(i).data(), output, errorMsg));
            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            // Each package is expected to hold one of the first two events, in insertion order.
            if (i == 0) {
                APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            } else {
                APSARA_TEST_EQUAL(1234567990U, logGroup.logs(0).time());
            }
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());
        }
        // The item's raw size must equal the sum of the packages' uncompressed sizes.
        APSARA_TEST_EQUAL(rawSize, item->mRawSize);
        SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
        // Presumably FlushAll() pushes whatever is still batched (the third event) into the
        // sender queue - TODO confirm. Every item available afterwards is removed again.
        flusher.FlushAll();
        res.clear();
        SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
        for (auto& tmp : res) {
            SenderQueueManager::GetInstance()->RemoveItem(tmp->mQueueKey, tmp);
        }
        flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(256 * 1024);
    }
}