sample/log_producer_benchmark.c (291 lines of code) (raw):
//
// Created by ZhangCheng on 26/12/2017.
//
#include "inner_log.h"
#include "log_api.h"
#include "log_producer_config.h"
#include "log_producer_client.h"
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include "log_multi_thread.h"
int producer_send_thread_count = 16;
void builder_speed_test(int32_t logsPerGroup)
{
int32_t startTime = time(NULL);
int32_t count = 0;
for (; count < 1000000; ++count)
{
log_group_builder* bder = log_group_create();
add_source(bder,"mSource",sizeof("mSource"));
add_topic(bder,"mTopic", sizeof("mTopic"));
add_tag(bder, "taga_key", strlen("taga_key"), "taga_value", strlen("taga_value"));
add_tag(bder, "tagb_key", strlen("tagb_key"), "tagb_value", strlen("tagb_value"));
add_pack_id(bder, "123456789ABC", strlen("123456789ABC"), 0);
int i = 0;
for (; i < logsPerGroup; ++i)
{
char * keys[] = {
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1",
"content_key_1"
};
char * vals[] = {
"1abcdefghijklmnopqrstuvwxyz01234567891abcdefghijklmnopqrstuvwxyz01234567891abcdefghijklmnopqrstuvwxyz01234567891abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"2abcdefghijklmnopqrstuvwxyz0123456789",
"xxxxxxxxxxxxxxxxxxxxx"
};
size_t key_lens[] = {
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("content_key_1"),
strlen("index")
};
size_t val_lens[] = {
strlen("1abcdefghijklmnopqrstuvwxyz01234567891abcdefghijklmnopqrstuvwxyz01234567891abcdefghijklmnopqrstuvwxyz01234567891abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("2abcdefghijklmnopqrstuvwxyz0123456789"),
strlen("xxxxxxxxxxxxxxxxxxxxx")
};
add_log_full(bder, (uint32_t)time(NULL), 10, (char **)keys, (size_t *)key_lens, (char **)vals, (size_t *)val_lens);
}
log_buf buf = serialize_to_proto_buf_with_malloc(bder);
if (count % 1000 == 0)
{
int32_t nowTime = time(NULL);
aos_error_log("Done : %d %d %d %d\n", count, logsPerGroup, nowTime - startTime, (int32_t)buf.n_buffer);
}
log_group_destroy(bder);
}
aos_error_log("total time sec %d ", (time(NULL) - startTime));
}
void on_log_send_done(const char * config_name, log_producer_result result, size_t log_bytes, size_t compressed_bytes, const char * req_id, const char * message, const unsigned char * raw_buffer)
{
if (result == LOG_PRODUCER_OK)
{
if (aos_log_level == AOS_LOG_DEBUG)
{
printf("send success, config : %s, result : %d, log bytes : %d, compressed bytes : %d, request id : %s \n",
config_name, (result),
(int)log_bytes, (int)compressed_bytes, req_id);
}
}
else
{
printf("send fail, config : %s, result : %d, log bytes : %d, compressed bytes : %d, request id : %s, error message : %s\n",
config_name, (result),
(int)log_bytes, (int)compressed_bytes, req_id, message);
}
}
typedef struct _multi_write_log_param
{
log_producer_client * client;
size_t send_count;
}multi_write_log_param;
void * write_log_thread(void* param)
{
aos_error_log("Thread start");
multi_write_log_param * write_log_param = (multi_write_log_param *)param;
size_t i = 0;
char buffer[512];
for (; i < write_log_param->send_count; ++i)
{
sprintf(buffer, "The Logstore is a unit in Log Service for the collection, storage, and query of log data, %d", (int)i);
log_producer_client_add_log(write_log_param->client, 8, "Log group", "A log group is a collection of logs and is the basic unit for writing and reading",
"Log topic", "Logs in a Logstore can be classified by log topics",
"Project", "The project is the resource management unit in Log Service and is used to isolate and control resources",
"Logstore", "The Logstore is a unit in Log Service for the collection, storage, and query of log data");
}
aos_error_log("Thread done");
return NULL;
}
log_producer * create_log_producer_wrapper(on_log_producer_send_done_function on_send_done)
{
log_producer_config * config = create_log_producer_config();
// endpoint list: https://help.aliyun.com/document_detail/29008.html
log_producer_config_set_endpoint(config, "${your_endpoint}");
log_producer_config_set_project(config, "${your_project}");
log_producer_config_set_logstore(config, "${your_logstore}");
log_producer_config_set_access_id(config, "${your_access_key_id}");
log_producer_config_set_access_key(config, "${your_access_key_secret}");
//log_producer_config_set_remote_address(config, "192.168.12.12");
// if you do not need topic or tag, comment it
log_producer_config_set_topic(config, "test_topic");
log_producer_config_add_tag(config, "tag_1", "val_1");
log_producer_config_add_tag(config, "tag_2", "val_2");
log_producer_config_add_tag(config, "tag_3", "val_3");
log_producer_config_add_tag(config, "tag_4", "val_4");
log_producer_config_add_tag(config, "tag_5", "val_5");
// set resource params
log_producer_config_set_packet_log_bytes(config, 4*1024*1024);
log_producer_config_set_packet_log_count(config, 4096);
log_producer_config_set_packet_timeout(config, 3000);
log_producer_config_set_max_buffer_limit(config, 64*1024*1024);
// set send thread
log_producer_config_set_send_thread_count(config, producer_send_thread_count);
return create_log_producer(config, on_send_done);
}
#define MUTLI_THREAD_COUNT 16
#define MUTLI_PRODUCER_COUNT 4
#define GLOBAL_THREAD_FLAG
void log_producer_multi_thread(size_t logsPerSecond)
{
logsPerSecond *= 100;
if (log_producer_env_init(LOG_GLOBAL_ALL) != LOG_PRODUCER_OK) {
exit(1);
}
#ifdef GLOBAL_THREAD_FLAG
log_producer_global_send_thread_init(producer_send_thread_count, 100000);
producer_send_thread_count = 0;
#endif
log_producer * producers[MUTLI_PRODUCER_COUNT];
log_producer_client * clients[MUTLI_PRODUCER_COUNT];
multi_write_log_param param[MUTLI_PRODUCER_COUNT];
int i = 0;
for (; i < MUTLI_PRODUCER_COUNT; ++i) {
producers[i] = create_log_producer_wrapper(on_log_send_done);
if (producers[i] == NULL)
{
printf("create log producer by config file fail \n");
exit(1);
}
clients[i] = get_log_producer_client(producers[i], NULL);
if (clients[i] == NULL)
{
printf("create log producer client by config file fail \n");
exit(1);
}
param[i].send_count = logsPerSecond;
param[i].client = clients[i] ;
}
pthread_t allThread[MUTLI_THREAD_COUNT];
for (i = 0; i < MUTLI_THREAD_COUNT; ++i)
{
pthread_create(&allThread[i], NULL, write_log_thread, ¶m[i % MUTLI_PRODUCER_COUNT]);
}
for (i = 0; i < MUTLI_THREAD_COUNT; ++i)
{
pthread_join(allThread[i], NULL);
}
aos_error_log("All thread done");
for (i = 0; i < MUTLI_PRODUCER_COUNT; ++i) {
destroy_log_producer(producers[i]);
}
log_producer_env_destroy();
}
void log_producer_create_destroy()
{
int count = 0;
while (1)
{
if (log_producer_env_init(LOG_GLOBAL_ALL) != LOG_PRODUCER_OK) {
exit(1);
}
log_producer * producer = create_log_producer_wrapper(on_log_send_done);
if (producer == NULL)
{
printf("create log producer by config file fail \n");
exit(1);
}
log_producer_client * client = get_log_producer_client(producer, NULL);
if (client == NULL)
{
printf("create log producer client by config file fail \n");
exit(1);
}
destroy_log_producer(producer);
log_producer_env_destroy();
printf("%d \n", count++);
}
}
void log_producer_post_logs(int logsPerSecond, int sendSec)
{
//aos_log_level = AOS_LOG_DEBUG;
if (log_producer_env_init(LOG_GLOBAL_ALL) != LOG_PRODUCER_OK) {
exit(1);
}
log_producer * producer = create_log_producer_wrapper(on_log_send_done);
if (producer == NULL)
{
printf("create log producer by config file fail \n");
exit(1);
}
log_producer_client * client = get_log_producer_client(producer, NULL);
if (client == NULL)
{
printf("create log producer client by config file fail \n");
exit(1);
}
int32_t i = 0;
int32_t totalTime = 0;
for (i = 0; i < sendSec; ++i)
{
int64_t startTime = GET_TIME_US();
int j = 0;
for (; j < logsPerSecond; ++j)
{
char indexStr[32];
sprintf(indexStr, "%d", i * logsPerSecond + j);
log_producer_client_add_log(client, 20, "content_key_1", "1abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_2", "2abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_3", "3abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_4", "4abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_5", "5abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_6", "6abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_7", "7abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_8", "8abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"content_key_9", "9abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+",
"index", indexStr);
log_producer_result rst = log_producer_client_add_log(client, 8, "LogHub", "Real-time log collection and consumption",
"Search/Analytics", "Query and real-time analysis",
"Visualized", "dashboard and report functions",
"Interconnection", "Grafana and JDBC/SQL92");
if (rst != LOG_PRODUCER_OK)
{
printf("add log error %d \n", rst);
}
}
int64_t endTime = GET_TIME_US();
aos_error_log("Done : %d %d time %f us \n", i, logsPerSecond, (float)(endTime - startTime) * 1.0 / logsPerSecond / 2);
totalTime += endTime - startTime;
if (endTime - startTime < 1000000)
{
usleep(1000000 - (endTime - startTime));
}
}
aos_error_log("Total done : %f us, avg %f us", (float)totalTime / 180, (float)totalTime / (180 * logsPerSecond * 2));
//sleep(10);
destroy_log_producer(producer);
log_producer_env_destroy();
}
int main(int argc, char *argv[])
{
aos_log_level = AOS_LOG_DEBUG;
log_producer_multi_thread(999llu);
return 0;
//aos_log_level = AOS_LOG_TRACE;
int logsPerSec = 100;
int sendSec = 180;
if (argc == 3)
{
logsPerSec = atoi(argv[1]);
sendSec = atoi(argv[2]);
}
//log_producer_create_destroy();
//log_producer_multi_thread(logsPerSec / 10);
log_producer_post_logs(logsPerSec, sendSec);
//builder_speed_test(logsPerSec);
return 0;
}