Sources/aliyun-log-c-sdk/log_producer_config.c (495 lines of code) (raw):
//
// Created by ZhangCheng on 20/11/2017.
//
#include "log_producer_config.h"
#include "log_sds.h"
#include <string.h>
#include <stdlib.h>
#include "inner_log.h"
static void _set_default_producer_config(log_producer_config * pConfig)
{
pConfig->logBytesPerPackage = 3 * 1024 * 1024;
pConfig->logCountPerPackage = 2048;
pConfig->packageTimeoutInMS = 3000;
pConfig->maxBufferBytes = 64 * 1024 * 1024;
pConfig->flushIntervalInMS = LOG_PRODUCER_FLUSH_INTERVAL_MS;
pConfig->logQueuePopIntervalInMS = LOG_PRODUCER_QUEUE_POP_INTERVAL_MS;
pConfig->connectTimeoutSec = 10;
pConfig->sendTimeoutSec = 15;
pConfig->destroySenderWaitTimeoutSec = 1;
pConfig->destroyFlusherWaitTimeoutSec = 1;
pConfig->compressType = 1;
pConfig->ntpTimeOffset = 0;
pConfig->using_https = 0;
pConfig->maxLogDelayTime = 7*24*3600;
pConfig->dropDelayLog = 1;
pConfig->callbackFromSenderThread = 1;
pConfig->webTracking = 0;
pConfig->mode = 0;
pConfig->user_params = NULL;
}
static void _copy_config_string(const char * value, log_sds * src_value)
{
if (src_value == NULL)
{
return;
}
if (value == NULL)
{
*src_value = NULL;
return;
}
size_t strLen = strlen(value);
if (*src_value == NULL)
{
*src_value = log_sdsnewEmpty(strLen);
}
*src_value = log_sdscpylen(*src_value, value, strLen);
}
log_producer_config * create_log_producer_config()
{
log_producer_config* pConfig = (log_producer_config*)malloc(sizeof(log_producer_config));
memset(pConfig, 0, sizeof(log_producer_config));
_set_default_producer_config(pConfig);
return pConfig;
}
void destroy_log_producer_config(log_producer_config * pConfig)
{
if (pConfig->project != NULL)
{
log_sdsfree(pConfig->project);
}
if (pConfig->logstore != NULL)
{
log_sdsfree(pConfig->logstore);
}
if (pConfig->endpoint != NULL)
{
log_sdsfree(pConfig->endpoint);
}
if (pConfig->accessKey != NULL)
{
log_sdsfree(pConfig->accessKey);
}
if (pConfig->accessKeyId != NULL)
{
log_sdsfree(pConfig->accessKeyId);
}
if (pConfig->topic != NULL)
{
log_sdsfree(pConfig->topic);
}
if (pConfig->source != NULL)
{
log_sdsfree(pConfig->source);
}
if (pConfig->netInterface != NULL)
{
log_sdsfree(pConfig->netInterface);
}
if (pConfig->securityToken != NULL)
{
log_sdsfree(pConfig->securityToken);
}
if (pConfig->securityTokenLock != NULL)
{
ReleaseCriticalSection(pConfig->securityTokenLock);
}
if (pConfig->tagCount > 0 && pConfig->tags != NULL)
{
int i = 0;
for (; i < pConfig->tagCount; ++i)
{
log_sdsfree(pConfig->tags[i].key);
log_sdsfree(pConfig->tags[i].value);
}
free(pConfig->tags);
}
if (pConfig->persistentFilePath != NULL)
{
log_sdsfree(pConfig->persistentFilePath);
}
if (pConfig->mode == 1 && NULL != pConfig->shardKey) {
log_sdsfree(pConfig->shardKey);
}
free(pConfig);
}
#ifdef LOG_PRODUCER_DEBUG
void log_producer_config_print(log_producer_config * pConfig, FILE * file)
{
fprintf(file, "endpoint : %s\n", pConfig->endpoint);
fprintf(file,"project : %s\n", pConfig->project);
fprintf(file,"logstore : %s\n", pConfig->logstore);
fprintf(file,"accessKeyId : %s\n", pConfig->accessKeyId);
fprintf(file,"accessKey : %s\n", pConfig->accessKey);
fprintf(file,"configName : %s\n", pConfig->configName);
fprintf(file,"topic : %s\n", pConfig->topic);
fprintf(file,"logLevel : %d\n", pConfig->logLevel);
fprintf(file,"packageTimeoutInMS : %d\n", pConfig->packageTimeoutInMS);
fprintf(file, "logCountPerPackage : %d\n", pConfig->logCountPerPackage);
fprintf(file, "logBytesPerPackage : %d\n", pConfig->logBytesPerPackage);
fprintf(file, "maxBufferBytes : %d\n", pConfig->maxBufferBytes);
fprintf(file, "tags: \n");
int32_t i = 0;
for (i = 0; i < pConfig->tagCount; ++i)
{
fprintf(file, "tag key : %s, value : %s \n", pConfig->tags[i].key, pConfig->tags[i].value);
}
}
#endif
void log_producer_config_set_packet_timeout(log_producer_config * config, int32_t time_out_ms)
{
if (config == NULL || time_out_ms < 0)
{
return;
}
config->packageTimeoutInMS = time_out_ms;
}
void log_producer_config_set_packet_log_count(log_producer_config * config, int32_t log_count)
{
if (config == NULL || log_count < 0)
{
return;
}
config->logCountPerPackage = log_count;
}
void log_producer_config_set_packet_log_bytes(log_producer_config * config, int32_t log_bytes)
{
if (config == NULL || log_bytes < 0)
{
return;
}
config->logBytesPerPackage = log_bytes;
}
void log_producer_config_set_max_buffer_limit(log_producer_config * config, int64_t max_buffer_bytes)
{
if (config == NULL || max_buffer_bytes < 0)
{
return;
}
config->maxBufferBytes = max_buffer_bytes;
}
void log_producer_config_set_flush_interval(log_producer_config * config, int32_t flush_interval_in_ms)
{
if (NULL == config || flush_interval_in_ms < 500)
{
return;
}
config->flushIntervalInMS = flush_interval_in_ms;
}
void log_producer_config_set_log_queue_interval(log_producer_config * config, int32_t log_queue_in_ms)
{
if (NULL == config || log_queue_in_ms < 500)
{
return;
}
config->logQueuePopIntervalInMS = log_queue_in_ms;
}
void log_producer_config_set_send_thread_count(log_producer_config * config, int32_t thread_count)
{
if (config == NULL || thread_count < 0)
{
return;
}
config->sendThreadCount = thread_count;
}
void log_producer_config_set_net_interface(log_producer_config * config, const char * net_interface)
{
if (config == NULL || net_interface == NULL)
{
return;
}
_copy_config_string(net_interface, &config->netInterface);
}
void log_producer_config_set_connect_timeout_sec(log_producer_config * config, int32_t connect_timeout_sec)
{
if (config == NULL || connect_timeout_sec <= 0)
{
return;
}
config->connectTimeoutSec = connect_timeout_sec;
}
void log_producer_config_set_send_timeout_sec(log_producer_config * config, int32_t send_timeout_sec)
{
if (config == NULL || send_timeout_sec <= 0)
{
return;
}
config->sendTimeoutSec = send_timeout_sec;
}
void log_producer_config_set_destroy_flusher_wait_sec(log_producer_config * config, int32_t destroy_flusher_wait_sec)
{
if (config == NULL || destroy_flusher_wait_sec <= 0)
{
return;
}
config->destroyFlusherWaitTimeoutSec = destroy_flusher_wait_sec;
}
void log_producer_config_set_destroy_sender_wait_sec(log_producer_config * config, int32_t destroy_sender_wait_sec)
{
if (config == NULL || destroy_sender_wait_sec <= 0)
{
return;
}
config->destroySenderWaitTimeoutSec = destroy_sender_wait_sec;
}
void log_producer_config_set_compress_type(log_producer_config * config, int32_t compress_type)
{
if (config == NULL || compress_type < 0 || compress_type > 1)
{
return;
}
config->compressType = compress_type;
}
void log_producer_config_set_ntp_time_offset(log_producer_config * config, int32_t ntp_time_offset)
{
if (config == NULL)
{
return;
}
config->ntpTimeOffset = ntp_time_offset;
}
void log_producer_config_add_tag(log_producer_config * pConfig, const char * key, const char * value)
{
if(key == NULL || value == NULL)
{
return;
}
++pConfig->tagCount;
if (pConfig->tags == NULL || pConfig->tagCount > pConfig->tagAllocSize)
{
if(pConfig->tagAllocSize == 0)
{
pConfig->tagAllocSize = 4;
}
else
{
pConfig->tagAllocSize *= 2;
}
log_producer_config_tag * tagArray = (log_producer_config_tag *)malloc(sizeof(log_producer_config_tag) * pConfig->tagAllocSize);
if (pConfig->tags != NULL)
{
memcpy(tagArray, pConfig->tags, sizeof(log_producer_config_tag) * (pConfig->tagCount - 1));
free(pConfig->tags);
}
pConfig->tags = tagArray;
}
int32_t tagIndex = pConfig->tagCount - 1;
pConfig->tags[tagIndex].key = log_sdsnew(key);
pConfig->tags[tagIndex].value = log_sdsnew(value);
}
void log_producer_config_set_endpoint(log_producer_config * config, const char * endpoint)
{
if (!endpoint) {
_copy_config_string(NULL, &config->endpoint);
return;
}
if (strlen(endpoint) < 8) {
return;
}
if (strncmp(endpoint, "http://", 7) == 0)
{
endpoint += 7;
}
else if (strncmp(endpoint, "https://", 8) == 0)
{
config->using_https = 1;
endpoint += 8;
}
_copy_config_string(endpoint, &config->endpoint);
}
void log_producer_config_set_project(log_producer_config * config, const char * project)
{
_copy_config_string(project, &config->project);
}
void log_producer_config_set_logstore(log_producer_config * config, const char * logstore)
{
_copy_config_string(logstore, &config->logstore);
}
void log_producer_config_set_access_id(log_producer_config * config, const char * access_id)
{
_copy_config_string(access_id, &config->accessKeyId);
}
void log_producer_config_set_access_key(log_producer_config * config, const char * access_key)
{
_copy_config_string(access_key, &config->accessKey);
}
void log_producer_config_reset_security_token(log_producer_config * config, const char * access_id, const char * access_secret, const char * security_token)
{
if (config->securityTokenLock == NULL)
{
config->securityTokenLock = CreateCriticalSection();
}
CS_ENTER(config->securityTokenLock);
_copy_config_string(access_id, &config->accessKeyId);
_copy_config_string(access_secret, &config->accessKey);
_copy_config_string(security_token, &config->securityToken);
CS_LEAVE(config->securityTokenLock);
}
void log_producer_config_get_security(log_producer_config * config, char ** access_id, char ** access_secret, char ** security_token)
{
if (config->securityTokenLock == NULL)
{
_copy_config_string(config->accessKeyId, access_id);
_copy_config_string(config->accessKey, access_secret);
}
else
{
CS_ENTER(config->securityTokenLock);
_copy_config_string(config->accessKeyId, access_id);
_copy_config_string(config->accessKey, access_secret);
_copy_config_string(config->securityToken, security_token);
CS_LEAVE(config->securityTokenLock);
}
}
void log_producer_config_set_topic(log_producer_config * config, const char * topic)
{
_copy_config_string(topic, &config->topic);
}
void log_producer_config_set_source(log_producer_config * config, const char * source)
{
_copy_config_string(source, &config->source);
}
int log_producer_config_is_valid(log_producer_config * config)
{
if (config == NULL)
{
aos_error_log("invalid producer config");
return 0;
}
if (config->endpoint == NULL || config->project == NULL || config->logstore == NULL)
{
aos_error_log("invalid producer config destination params");
// return 0;
}
if (config->accessKey == NULL || config->accessKeyId == NULL)
{
aos_error_log("invalid producer config authority params");
// return 0;
}
if (config->packageTimeoutInMS < 0 || config->maxBufferBytes < 0 || config->logCountPerPackage < 0 || config->logBytesPerPackage < 0)
{
aos_error_log("invalid producer config log merge and buffer params");
return 0;
}
if (config->usePersistent)
{
if (config->persistentFilePath == NULL || config->maxPersistentFileCount <= 0 || config->maxPersistentLogCount <= 0 || config->maxPersistentFileSize <=0 )
{
aos_error_log("invalid producer persistent config params");
return 0;
}
}
return 1;
}
void log_producer_config_set_using_http(log_producer_config * config, int32_t using_https)
{
if (config == NULL || using_https < 0)
{
return;
}
config->using_https = using_https;
}
int log_producer_persistent_config_is_enabled(log_producer_config *config)
{
if (config == NULL)
{
aos_error_log("invalid producer config");
return 0;
}
if (config->usePersistent == 0)
{
return 0;
}
return 1;
}
void log_producer_config_set_persistent(log_producer_config *config,
int32_t persistent)
{
if (config == NULL)
return;
config->usePersistent = persistent;
}
void log_producer_config_set_persistent_file_path(log_producer_config *config,
const char *file_path)
{
if (config == NULL)
return;
_copy_config_string(file_path, &config->persistentFilePath);
}
void
log_producer_config_set_persistent_max_file_count(log_producer_config *config,
int32_t file_count)
{
if (config == NULL)
return;
config->maxPersistentFileCount = file_count;
}
void
log_producer_config_set_persistent_max_file_size(log_producer_config *config,
int32_t file_size)
{
if (config == NULL)
return;
config->maxPersistentFileSize = file_size;
}
void log_producer_config_set_persistent_force_flush(log_producer_config *config,
int32_t force)
{
if (config == NULL)
return;
config->forceFlushDisk = force;
}
void log_producer_config_set_persistent_max_log_count(log_producer_config *config,
int32_t max_log_count)
{
if (config == NULL)
return;
config->maxPersistentLogCount = max_log_count;
}
void log_producer_config_set_max_log_delay_time(log_producer_config *config,
int32_t max_log_delay_time)
{
if (config == NULL)
return;
config->maxLogDelayTime = max_log_delay_time;
}
void log_producer_config_set_drop_delay_log(log_producer_config *config,
int32_t drop_or_rewrite)
{
if (config == NULL)
return;
config->dropDelayLog = drop_or_rewrite;
}
void log_producer_config_set_drop_unauthorized_log(log_producer_config *config,
int32_t drop_or_not)
{
if (config == NULL)
return;
config->dropUnauthorizedLog = drop_or_not;
}
void log_producer_config_set_callback_from_sender_thread(log_producer_config * config,
int32_t callback_from_sender_thread)
{
if (NULL == config) {
return;
}
config->callbackFromSenderThread = callback_from_sender_thread;
}
LOG_EXPORT void log_producer_config_set_use_webtracking(log_producer_config * config, int32_t webtracking)
{
if (NULL == config)
{
return;
}
config->webTracking = webtracking;
}
void log_producer_config_set_mode(log_producer_config *config, int32_t mode) {
if (NULL == config)
{
return;
}
config->mode = mode;
}
void log_producer_config_set_shardkey(log_producer_config *config, const char *shardKey) {
if (NULL == config)
{
return;
}
_copy_config_string(shardKey, &config->shardKey);
}