Sources/aliyun-log-c-sdk/log_producer_sender.c (491 lines of code) (raw):
//
// Created by ZhangCheng on 20/11/2017.
//
#include "log_producer_sender.h"
#include "log_api.h"
#include "log_producer_manager.h"
#include "inner_log.h"
#include "log_lz4.h"
#include "log_sds.h"
#include <stdlib.h>
#include <string.h>
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
// Current time used to stamp/validate log groups; defined elsewhere in the SDK.
// NOTE(review): presumably seconds since the epoch, since it is compared with
// builder_time values that may come from time(NULL) — confirm at its definition.
unsigned int LOG_GET_TIME();
// Error-code strings that may appear in a server error message. Within this
// file only LOGE_TIME_EXPIRED is consulted (by AosStatusToResult, via strstr).
const char* LOGE_SERVER_BUSY = "ServerBusy";
const char* LOGE_INTERNAL_SERVER_ERROR = "InternalServerError";
const char* LOGE_UNAUTHORIZED = "Unauthorized";
const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
const char* LOGE_TIME_EXPIRED = "RequestTimeExpired";
// Retry delays are slept in ticks of this many milliseconds so that a shutdown
// or network-recovery signal can cut a backoff short.
#define SEND_SLEEP_INTERVAL_MS 100
// Exponential backoff per error class: the delay starts at BASE_* and is doubled
// on each repeated error of the same class while it is still below MAX_*.
// Because the check happens before doubling, the final delay can overshoot MAX_*
// (e.g. network: 300 -> 600 -> 1200 -> 2400 -> 4800 ms).
#define MAX_NETWORK_ERROR_SLEEP_MS 3000
#define BASE_NETWORK_ERROR_SLEEP_MS 300
#define MAX_QUOTA_ERROR_SLEEP_MS 10000
#define BASE_QUOTA_ERROR_SLEEP_MS 500
#define MAX_PARAMETER_ERROR_SLEEP_MS 3000
#define BASE_PARAMETER_ERROR_SLEEP_MS 300
// Delay (ms) before retrying after a RequestTimeExpired response.
#define INVALID_TIME_TRY_INTERVAL 500
// A retryable error that has persisted for longer than this many seconds
// (one day) causes the buffer to be dropped instead of retried.
#define DROP_FAIL_DATA_TIME_SECOND 86400
// Enables log-time rewriting before a send (_rebuild_time) and retrying on
// RequestTimeExpired instead of dropping the data.
#define SEND_TIME_INVALID_FIX
// Retry state carried across attempts for a single buffer by
// log_producer_send_fun; it is zero-filled before the first attempt.
typedef struct _send_error_info
{
    log_producer_send_result last_send_error; // error class recorded by the previous attempt (0 until one is recorded)
    int32_t last_sleep_ms;    // current backoff delay in milliseconds
    int32_t first_error_time; // time(NULL) when the current error class was first recorded
}send_error_info;
// Handles one post result: fires the user callbacks, updates error_info, and
// returns the delay in ms before the next retry, or 0 when the buffer is
// finished (sent successfully or dropped).
int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
#ifdef SEND_TIME_INVALID_FIX
/*
 * Re-encode an LZ4-compressed protobuf log group into the web-tracking payload
 * format, LZ4-compressed again.
 *
 * lz4_buf:     source buffer; it is neither modified nor freed.
 * new_lz4_buf: on success receives a newly malloc'ed buffer that the caller must
 *              free(). On any failure it is left untouched, so a caller that
 *              pre-initializes it to lz4_buf keeps using the original data.
 */
void pb_to_webtracking(lz4_log_buf *lz4_buf, lz4_log_buf **new_lz4_buf)
{
    aos_debug_log("[sender] pb_to_webtracking start.");
    char * buf = (char *)malloc(lz4_buf->raw_length);
    if (buf == NULL)
    {
        aos_fatal_log("[sender] pb_to_webtracking, malloc error");
        return;
    }
    if (LOG_LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
    {
        free(buf);
        aos_fatal_log("[sender] pb_to_webtracking, LOG_LZ4_decompress_safe error");
        return;
    }
    // NOTE(review): the serializer receives &buf and is assumed to leave buf
    // pointing at the serialized output (reusing or releasing the input);
    // confirm its ownership contract.
    size_t len = serialize_pb_buffer_to_webtracking(buf, lz4_buf->raw_length, &buf);
    int compress_bound = LOG_LZ4_compressBound((int)len);
    char *compress_data = (char *)malloc(compress_bound);
    if (compress_data == NULL)
    {
        aos_fatal_log("[sender] pb_to_webtracking, malloc error");
        free(buf);
        return;
    }
    int compressed_size = LOG_LZ4_compress_default((char *)buf, compress_data, (int)len, compress_bound);
    if (compressed_size <= 0)
    {
        aos_fatal_log("[sender] pb_to_webtracking, LOG_LZ4_compress_default error");
        free(buf);
        free(compress_data);
        return;
    }
    // Build the result in a local first so the caller's pointer is only
    // replaced by a fully initialized buffer, never by NULL.
    lz4_log_buf * out = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
    if (out == NULL)
    {
        aos_fatal_log("[sender] pb_to_webtracking, malloc error");
        free(buf);
        free(compress_data);
        return;
    }
    out->length = compressed_size;
    out->raw_length = len;
    memcpy(out->data, compress_data, compressed_size);
    *new_lz4_buf = out;
    free(buf);
    free(compress_data);
    aos_debug_log("[sender] pb_to_webtracking end.");
}
/*
 * Produce a copy of an LZ4-compressed protobuf log group whose log times have
 * been rewritten by fix_log_group_time() using the current LOG_GET_TIME().
 *
 * lz4_buf:     source buffer; it is neither modified nor freed.
 * new_lz4_buf: on success receives a newly malloc'ed buffer that the caller must
 *              free(). On any failure it is left untouched, so a caller that
 *              pre-initializes it to lz4_buf keeps using the original data.
 */
void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
{
    aos_debug_log("[sender] rebuild log.");
    char * buf = (char *)malloc(lz4_buf->raw_length);
    if (buf == NULL)
    {
        aos_fatal_log("[sender] rebuild log, malloc error");
        return;
    }
    if (LOG_LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
    {
        free(buf);
        aos_fatal_log("[sender] LOG_LZ4_decompress_safe error");
        return;
    }
    uint32_t nowTime = LOG_GET_TIME();
    fix_log_group_time(buf, lz4_buf->raw_length, nowTime);
    int compress_bound = LOG_LZ4_compressBound(lz4_buf->raw_length);
    char *compress_data = (char *)malloc(compress_bound);
    if (compress_data == NULL)
    {
        aos_fatal_log("[sender] rebuild log, malloc error");
        free(buf);
        return;
    }
    int compressed_size = LOG_LZ4_compress_default((char *)buf, compress_data, lz4_buf->raw_length, compress_bound);
    if (compressed_size <= 0)
    {
        aos_fatal_log("[sender] LOG_LZ4_compress_default error");
        free(buf);
        free(compress_data);
        return;
    }
    // Build the result in a local first so the caller's pointer is only
    // replaced by a fully initialized buffer, never by NULL.
    lz4_log_buf * out = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
    if (out == NULL)
    {
        aos_fatal_log("[sender] rebuild log, malloc error");
        free(buf);
        free(compress_data);
        return;
    }
    out->length = compressed_size;
    out->raw_length = lz4_buf->raw_length;
    memcpy(out->data, compress_data, compressed_size);
    *new_lz4_buf = out;
    free(buf);
    free(compress_data);
}
#endif
#ifdef WIN32
DWORD WINAPI log_producer_send_thread(LPVOID param)
#else
void * log_producer_send_thread(void * param)
#endif
{
log_producer_manager * producer_manager = (log_producer_manager *)param;
if (producer_manager->sender_data_queue == NULL)
{
return 0;
}
int32_t interval = producer_manager->producer_config->logQueuePopIntervalInMS;
while (!producer_manager->shutdown)
{
// change from 30ms to 1000s, reduce wake up when app switch to back
void * send_param = log_queue_pop(producer_manager->sender_data_queue, interval);
if (send_param != NULL)
{
ATOMICINT_INC(&producer_manager->multi_thread_send_count);
log_producer_send_fun(send_param);
ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
}
}
return 0;
}
/*
 * Send one buffered log group, retrying until it is delivered, dropped, or the
 * producer shuts down.
 *
 * param: a log_producer_send_param * (see create_log_producer_send_param).
 *        Except on the invalid-magic-number path, the param and its log_buf are
 *        freed before this function returns.
 * Returns NULL.
 */
void * log_producer_send_fun(void * param)
{
    aos_info_log("[sender] start send log data.");
    log_producer_send_param * send_param = (log_producer_send_param *)param;
    if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM)
    {
        aos_fatal_log("[sender] invalid send param, magic num not found, num 0x%x", send_param->magic_num);
        log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
        if (producer_manager && producer_manager->send_done_function != NULL)
        {
            producer_manager->send_done_function(producer_manager->producer_config->logstore, LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
                                                 NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
        }
        if (producer_manager && producer_manager->uuid_send_done_function != NULL)
        {
            producer_manager->uuid_send_done_function(producer_manager->producer_config->logstore,
                                                      LOG_PRODUCER_INVALID,
                                                      send_param->log_buf->raw_length,
                                                      send_param->log_buf->length,
                                                      NULL,
                                                      "invalid send param, magic num not found",
                                                      send_param->log_buf->data,
                                                      producer_manager->uuid_user_param,
                                                      send_param->start_uuid,
                                                      send_param->end_uuid);
        }
        return NULL;
    }
    log_producer_config * config = send_param->producer_config;
    send_error_info error_info;
    memset(&error_info, 0, sizeof(error_info));
    log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
    do
    {
        if (producer_manager->shutdown)
        {
            aos_info_log("[sender] send fail but shutdown signal received, force exit");
            if (producer_manager->send_done_function != NULL)
            {
                producer_manager->send_done_function(producer_manager->producer_config->logstore, LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
                                                     NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
            }
            // UUID-aware listeners are told about this outcome too, as on every other path.
            if (producer_manager->uuid_send_done_function != NULL)
            {
                producer_manager->uuid_send_done_function(producer_manager->producer_config->logstore,
                                                          LOG_PRODUCER_SEND_EXIT_BUFFERED,
                                                          send_param->log_buf->raw_length,
                                                          send_param->log_buf->length,
                                                          NULL,
                                                          "producer is being destroyed, producer has no time to send this buffer out",
                                                          send_param->log_buf->data,
                                                          producer_manager->uuid_user_param,
                                                          send_param->start_uuid,
                                                          send_param->end_uuid);
            }
            break;
        }
        // send_buf may be swapped for temporary re-encoded copies below; any
        // copy (anything other than send_param->log_buf) is freed after the attempt.
        lz4_log_buf * send_buf = send_param->log_buf;
#ifdef SEND_TIME_INVALID_FIX
        uint32_t nowTime = LOG_GET_TIME();
        // Signed 64-bit difference: unsigned subtraction would wrap, making the
        // skew test true whenever nowTime != builder_time and forcing a full
        // decompress/rewrite/recompress on every send.
        const int64_t max_time_skew_sec = 600;
        int64_t time_skew = (int64_t)nowTime - (int64_t)send_param->builder_time;
        if (time_skew > max_time_skew_sec || time_skew < -max_time_skew_sec || error_info.last_send_error == LOG_SEND_TIME_ERROR)
        {
            _rebuild_time(send_param->log_buf, &send_buf);
            send_param->builder_time = nowTime;
        }
#endif
        log_post_option option;
        memset(&option, 0, sizeof(log_post_option));
        option.connect_timeout = config->connectTimeoutSec;
        option.operation_timeout = config->sendTimeoutSec;
        option.interface = config->netInterface;
        option.compress_type = config->compressType;
        option.using_https = config->using_https;
        option.ntp_time_offset = config->ntpTimeOffset;
        option.mode = config->mode;
        option.shardKey = config->shardKey;
        post_log_result * rst;
        if (config->webTracking)
        {
            // Convert from the (possibly time-fixed) send_buf. If conversion yields
            // a new buffer, release the intermediate time-fixed copy; overwriting
            // send_buf without freeing it would leak it.
            lz4_log_buf * webtracking_buf = send_buf;
            pb_to_webtracking(send_buf, &webtracking_buf);
            if (webtracking_buf != send_buf && send_buf != send_param->log_buf)
            {
                free(send_buf);
            }
            send_buf = webtracking_buf;
            rst = post_logs_from_lz4buf_webtracking(config->endpoint, config->project, config->logstore, send_buf, &option);
        }
        else
        {
            log_sds accessKeyId = NULL;
            log_sds accessKey = NULL;
            log_sds stsToken = NULL;
            log_producer_config_get_security(config, &accessKeyId, &accessKey, &stsToken);
            rst = post_logs_from_lz4buf_with_config(config, config->endpoint, config->project, config->logstore, accessKeyId, accessKey, stsToken, send_buf, &option);
            log_sdsfree(accessKeyId);
            log_sdsfree(accessKey);
            log_sdsfree(stsToken);
        }
        aos_debug_log("[sender] send data result: statusCode: %d, errorMessage: %s, requestID :%s",
                      rst->statusCode, rst->errorMessage, rst->requestID);
        int32_t sleepMs = log_producer_on_send_done(send_param, rst, &error_info);
        post_log_result_destroy(rst);
        // temporary re-encoded buffer for this attempt only
        if (send_buf != send_param->log_buf)
        {
            free(send_buf);
        }
        if (sleepMs <= 0)
        {
            break;
        }
        // Sleep in short ticks so shutdown or network recovery interrupts the backoff.
        int i = 0;
        for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
        {
#ifdef WIN32
            Sleep(SEND_SLEEP_INTERVAL_MS);
#else
            usleep(SEND_SLEEP_INTERVAL_MS * 1000);
#endif
            if (producer_manager->shutdown || producer_manager->networkRecover)
            {
                break;
            }
        }
        if (producer_manager->networkRecover)
        {
            producer_manager->networkRecover = 0;
        }
    }while(1);
    // at last, free all buffer
    free_lz4_log_buf(send_param->log_buf);
    free(send_param);
    return NULL;
}
int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
{
log_producer_send_result send_result = AosStatusToResult(result);
log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
if (producer_manager->send_done_function != NULL)
{
log_producer_result callback_result = send_result == LOG_SEND_OK ?
LOG_PRODUCER_OK :
(LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
producer_manager->send_done_function(producer_manager->producer_config->logstore, callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
}
if (producer_manager->uuid_send_done_function != NULL)
{
log_producer_result callback_result = send_result == LOG_SEND_OK ?
LOG_PRODUCER_OK :
(LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
producer_manager->uuid_send_done_function(producer_manager->producer_config->logstore,
callback_result,
send_param->log_buf->raw_length,
send_param->log_buf->length,
result->requestID,
result->errorMessage,
send_param->log_buf->data,
producer_manager->uuid_user_param,
send_param->start_uuid,
send_param->end_uuid);
}
if (send_result == LOG_SEND_UNAUTHORIZED)
{
// if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
if (producer_manager->producer_config->dropUnauthorizedLog == 0)
{
send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
}
}
switch (send_result)
{
case LOG_SEND_OK:
break;
case LOG_SEND_TIME_ERROR:
// if no this marco, drop data
#ifdef SEND_TIME_INVALID_FIX
error_info->last_send_error = LOG_SEND_TIME_ERROR;
error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
return error_info->last_sleep_ms;
#else
break;
#endif
case LOG_SEND_QUOTA_EXCEED:
if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
{
error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
error_info->first_error_time = time(NULL);
}
else
{
if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
{
error_info->last_sleep_ms *= 2;
}
if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
{
break;
}
}
aos_warn_log("[sender] send quota error, project : %s, logstore : %s, buffer len : %d, raw len : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
result->statusCode,
result->errorMessage);
return error_info->last_sleep_ms;
case LOG_SEND_SERVER_ERROR :
case LOG_SEND_NETWORK_ERROR:
if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
{
error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
error_info->first_error_time = time(NULL);
}
else
{
if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
{
error_info->last_sleep_ms *= 2;
}
if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
{
break;
}
}
aos_warn_log("[sender] send network error, project : %s, logstore : %s, buffer len : %d, raw len : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
result->statusCode,
result->errorMessage);
return error_info->last_sleep_ms;
case LOG_SEND_PARAMETERS_ERROR:
if (error_info->last_send_error != LOG_SEND_PARAMETERS_ERROR)
{
error_info->last_send_error = LOG_SEND_PARAMETERS_ERROR;
error_info->last_sleep_ms = BASE_PARAMETER_ERROR_SLEEP_MS;
error_info->first_error_time = time(NULL);
}
else
{
if (error_info->last_sleep_ms < MAX_PARAMETER_ERROR_SLEEP_MS)
{
error_info->last_sleep_ms *= 2;
}
if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
{
break;
}
}
aos_warn_log("[sender] send parameters error, project : %s, logstore : %s, buffer len : %d, raw len : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
result->statusCode,
result->errorMessage);
return error_info->last_sleep_ms;
default:
// discard data
break;
}
// always try once when discard error
if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
{
error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
error_info->first_error_time = time(NULL);
aos_warn_log("[sender] send fail, the error is discard data, retry once, project : %s, logstore : %s, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
(int)producer_manager->totalBufferSize,
result->statusCode,
result->errorMessage);
return BASE_NETWORK_ERROR_SLEEP_MS;
}
CS_ENTER(producer_manager->lock);
producer_manager->totalBufferSize -= send_param->log_buf->length;
CS_LEAVE(producer_manager->lock);
if (send_result == LOG_SEND_OK)
{
aos_debug_log("[sender] send success, project : %s, logstore : %s, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
(int)producer_manager->totalBufferSize,
result->statusCode,
result->errorMessage);
}
else
{
aos_warn_log("[sender] send fail, discard data, project : %s, logstore : %s, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
(int)producer_manager->totalBufferSize,
result->statusCode,
result->errorMessage);
if (producer_manager->send_done_function != NULL)
{
producer_manager->send_done_function(producer_manager->producer_config->logstore,
LOG_PRODUCER_DROP_ERROR,
send_param->log_buf->raw_length,
send_param->log_buf->length,
result->requestID,
result->errorMessage,
send_param->log_buf->data,
producer_manager->user_param);
}
if (producer_manager->uuid_send_done_function != NULL)
{
producer_manager->uuid_send_done_function(producer_manager->producer_config->logstore,
LOG_PRODUCER_DROP_ERROR,
send_param->log_buf->raw_length,
send_param->log_buf->length,
result->requestID,
result->errorMessage,
send_param->log_buf->data,
producer_manager->uuid_user_param,
send_param->start_uuid,
send_param->end_uuid);
}
}
return 0;
}
/*
 * Send send_param synchronously on the calling thread, running the full retry
 * loop. Ownership of send_param passes to log_producer_send_fun. The final
 * delivery outcome is reported only through the user callbacks, so this
 * function always returns LOG_PRODUCER_OK.
 */
log_producer_result log_producer_send_data(log_producer_send_param * send_param)
{
    void * ignored = log_producer_send_fun(send_param);
    (void)ignored;
    return LOG_PRODUCER_OK;
}
/*
 * Classify a post result into a sender retry category. Checks are applied in
 * priority order:
 *   2xx                           -> LOG_SEND_OK
 *   status <= 0 (no HTTP reply)   -> LOG_SEND_NETWORK_ERROR
 *   405 / 403 / 401 or 404        -> parameters / quota / unauthorized
 *   >= 500 or missing request id  -> LOG_SEND_SERVER_ERROR
 *   message contains "RequestTimeExpired" -> LOG_SEND_TIME_ERROR
 *   anything else                 -> LOG_SEND_DISCARD_ERROR
 */
log_producer_send_result AosStatusToResult(post_log_result * result)
{
    if (result->statusCode / 100 == 2)
    {
        return LOG_SEND_OK;
    }
    if (result->statusCode <= 0)
    {
        return LOG_SEND_NETWORK_ERROR;
    }
    switch (result->statusCode)
    {
        case 401:
        case 404:
            return LOG_SEND_UNAUTHORIZED;
        case 403:
            return LOG_SEND_QUOTA_EXCEED;
        case 405:
            return LOG_SEND_PARAMETERS_ERROR;
        default:
            break;
    }
    int server_side = result->statusCode >= 500 || result->requestID == NULL;
    if (server_side)
    {
        return LOG_SEND_SERVER_ERROR;
    }
    int time_expired = result->errorMessage != NULL &&
                       strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL;
    return time_expired ? LOG_SEND_TIME_ERROR : LOG_SEND_DISCARD_ERROR;
}
/*
 * Allocate a send parameter for one compressed log group.
 *
 * producer_config / producer_manager: stored by reference.
 * log_buf: stored by reference; log_producer_send_fun frees it together with
 *          the returned param once the send finishes.
 * builder: if non-NULL, supplies builder_time and the uuid range. Otherwise the
 *          current time(NULL) is used and both uuids are set to -1.
 * Returns the new param, or NULL if allocation fails.
 */
log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
                                                         void * producer_manager,
                                                         lz4_log_buf * log_buf,
                                                         log_group_builder * builder)
{
    log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
    if (param == NULL)
    {
        // Report instead of dereferencing NULL below.
        aos_fatal_log("[sender] create_log_producer_send_param, malloc error");
        return NULL;
    }
    param->producer_config = producer_config;
    param->producer_manager = producer_manager;
    param->log_buf = log_buf;
    param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
    if (builder != NULL)
    {
        param->builder_time = builder->builder_time;
        param->start_uuid = builder->start_uuid;
        param->end_uuid = builder->end_uuid;
    }
    else
    {
        param->builder_time = time(NULL);
        param->start_uuid = -1;
        param->end_uuid = -1;
    }
    return param;
}