in src/log_producer_sender.c [238:361]
int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
{
log_producer_send_result send_result = AosStatusToResult(result);
log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
if (producer_manager->send_done_function != NULL)
{
log_producer_result callback_result = send_result == LOG_SEND_OK ?
LOG_PRODUCER_OK :
(LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
producer_manager->send_done_function(producer_manager->producer_config->logstore,
callback_result,
send_param->log_buf->raw_length,
send_param->log_buf->length,
result->requestID,
result->errorMessage,
send_param->log_buf->data);
}
switch (send_result)
{
case LOG_SEND_OK:
break;
case LOG_SEND_TIME_ERROR:
// if no this marco, drop data
#ifdef SEND_TIME_INVALID_FIX
error_info->last_send_error = LOG_SEND_TIME_ERROR;
error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
return error_info->last_sleep_ms;
#else
break;
#endif
case LOG_SEND_QUOTA_EXCEED:
if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
{
error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
error_info->first_error_time = time(NULL);
}
else
{
if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
{
error_info->last_sleep_ms *= 2;
}
#ifndef SEND_TIME_INVALID_FIX
// only drop data when SEND_TIME_INVALID_FIX not defined
if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
{
break;
}
#endif
}
aos_warn_log("send quota error, project : %s, logstore : %s, buffer len : %d, raw len : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
result->statusCode,
result->errorMessage);
return error_info->last_sleep_ms;
case LOG_SEND_SERVER_ERROR :
case LOG_SEND_NETWORK_ERROR:
if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
{
error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
error_info->first_error_time = time(NULL);
}
else
{
if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
{
error_info->last_sleep_ms *= 2;
}
#ifndef SEND_TIME_INVALID_FIX
// only drop data when SEND_TIME_INVALID_FIX not defined
if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
{
break;
}
#endif
}
aos_warn_log("send network error, project : %s, logstore : %s, buffer len : %d, raw len : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
result->statusCode,
result->errorMessage == NULL ? "" : result->errorMessage);
return error_info->last_sleep_ms;
default:
// discard data
break;
}
CS_ENTER(producer_manager->lock);
producer_manager->totalBufferSize -= send_param->log_buf->length;
CS_LEAVE(producer_manager->lock);
if (send_result == LOG_SEND_OK)
{
aos_debug_log("send success, project : %s, logstore : %s, buffer len : %d, raw len : %d, total buffer : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
(int)producer_manager->totalBufferSize,
result->statusCode,
result->errorMessage);
}
else
{
aos_warn_log("send fail, discard data, project : %s, logstore : %s, buffer len : %d, raw len : %d, total buffer : %d, code : %d, error msg : %s",
send_param->producer_config->project,
send_param->producer_config->logstore,
(int)send_param->log_buf->length,
(int)send_param->log_buf->raw_length,
(int)producer_manager->totalBufferSize,
result->statusCode,
result->errorMessage);
}
return 0;
}