in src/log_producer_manager.c [404:458]
log_producer_result log_producer_manager_add_log(log_producer_manager * producer_manager, uint32_t log_time, int32_t pair_count, char ** keys, size_t * key_lens, char ** values, size_t * val_lens)
{
if (producer_manager->totalBufferSize > producer_manager->producer_config->maxBufferBytes)
{
return LOG_PRODUCER_DROP_ERROR;
}
CS_ENTER(producer_manager->lock);
if (producer_manager->builder == NULL)
{
// if queue is full, return drop error
if (log_queue_isfull(producer_manager->loggroup_queue))
{
CS_LEAVE(producer_manager->lock);
return LOG_PRODUCER_DROP_ERROR;
}
int32_t now_time = time(NULL);
producer_manager->builder = log_group_create();
producer_manager->firstLogTime = now_time;
producer_manager->builder->private_value = producer_manager;
}
add_log_full(producer_manager->builder, log_time, pair_count, keys, key_lens, values, val_lens);
log_group_builder * builder = producer_manager->builder;
int32_t nowTime = time(NULL);
if (producer_manager->builder->loggroup_size < producer_manager->producer_config->logBytesPerPackage &&
nowTime - producer_manager->firstLogTime < producer_manager->producer_config->packageTimeoutInMS / 1000 &&
producer_manager->builder->grp->n_logs < producer_manager->producer_config->logCountPerPackage)
{
CS_LEAVE(producer_manager->lock);
return LOG_PRODUCER_OK;
}
producer_manager->builder = NULL;
size_t loggroup_size = builder->loggroup_size;
aos_debug_log("try push loggroup to flusher, size : %d, log count %d", (int)builder->loggroup_size, (int)builder->grp->n_logs);
int status = log_queue_push(producer_manager->loggroup_queue, builder);
if (status != 0)
{
aos_error_log("try push loggroup to flusher failed, force drop this log group, error code : %d", status);
log_group_destroy(builder);
}
else
{
producer_manager->totalBufferSize += loggroup_size;
COND_SIGNAL(producer_manager->triger_cond);
}
CS_LEAVE(producer_manager->lock);
return LOG_PRODUCER_OK;
}