log_producer_result log_producer_manager_add_log()

in src/log_producer_manager.c [404:458]


/**
 * Append one log record to the manager's pending log group and, once that
 * group is big enough, old enough, or holds enough logs, push it to the
 * flusher queue.
 *
 * @param producer_manager manager owning the pending builder, queue and lock
 * @param log_time         log timestamp, forwarded to add_log_full
 * @param pair_count       number of key/value pairs, forwarded to add_log_full
 * @param keys             key pointers, forwarded to add_log_full
 * @param key_lens         key lengths, forwarded to add_log_full
 * @param values           value pointers, forwarded to add_log_full
 * @param val_lens         value lengths, forwarded to add_log_full
 *
 * @return LOG_PRODUCER_OK when the log was buffered and, if a package was
 *         completed by this call, that package was queued successfully.
 *         LOG_PRODUCER_DROP_ERROR when the buffer limit is exceeded, the
 *         queue is full, a new builder could not be created, or the completed
 *         package could not be queued (in which case it is destroyed).
 */
log_producer_result log_producer_manager_add_log(log_producer_manager * producer_manager, uint32_t log_time, int32_t pair_count, char ** keys, size_t * key_lens, char ** values, size_t * val_lens)
{
    // Cheap rejection before taking the lock.
    // NOTE(review): totalBufferSize is read here without holding the lock;
    // presumably a deliberate best-effort check -- confirm.
    if (producer_manager->totalBufferSize > producer_manager->producer_config->maxBufferBytes)
    {
        return LOG_PRODUCER_DROP_ERROR;
    }

    CS_ENTER(producer_manager->lock);

    // One clock sample serves both the "first log" stamp and the age check.
    int32_t now_time = (int32_t)time(NULL);

    if (producer_manager->builder == NULL)
    {
        // Do not start a new package when there is no room to queue it later.
        if (log_queue_isfull(producer_manager->loggroup_queue))
        {
            CS_LEAVE(producer_manager->lock);
            return LOG_PRODUCER_DROP_ERROR;
        }

        log_group_builder * fresh_builder = log_group_create();
        // Defensive guard: avoid dereferencing a NULL builder should
        // log_group_create be able to fail (not visible from this function).
        if (fresh_builder == NULL)
        {
            CS_LEAVE(producer_manager->lock);
            return LOG_PRODUCER_DROP_ERROR;
        }

        fresh_builder->private_value = producer_manager;
        producer_manager->builder = fresh_builder;
        producer_manager->firstLogTime = now_time;
    }

    log_group_builder * builder = producer_manager->builder;

    add_log_full(builder, log_time, pair_count, keys, key_lens, values, val_lens);

    // Keep accumulating while the package is under every threshold:
    // byte size, age (timeout is configured in ms, compared in whole seconds)
    // and log count.
    if (builder->loggroup_size < producer_manager->producer_config->logBytesPerPackage &&
            now_time - producer_manager->firstLogTime < producer_manager->producer_config->packageTimeoutInMS / 1000 &&
            builder->grp->n_logs < producer_manager->producer_config->logCountPerPackage)
    {
        CS_LEAVE(producer_manager->lock);
        return LOG_PRODUCER_OK;
    }

    // Detach the package from the manager; the next log starts a new one.
    producer_manager->builder = NULL;

    // Size is captured before the push; presumably the builder must not be
    // touched once it has been handed to the queue.
    size_t loggroup_size = builder->loggroup_size;
    log_producer_result result = LOG_PRODUCER_OK;

    aos_debug_log("try push loggroup to flusher, size : %d, log count %d", (int)builder->loggroup_size, (int)builder->grp->n_logs);
    int status = log_queue_push(producer_manager->loggroup_queue, builder);
    if (status != 0)
    {
        aos_error_log("try push loggroup to flusher failed, force drop this log group, error code : %d", status);
        log_group_destroy(builder);
        // The package (including the log just added) was discarded, so
        // report the drop instead of claiming success.
        result = LOG_PRODUCER_DROP_ERROR;
    }
    else
    {
        // Only queued packages count towards the buffered total.
        producer_manager->totalBufferSize += loggroup_size;
        COND_SIGNAL(producer_manager->triger_cond);
    }

    CS_LEAVE(producer_manager->lock);

    return result;
}