void FlusherSLSUnittest::TestSend()

in core/unittest/flusher/FlusherSLSUnittest.cpp [1429:1762]


// Exercises FlusherSLS::Send() in three configurations and inspects the sender
// queue item produced by each one:
//   1. exactly-once enabled (replayed and non-replayed groups),
//   2. normal flusher with "ShardHashKeys" configured (no group batch),
//   3. normal flusher without shard hash keys (group batch path).
// For every produced item the test checks the item's metadata, then decodes the
// payload back into sls_logs::LogGroup and verifies topic, source, machine uuid,
// tags and log contents.
//
// The scenarios share process-wide singletons (queue managers, key manager), so
// each one removes the items it produced before the next one runs. Statement
// order matters.
//
// NOTE(review): `res[0]` is dereferenced right after a size assertion in several
// places; if APSARA_TEST_EQUAL is non-fatal, an empty result would be indexed out
// of range — confirm the macro's semantics.
void FlusherSLSUnittest::TestSend() {
    {
        // Scenario 1: exactly once enabled
        // create flusher
        Json::Value configJson, optionalGoPipeline;
        string configStr, errorMsg;
        configStr = R"(
            {
                "Type": "flusher_sls",
                "Project": "test_project",
                "Logstore": "test_logstore",
                "Region": "test_region",
                "Endpoint": "test_region.log.aliyuncs.com",
                "Aliuid": "123456789"
            }
        )";
        ParseJsonTable(configStr, configJson, errorMsg);
        FlusherSLS flusher;
        // Local context for this scenario only; it is the one with the
        // exactly-once flag turned on.
        // NOTE(review): the later scenarios use a `ctx` that is not declared in
        // this function — presumably a fixture member that this local shadows;
        // confirm.
        CollectionPipelineContext ctx;
        ctx.SetConfigName("test_config");
        ctx.SetExactlyOnceFlag(true);
        flusher.SetContext(ctx);
        flusher.SetMetricsRecordRef(FlusherSLS::sName, "1");
        flusher.Init(configJson, optionalGoPipeline);

        // create exactly once queue backed by two range checkpoints
        // ("hash_key_0" and "hash_key_1", both starting at sequence id 0)
        vector<RangeCheckpointPtr> checkpoints;
        for (size_t i = 0; i < 2; ++i) {
            auto cpt = make_shared<RangeCheckpoint>();
            cpt->index = i;
            cpt->data.set_hash_key("hash_key_" + ToString(i));
            cpt->data.set_sequence_id(0);
            checkpoints.emplace_back(cpt);
        }
        QueueKey eooKey = QueueKeyManager::GetInstance()->GetKey("eoo");
        ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(
            eooKey, ProcessQueueManager::sMaxPriority, flusher.GetContext(), checkpoints);

        {
            // replayed group: the group's checkpoint already carries an index,
            // hash key and sequence id, and the assertions below expect the
            // produced item to keep exactly that checkpoint and hash key.
            PipelineEventGroup group(make_shared<SourceBuffer>());
            group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
            group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
            group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
            group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
            auto cpt = make_shared<RangeCheckpoint>();
            cpt->index = 1;
            cpt->fbKey = eooKey;
            cpt->data.set_hash_key("hash_key_1");
            cpt->data.set_sequence_id(0);
            cpt->data.set_read_offset(0);
            cpt->data.set_read_length(10);
            group.SetExactlyOnceCheckpoint(cpt);
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));

            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            // The item is expected in the exactly-once queue manager, not in
            // SenderQueueManager.
            // NOTE(review): the second argument (80) is presumably an item
            // limit — confirm against GetAvailableSenderQueueItems.
            vector<SenderQueueItem*> res;
            ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(res, 80);
            APSARA_TEST_EQUAL(1U, res.size());
            auto item = static_cast<SLSSenderQueueItem*>(res[0]);
            APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType);
            APSARA_TEST_FALSE(item->mBufferOrNot);
            APSARA_TEST_EQUAL(&flusher, item->mFlusher);
            APSARA_TEST_EQUAL(eooKey, item->mQueueKey);
            APSARA_TEST_EQUAL("hash_key_1", item->mShardHashKey);
            APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);
            APSARA_TEST_EQUAL(cpt, item->mExactlyOnceCheckpoint);

            // Decode the payload: LZ4-uncompress into a buffer pre-sized to the
            // item's recorded raw size, then parse it as a LogGroup.
            auto compressor
                = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
            string output, errorMsg;
            output.resize(item->mRawSize);
            APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg));

            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            // The only expected tag is the pack id; topic/source/uuid are
            // checked above through dedicated LogGroup fields.
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());

            // Clean up so the next sub-case sees exactly one available item.
            ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(eooKey, item);
        }
        {
            // non-replay group: the group's checkpoint has no index/hash key of
            // its own; the assertions below expect the produced item to carry
            // "hash_key_0" and checkpoints[0] from the queue created above.
            // NOTE(review): SetMinCnt(1) presumably makes the batcher flush
            // after a single event — confirm against the flush strategy.
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1);
            PipelineEventGroup group(make_shared<SourceBuffer>());
            group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
            group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
            group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
            group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
            auto cpt = make_shared<RangeCheckpoint>();
            cpt->fbKey = eooKey;
            cpt->data.set_read_offset(0);
            cpt->data.set_read_length(10);
            group.SetExactlyOnceCheckpoint(cpt);
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));

            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(res, 80);
            APSARA_TEST_EQUAL(1U, res.size());
            auto item = static_cast<SLSSenderQueueItem*>(res[0]);
            APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType);
            APSARA_TEST_FALSE(item->mBufferOrNot);
            APSARA_TEST_EQUAL(&flusher, item->mFlusher);
            APSARA_TEST_EQUAL(eooKey, item->mQueueKey);
            APSARA_TEST_EQUAL("hash_key_0", item->mShardHashKey);
            APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);
            APSARA_TEST_EQUAL(checkpoints[0], item->mExactlyOnceCheckpoint);

            // Same payload decoding and content checks as the replayed case.
            auto compressor
                = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
            string output, errorMsg;
            output.resize(item->mRawSize);
            APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg));

            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());

            ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(eooKey, item);
        }
    }
    {
        // Scenario 2: normal flusher, without group batch.
        // "ShardHashKeys" is configured with "tag_key"; the "group" sub-case
        // below expects the item's shard hash key to be the MD5 of that tag's
        // value.
        Json::Value configJson, optionalGoPipeline;
        string configStr, errorMsg;
        configStr = R"(
            {
                "Type": "flusher_sls",
                "Project": "test_project",
                "Logstore": "test_logstore",
                "Region": "test_region",
                "Endpoint": "test_region.log.aliyuncs.com",
                "Aliuid": "123456789",
                "ShardHashKeys": [
                    "tag_key"
                ]
            }
        )";
        ParseJsonTable(configStr, configJson, errorMsg);
        FlusherSLS flusher;
        // `ctx` is not the local from scenario 1 (that one went out of scope).
        flusher.SetContext(ctx);
        flusher.SetMetricsRecordRef(FlusherSLS::sName, "1");
        flusher.Init(configJson, optionalGoPipeline);
        {
            // empty group: Send() is expected to succeed and enqueue nothing
            PipelineEventGroup group(make_shared<SourceBuffer>());
            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
            APSARA_TEST_TRUE(res.empty());
        }
        {
            // group with a single event and a custom tag used as shard hash key
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1);
            PipelineEventGroup group(make_shared<SourceBuffer>());
            group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
            group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
            group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
            group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
            group.SetTag(string("tag_key"), string("tag_value"));
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));

            APSARA_TEST_TRUE(flusher.Send(std::move(group)));
            vector<SenderQueueItem*> res;
            SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
            APSARA_TEST_EQUAL(1U, res.size());
            auto item = static_cast<SLSSenderQueueItem*>(res[0]);
            APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType);
            // Unlike the exactly-once items above, mBufferOrNot is expected to
            // be true here.
            APSARA_TEST_TRUE(item->mBufferOrNot);
            APSARA_TEST_EQUAL(&flusher, item->mFlusher);
            APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey);
            APSARA_TEST_EQUAL(CalcMD5("tag_value"), item->mShardHashKey);
            APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);

            auto compressor
                = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);
            string output, errorMsg;
            output.resize(item->mRawSize);
            APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg));

            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            // Two tags expected: the pack id first, then the custom tag.
            APSARA_TEST_EQUAL(2, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            APSARA_TEST_EQUAL("tag_key", logGroup.logtags(1).key());
            APSARA_TEST_EQUAL("tag_value", logGroup.logtags(1).value());
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());

            SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
            // NOTE(review): 4000 is presumably the strategy's default min
            // count — confirm.
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000);
        }
        {
            // oversized group: with the global size limit flag lowered to 1,
            // Send() is expected to fail.
            INT32_FLAG(max_send_log_group_size) = 1;
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1);
            PipelineEventGroup group(make_shared<SourceBuffer>());
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
            APSARA_TEST_FALSE(flusher.Send(std::move(group)));
            // NOTE(review): the flag is reset to a hard-coded 10 MiB rather
            // than to its previous value — confirm this matches the default.
            INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024;
            flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000);
        }
    }
    {
        // Scenario 3: normal flusher, with group batch (no "ShardHashKeys").
        // The produced item is expected to be an EVENT_GROUP_LIST whose payload
        // is a SlsLogPackageList of individually compressed log groups.
        Json::Value configJson, optionalGoPipeline;
        string configStr, errorMsg;
        configStr = R"(
            {
                "Type": "flusher_sls",
                "Project": "test_project",
                "Logstore": "test_logstore",
                "Region": "test_region",
                "Endpoint": "test_region.log.aliyuncs.com",
                "Aliuid": "123456789"
            }
        )";
        ParseJsonTable(configStr, configJson, errorMsg);
        FlusherSLS flusher;
        flusher.SetContext(ctx);
        flusher.SetMetricsRecordRef(FlusherSLS::sName, "1");
        flusher.Init(configJson, optionalGoPipeline);

        PipelineEventGroup group(make_shared<SourceBuffer>());
        group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id"));
        group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1");
        group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid");
        group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
        {
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567890);
            e->SetContent(string("content_key"), string("content_value"));
        }
        {
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234567990);
            e->SetContent(string("content_key"), string("content_value"));
        }
        // The group-level size threshold is set to the group's data size while
        // it holds exactly the two events added so far.
        flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(group.DataSize());
        // flush the above two events from group item by the following event
        {
            auto e = group.AddLogEvent();
            e->SetTimestamp(1234568990);
            e->SetContent(string("content_key"), string("content_value"));
        }

        APSARA_TEST_TRUE(flusher.Send(std::move(group)));
        vector<SenderQueueItem*> res;
        SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
        APSARA_TEST_EQUAL(1U, res.size());
        auto item = static_cast<SLSSenderQueueItem*>(res[0]);
        APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP_LIST, item->mType);
        APSARA_TEST_TRUE(item->mBufferOrNot);
        APSARA_TEST_EQUAL(&flusher, item->mFlusher);
        APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey);
        // No shard hash keys configured, so an empty hash key is expected.
        APSARA_TEST_EQUAL("", item->mShardHashKey);
        APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore);

        auto compressor
            = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4);

        // The item's data is parsed directly as a package list (no outer
        // decompression); each contained package is decompressed on its own.
        sls_logs::SlsLogPackageList packageList;
        APSARA_TEST_TRUE(packageList.ParseFromString(item->mData));
        APSARA_TEST_EQUAL(2, packageList.packages_size());
        // Accumulates the per-package uncompressed sizes to compare against
        // the item's mRawSize after the loop.
        uint32_t rawSize = 0;
        for (size_t i = 0; i < 2; ++i) {
            APSARA_TEST_EQUAL(sls_logs::SlsCompressType::SLS_CMP_LZ4, packageList.packages(i).compress_type());

            string output, errorMsg;
            rawSize += packageList.packages(i).uncompress_size();
            output.resize(packageList.packages(i).uncompress_size());
            APSARA_TEST_TRUE(compressor->UnCompress(packageList.packages(i).data(), output, errorMsg));

            sls_logs::LogGroup logGroup;
            APSARA_TEST_TRUE(logGroup.ParseFromString(output));
            APSARA_TEST_EQUAL("topic", logGroup.topic());
            APSARA_TEST_EQUAL("uuid", logGroup.machineuuid());
            APSARA_TEST_EQUAL("172.0.0.1", logGroup.source());
            APSARA_TEST_EQUAL(1, logGroup.logtags_size());
            APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key());
            // Each package is expected to hold exactly one of the first two
            // events, in insertion order.
            APSARA_TEST_EQUAL(1, logGroup.logs_size());
            if (i == 0) {
                APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time());
            } else {
                APSARA_TEST_EQUAL(1234567990U, logGroup.logs(0).time());
            }
            APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size());
            APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key());
            APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value());
        }
        APSARA_TEST_EQUAL(rawSize, item->mRawSize);

        // Cleanup: remove the checked item, flush whatever the flusher still
        // holds, then remove every item that becomes available as a result.
        SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item);
        flusher.FlushAll();
        res.clear();
        SenderQueueManager::GetInstance()->GetAvailableItems(res, 80);
        for (auto& tmp : res) {
            SenderQueueManager::GetInstance()->RemoveItem(tmp->mQueueKey, tmp);
        }
        // NOTE(review): 256 KiB is presumably the group strategy's default
        // min size — confirm.
        flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(256 * 1024);
    }
}