Sources/aliyun-log-c-sdk/log_producer_client.c (333 lines of code) (raw):

// // Created by ZhangCheng on 20/11/2017. // #include "log_producer_client.h" #include "log_producer_manager.h" #include "inner_log.h" #include "log_api.h" #include <stdarg.h> #include <string.h> #include "log_persistent_manager.h" static uint32_t s_init_flag = 0; static log_producer_result s_last_result = 0; unsigned int LOG_GET_TIME(); typedef struct _producer_client_private { log_producer_manager * producer_manager; log_producer_config * producer_config; log_persistent_manager * persistent_manager; }producer_client_private ; struct _log_producer { log_producer_client * root_client; }; log_producer_result log_producer_env_init() { // if already init, just return s_last_result if (s_init_flag == 1) { return s_last_result; } s_init_flag = 1; if (0 != sls_log_init()) { s_last_result = LOG_PRODUCER_INVALID; } else { s_last_result = LOG_PRODUCER_OK; } return s_last_result; } void log_producer_env_destroy() { if (s_init_flag == 0) { return; } s_init_flag = 0; sls_log_destroy(); } log_producer * create_log_producer(log_producer_config * config, on_log_producer_send_done_function send_done_function, void *user_param) { if (!log_producer_config_is_valid(config)) { return NULL; } log_producer * producer = (log_producer *)malloc(sizeof(log_producer)); log_producer_client * producer_client = (log_producer_client *)malloc(sizeof(log_producer_client)); producer_client_private * client_private = (producer_client_private *)malloc(sizeof(producer_client_private)); producer_client->private_data = client_private; client_private->producer_config = config; client_private->producer_manager = create_log_producer_manager(config); client_private->producer_manager->send_done_function = send_done_function; client_private->producer_manager->user_param = user_param; client_private->persistent_manager = create_log_persistent_manager(config); if (client_private->persistent_manager != NULL) { client_private->producer_manager->uuid_user_param = client_private->persistent_manager; client_private->producer_manager->uuid_send_done_function = on_log_persistent_manager_send_done_uuid; int recoverRst = log_persistent_manager_recover(client_private->persistent_manager, client_private->producer_manager); if (recoverRst != 0) { aos_error_log("project %s, logstore %s, recover log persistent manager failed, result %d", config->project, config->logstore, recoverRst); } else { aos_info_log("project %s, logstore %s, recover log persistent manager success", config->project, config->logstore); } } aos_debug_log("create producer client success, config : %s", config->logstore); producer_client->valid_flag = 1; producer->root_client = producer_client; return producer; } void destroy_log_producer(log_producer * producer) { if (producer == NULL) { return; } log_producer_client * client = producer->root_client; client->valid_flag = 0; producer_client_private * client_private = (producer_client_private *)client->private_data; destroy_log_producer_manager(client_private->producer_manager); destroy_log_producer_config(client_private->producer_config); destroy_log_persistent_manager(client_private->persistent_manager); free(client_private); free(client); free(producer); } extern log_producer_client * get_log_producer_client(log_producer * producer, const char * config_name) { if (producer == NULL) { return NULL; } return producer->root_client; } void log_producer_client_network_recover(log_producer_client * client) { if (client == NULL) { return; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; manager->networkRecover = 1; } log_producer_result log_producer_client_add_log(log_producer_client * client, int32_t kv_count, ...) { if (client == NULL || !client->valid_flag) { return LOG_PRODUCER_INVALID; } va_list argp; va_start(argp, kv_count); int32_t pairs = kv_count / 2; char ** keys = (char **)malloc(pairs * sizeof(char *)); char ** values = (char **)malloc(pairs * sizeof(char *)); size_t * key_lens = (size_t *)malloc(pairs * sizeof(size_t)); size_t * val_lens = (size_t *)malloc(pairs * sizeof(size_t)); int32_t i = 0; for (; i < pairs; ++i) { const char * key = va_arg(argp, const char *); const char * value = va_arg(argp, const char *); keys[i] = (char *)key; values[i] = (char *)value; key_lens[i] = strlen(key); val_lens[i] = strlen(value); } log_producer_result rst = log_producer_client_add_log_with_len(client, pairs, keys, key_lens, values, val_lens, 0); free(keys); free(values); free(key_lens); free(val_lens); return rst; } log_producer_result log_producer_client_add_log_with_len(log_producer_client * client, int32_t pair_count, char ** keys, size_t * key_lens, char ** values, size_t * val_lens, int flush) { if (client == NULL || !client->valid_flag) { return LOG_PRODUCER_INVALID; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; log_persistent_manager * persistent_manager = ((producer_client_private *)client->private_data)->persistent_manager; if (persistent_manager != NULL && persistent_manager->is_invalid == 0) { CS_ENTER(persistent_manager->lock); add_log_full(persistent_manager->builder, LOG_GET_TIME(), pair_count, keys, key_lens, values, val_lens); char * logBuf = persistent_manager->builder->grp->logs.buffer; size_t logSize = persistent_manager->builder->grp->logs.now_buffer_len; clear_log_tag(&(persistent_manager->builder->grp->logs)); if (!log_persistent_manager_is_buffer_enough(persistent_manager, logSize) || manager->totalBufferSize > manager->producer_config->maxBufferBytes) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } int rst = log_persistent_manager_save_log(persistent_manager, logBuf, logSize); if (rst != LOG_PRODUCER_OK) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } rst = log_producer_manager_add_log_raw(manager, logBuf, logSize, flush, persistent_manager->checkpoint.now_log_uuid - 1); CS_LEAVE(persistent_manager->lock); return rst; } return log_producer_manager_add_log(manager, pair_count, keys, key_lens, values, val_lens, flush, -1); } log_producer_result log_producer_client_add_raw_log_buffer(log_producer_client * client, size_t log_bytes, size_t compressed_bytes, const unsigned char * raw_buffer) { if (client == NULL || !client->valid_flag || raw_buffer == NULL) { return LOG_PRODUCER_INVALID; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; return log_producer_manager_send_raw_buffer(manager, log_bytes, compressed_bytes, raw_buffer); } log_producer_result log_producer_client_add_log_raw(log_producer_client *client, char *logBuf, size_t logSize, int flush) { if (client == NULL || !client->valid_flag) { return LOG_PRODUCER_INVALID; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; log_persistent_manager * persistent_manager = ((producer_client_private *)client->private_data)->persistent_manager; if (persistent_manager != NULL && persistent_manager->is_invalid == 0) { CS_ENTER(persistent_manager->lock); if (!log_persistent_manager_is_buffer_enough(persistent_manager, logSize) || manager->totalBufferSize > manager->producer_config->maxBufferBytes) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } int rst = log_persistent_manager_save_log(persistent_manager, logBuf, logSize); if (rst != LOG_PRODUCER_OK) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } rst = log_producer_manager_add_log_raw(manager, logBuf, logSize, flush, persistent_manager->checkpoint.now_log_uuid - 1); CS_LEAVE(persistent_manager->lock); return rst; } int rst = log_producer_manager_add_log_raw(manager, logBuf, logSize, flush, -1); return rst; } log_producer_result log_producer_client_add_log_with_array(log_producer_client *client, uint32_t logTime, size_t logItemCount, const char *logItemsBuf, const uint32_t *logItemsSize, int flush) { if (client == NULL || !client->valid_flag) { return LOG_PRODUCER_INVALID; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; log_persistent_manager * persistent_manager = ((producer_client_private *)client->private_data)->persistent_manager; if (persistent_manager != NULL && persistent_manager->is_invalid == 0) { CS_ENTER(persistent_manager->lock); add_log_full_v2(persistent_manager->builder, logTime, logItemCount, logItemsBuf, logItemsSize); char * logBuf = persistent_manager->builder->grp->logs.buffer; size_t logSize = persistent_manager->builder->grp->logs.now_buffer_len; clear_log_tag(&(persistent_manager->builder->grp->logs)); if (!log_persistent_manager_is_buffer_enough(persistent_manager, logSize) || manager->totalBufferSize > manager->producer_config->maxBufferBytes) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } int rst = log_persistent_manager_save_log(persistent_manager, logBuf, logSize); if (rst != LOG_PRODUCER_OK) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } rst = log_producer_manager_add_log_raw(manager, logBuf, logSize, flush, persistent_manager->checkpoint.now_log_uuid - 1); CS_LEAVE(persistent_manager->lock); return rst; } int rst = log_producer_manager_add_log_with_array(manager, logTime, logItemCount, logItemsBuf, logItemsSize, flush, -1); return rst; } log_producer_result log_producer_client_add_log_with_len_int32(log_producer_client *client, int32_t pair_count, char **keys, int32_t *key_lens, char **values, int32_t *value_lens, int flush) { if (client == NULL || !client->valid_flag) { return LOG_PRODUCER_INVALID; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; log_persistent_manager * persistent_manager = ((producer_client_private *)client->private_data)->persistent_manager; if (persistent_manager != NULL && persistent_manager->is_invalid == 0) { CS_ENTER(persistent_manager->lock); add_log_full_int32(persistent_manager->builder, LOG_GET_TIME(), pair_count, keys, key_lens, values, value_lens); char * logBuf = persistent_manager->builder->grp->logs.buffer; size_t logSize = persistent_manager->builder->grp->logs.now_buffer_len; clear_log_tag(&(persistent_manager->builder->grp->logs)); if (!log_persistent_manager_is_buffer_enough(persistent_manager, logSize) || manager->totalBufferSize > manager->producer_config->maxBufferBytes) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } int rst = log_persistent_manager_save_log(persistent_manager, logBuf, logSize); if (rst != LOG_PRODUCER_OK) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } rst = log_producer_manager_add_log_raw(manager, logBuf, logSize, flush, persistent_manager->checkpoint.now_log_uuid - 1); CS_LEAVE(persistent_manager->lock); return rst; } return log_producer_manager_add_log_int32(manager, pair_count, keys, key_lens, values, value_lens, flush, -1); } log_producer_result log_producer_client_add_log_with_len_time_int32(log_producer_client *client, uint32_t time_sec, int32_t pair_count, char **keys, int32_t *key_lens, char **values, int32_t *value_lens, int flush) { if (client == NULL || !client->valid_flag) { return LOG_PRODUCER_INVALID; } log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager; log_persistent_manager * persistent_manager = ((producer_client_private *)client->private_data)->persistent_manager; if (persistent_manager != NULL && persistent_manager->is_invalid == 0) { CS_ENTER(persistent_manager->lock); add_log_full_int32(persistent_manager->builder, time_sec, pair_count, keys, key_lens, values, value_lens); char * logBuf = persistent_manager->builder->grp->logs.buffer; size_t logSize = persistent_manager->builder->grp->logs.now_buffer_len; clear_log_tag(&(persistent_manager->builder->grp->logs)); if (!log_persistent_manager_is_buffer_enough(persistent_manager, logSize) || manager->totalBufferSize > manager->producer_config->maxBufferBytes) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } int rst = log_persistent_manager_save_log(persistent_manager, logBuf, logSize); if (rst != LOG_PRODUCER_OK) { CS_LEAVE(persistent_manager->lock); return LOG_PRODUCER_DROP_ERROR; } rst = log_producer_manager_add_log_raw(manager, logBuf, logSize, flush, persistent_manager->checkpoint.now_log_uuid - 1); CS_LEAVE(persistent_manager->lock); return rst; } return log_producer_manager_add_log_int32(manager, pair_count, keys, key_lens, values, value_lens, flush, -1); }