Sources/aliyun-log-c-sdk/log_producer_config.c (495 lines of code) (raw):

// // Created by ZhangCheng on 20/11/2017. // #include "log_producer_config.h" #include "log_sds.h" #include <string.h> #include <stdlib.h> #include "inner_log.h" static void _set_default_producer_config(log_producer_config * pConfig) { pConfig->logBytesPerPackage = 3 * 1024 * 1024; pConfig->logCountPerPackage = 2048; pConfig->packageTimeoutInMS = 3000; pConfig->maxBufferBytes = 64 * 1024 * 1024; pConfig->flushIntervalInMS = LOG_PRODUCER_FLUSH_INTERVAL_MS; pConfig->logQueuePopIntervalInMS = LOG_PRODUCER_QUEUE_POP_INTERVAL_MS; pConfig->connectTimeoutSec = 10; pConfig->sendTimeoutSec = 15; pConfig->destroySenderWaitTimeoutSec = 1; pConfig->destroyFlusherWaitTimeoutSec = 1; pConfig->compressType = 1; pConfig->ntpTimeOffset = 0; pConfig->using_https = 0; pConfig->maxLogDelayTime = 7*24*3600; pConfig->dropDelayLog = 1; pConfig->callbackFromSenderThread = 1; pConfig->webTracking = 0; pConfig->mode = 0; pConfig->user_params = NULL; } static void _copy_config_string(const char * value, log_sds * src_value) { if (src_value == NULL) { return; } if (value == NULL) { *src_value = NULL; return; } size_t strLen = strlen(value); if (*src_value == NULL) { *src_value = log_sdsnewEmpty(strLen); } *src_value = log_sdscpylen(*src_value, value, strLen); } log_producer_config * create_log_producer_config() { log_producer_config* pConfig = (log_producer_config*)malloc(sizeof(log_producer_config)); memset(pConfig, 0, sizeof(log_producer_config)); _set_default_producer_config(pConfig); return pConfig; } void destroy_log_producer_config(log_producer_config * pConfig) { if (pConfig->project != NULL) { log_sdsfree(pConfig->project); } if (pConfig->logstore != NULL) { log_sdsfree(pConfig->logstore); } if (pConfig->endpoint != NULL) { log_sdsfree(pConfig->endpoint); } if (pConfig->accessKey != NULL) { log_sdsfree(pConfig->accessKey); } if (pConfig->accessKeyId != NULL) { log_sdsfree(pConfig->accessKeyId); } if (pConfig->topic != NULL) { log_sdsfree(pConfig->topic); } if (pConfig->source != NULL) { log_sdsfree(pConfig->source); } if (pConfig->netInterface != NULL) { log_sdsfree(pConfig->netInterface); } if (pConfig->securityToken != NULL) { log_sdsfree(pConfig->securityToken); } if (pConfig->securityTokenLock != NULL) { ReleaseCriticalSection(pConfig->securityTokenLock); } if (pConfig->tagCount > 0 && pConfig->tags != NULL) { int i = 0; for (; i < pConfig->tagCount; ++i) { log_sdsfree(pConfig->tags[i].key); log_sdsfree(pConfig->tags[i].value); } free(pConfig->tags); } if (pConfig->persistentFilePath != NULL) { log_sdsfree(pConfig->persistentFilePath); } if (pConfig->mode == 1 && NULL != pConfig->shardKey) { log_sdsfree(pConfig->shardKey); } free(pConfig); } #ifdef LOG_PRODUCER_DEBUG void log_producer_config_print(log_producer_config * pConfig, FILE * file) { fprintf(file, "endpoint : %s\n", pConfig->endpoint); fprintf(file,"project : %s\n", pConfig->project); fprintf(file,"logstore : %s\n", pConfig->logstore); fprintf(file,"accessKeyId : %s\n", pConfig->accessKeyId); fprintf(file,"accessKey : %s\n", pConfig->accessKey); fprintf(file,"configName : %s\n", pConfig->configName); fprintf(file,"topic : %s\n", pConfig->topic); fprintf(file,"logLevel : %d\n", pConfig->logLevel); fprintf(file,"packageTimeoutInMS : %d\n", pConfig->packageTimeoutInMS); fprintf(file, "logCountPerPackage : %d\n", pConfig->logCountPerPackage); fprintf(file, "logBytesPerPackage : %d\n", pConfig->logBytesPerPackage); fprintf(file, "maxBufferBytes : %d\n", pConfig->maxBufferBytes); fprintf(file, "tags: \n"); int32_t i = 0; for (i = 0; i < pConfig->tagCount; ++i) { fprintf(file, "tag key : %s, value : %s \n", pConfig->tags[i].key, pConfig->tags[i].value); } } #endif void log_producer_config_set_packet_timeout(log_producer_config * config, int32_t time_out_ms) { if (config == NULL || time_out_ms < 0) { return; } config->packageTimeoutInMS = time_out_ms; } void log_producer_config_set_packet_log_count(log_producer_config * config, int32_t log_count) { if (config == NULL || log_count < 0) { return; } config->logCountPerPackage = log_count; } void log_producer_config_set_packet_log_bytes(log_producer_config * config, int32_t log_bytes) { if (config == NULL || log_bytes < 0) { return; } config->logBytesPerPackage = log_bytes; } void log_producer_config_set_max_buffer_limit(log_producer_config * config, int64_t max_buffer_bytes) { if (config == NULL || max_buffer_bytes < 0) { return; } config->maxBufferBytes = max_buffer_bytes; } void log_producer_config_set_flush_interval(log_producer_config * config, int32_t flush_interval_in_ms) { if (NULL == config || flush_interval_in_ms < 500) { return; } config->flushIntervalInMS = flush_interval_in_ms; } void log_producer_config_set_log_queue_interval(log_producer_config * config, int32_t log_queue_in_ms) { if (NULL == config || log_queue_in_ms < 500) { return; } config->logQueuePopIntervalInMS = log_queue_in_ms; } void log_producer_config_set_send_thread_count(log_producer_config * config, int32_t thread_count) { if (config == NULL || thread_count < 0) { return; } config->sendThreadCount = thread_count; } void log_producer_config_set_net_interface(log_producer_config * config, const char * net_interface) { if (config == NULL || net_interface == NULL) { return; } _copy_config_string(net_interface, &config->netInterface); } void log_producer_config_set_connect_timeout_sec(log_producer_config * config, int32_t connect_timeout_sec) { if (config == NULL || connect_timeout_sec <= 0) { return; } config->connectTimeoutSec = connect_timeout_sec; } void log_producer_config_set_send_timeout_sec(log_producer_config * config, int32_t send_timeout_sec) { if (config == NULL || send_timeout_sec <= 0) { return; } config->sendTimeoutSec = send_timeout_sec; } void log_producer_config_set_destroy_flusher_wait_sec(log_producer_config * config, int32_t destroy_flusher_wait_sec) { if (config == NULL || destroy_flusher_wait_sec <= 0) { return; } config->destroyFlusherWaitTimeoutSec = destroy_flusher_wait_sec; } void log_producer_config_set_destroy_sender_wait_sec(log_producer_config * config, int32_t destroy_sender_wait_sec) { if (config == NULL || destroy_sender_wait_sec <= 0) { return; } config->destroySenderWaitTimeoutSec = destroy_sender_wait_sec; } void log_producer_config_set_compress_type(log_producer_config * config, int32_t compress_type) { if (config == NULL || compress_type < 0 || compress_type > 1) { return; } config->compressType = compress_type; } void log_producer_config_set_ntp_time_offset(log_producer_config * config, int32_t ntp_time_offset) { if (config == NULL) { return; } config->ntpTimeOffset = ntp_time_offset; } void log_producer_config_add_tag(log_producer_config * pConfig, const char * key, const char * value) { if(key == NULL || value == NULL) { return; } ++pConfig->tagCount; if (pConfig->tags == NULL || pConfig->tagCount > pConfig->tagAllocSize) { if(pConfig->tagAllocSize == 0) { pConfig->tagAllocSize = 4; } else { pConfig->tagAllocSize *= 2; } log_producer_config_tag * tagArray = (log_producer_config_tag *)malloc(sizeof(log_producer_config_tag) * pConfig->tagAllocSize); if (pConfig->tags != NULL) { memcpy(tagArray, pConfig->tags, sizeof(log_producer_config_tag) * (pConfig->tagCount - 1)); free(pConfig->tags); } pConfig->tags = tagArray; } int32_t tagIndex = pConfig->tagCount - 1; pConfig->tags[tagIndex].key = log_sdsnew(key); pConfig->tags[tagIndex].value = log_sdsnew(value); } void log_producer_config_set_endpoint(log_producer_config * config, const char * endpoint) { if (!endpoint) { _copy_config_string(NULL, &config->endpoint); return; } if (strlen(endpoint) < 8) { return; } if (strncmp(endpoint, "http://", 7) == 0) { endpoint += 7; } else if (strncmp(endpoint, "https://", 8) == 0) { config->using_https = 1; endpoint += 8; } _copy_config_string(endpoint, &config->endpoint); } void log_producer_config_set_project(log_producer_config * config, const char * project) { _copy_config_string(project, &config->project); } void log_producer_config_set_logstore(log_producer_config * config, const char * logstore) { _copy_config_string(logstore, &config->logstore); } void log_producer_config_set_access_id(log_producer_config * config, const char * access_id) { _copy_config_string(access_id, &config->accessKeyId); } void log_producer_config_set_access_key(log_producer_config * config, const char * access_key) { _copy_config_string(access_key, &config->accessKey); } void log_producer_config_reset_security_token(log_producer_config * config, const char * access_id, const char * access_secret, const char * security_token) { if (config->securityTokenLock == NULL) { config->securityTokenLock = CreateCriticalSection(); } CS_ENTER(config->securityTokenLock); _copy_config_string(access_id, &config->accessKeyId); _copy_config_string(access_secret, &config->accessKey); _copy_config_string(security_token, &config->securityToken); CS_LEAVE(config->securityTokenLock); } void log_producer_config_get_security(log_producer_config * config, char ** access_id, char ** access_secret, char ** security_token) { if (config->securityTokenLock == NULL) { _copy_config_string(config->accessKeyId, access_id); _copy_config_string(config->accessKey, access_secret); } else { CS_ENTER(config->securityTokenLock); _copy_config_string(config->accessKeyId, access_id); _copy_config_string(config->accessKey, access_secret); _copy_config_string(config->securityToken, security_token); CS_LEAVE(config->securityTokenLock); } } void log_producer_config_set_topic(log_producer_config * config, const char * topic) { _copy_config_string(topic, &config->topic); } void log_producer_config_set_source(log_producer_config * config, const char * source) { _copy_config_string(source, &config->source); } int log_producer_config_is_valid(log_producer_config * config) { if (config == NULL) { aos_error_log("invalid producer config"); return 0; } if (config->endpoint == NULL || config->project == NULL || config->logstore == NULL) { aos_error_log("invalid producer config destination params"); // return 0; } if (config->accessKey == NULL || config->accessKeyId == NULL) { aos_error_log("invalid producer config authority params"); // return 0; } if (config->packageTimeoutInMS < 0 || config->maxBufferBytes < 0 || config->logCountPerPackage < 0 || config->logBytesPerPackage < 0) { aos_error_log("invalid producer config log merge and buffer params"); return 0; } if (config->usePersistent) { if (config->persistentFilePath == NULL || config->maxPersistentFileCount <= 0 || config->maxPersistentLogCount <= 0 || config->maxPersistentFileSize <=0 ) { aos_error_log("invalid producer persistent config params"); return 0; } } return 1; } void log_producer_config_set_using_http(log_producer_config * config, int32_t using_https) { if (config == NULL || using_https < 0) { return; } config->using_https = using_https; } int log_producer_persistent_config_is_enabled(log_producer_config *config) { if (config == NULL) { aos_error_log("invalid producer config"); return 0; } if (config->usePersistent == 0) { return 0; } return 1; } void log_producer_config_set_persistent(log_producer_config *config, int32_t persistent) { if (config == NULL) return; config->usePersistent = persistent; } void log_producer_config_set_persistent_file_path(log_producer_config *config, const char *file_path) { if (config == NULL) return; _copy_config_string(file_path, &config->persistentFilePath); } void log_producer_config_set_persistent_max_file_count(log_producer_config *config, int32_t file_count) { if (config == NULL) return; config->maxPersistentFileCount = file_count; } void log_producer_config_set_persistent_max_file_size(log_producer_config *config, int32_t file_size) { if (config == NULL) return; config->maxPersistentFileSize = file_size; } void log_producer_config_set_persistent_force_flush(log_producer_config *config, int32_t force) { if (config == NULL) return; config->forceFlushDisk = force; } void log_producer_config_set_persistent_max_log_count(log_producer_config *config, int32_t max_log_count) { if (config == NULL) return; config->maxPersistentLogCount = max_log_count; } void log_producer_config_set_max_log_delay_time(log_producer_config *config, int32_t max_log_delay_time) { if (config == NULL) return; config->maxLogDelayTime = max_log_delay_time; } void log_producer_config_set_drop_delay_log(log_producer_config *config, int32_t drop_or_rewrite) { if (config == NULL) return; config->dropDelayLog = drop_or_rewrite; } void log_producer_config_set_drop_unauthorized_log(log_producer_config *config, int32_t drop_or_not) { if (config == NULL) return; config->dropUnauthorizedLog = drop_or_not; } void log_producer_config_set_callback_from_sender_thread(log_producer_config * config, int32_t callback_from_sender_thread) { if (NULL == config) { return; } config->callbackFromSenderThread = callback_from_sender_thread; } LOG_EXPORT void log_producer_config_set_use_webtracking(log_producer_config * config, int32_t webtracking) { if (NULL == config) { return; } config->webTracking = webtracking; } void log_producer_config_set_mode(log_producer_config *config, int32_t mode) { if (NULL == config) { return; } config->mode = mode; } void log_producer_config_set_shardkey(log_producer_config *config, const char *shardKey) { if (NULL == config) { return; } _copy_config_string(shardKey, &config->shardKey); }