DWORD WINAPI log_producer_flush_thread()

in Sources/aliyun-log-c-sdk/log_producer_manager.c [110:240]


DWORD WINAPI log_producer_flush_thread(LPVOID param)
#else
void * log_producer_flush_thread(void * param)
#endif
{
    /*
     * Flusher thread entry point.
     *
     * param is the root log_producer_manager. The loop runs until
     * root_producer_manager->shutdown becomes non-zero, and each pass:
     *   1. waits on triger_cond (bounded by flushIntervalInMS);
     *   2. pops finished loggroup builders from loggroup_queue, decorates
     *      them (tags/topic/source/pack id), serializes them and stores the
     *      resulting send params in the send_param_queue ring;
     *   3. calls _try_flush_loggroup on the root manager;
     *   4. hands pending send params to sender_data_queue when sender
     *      threads exist, otherwise sends one packet itself.
     *
     * Always returns 0.
     */
    log_producer_manager * root_producer_manager = (log_producer_manager*)param;
    int32_t interval = root_producer_manager->producer_config->flushIntervalInMS;
    aos_info_log("[flusher] start run flusher thread, config : %s, flush interval: %d", root_producer_manager->producer_config->logstore, interval);
    while (root_producer_manager->shutdown == 0)
    {
        // NOTE(review): COND_WAIT_TIME presumably returns on signal or after
        // `interval` ms, whichever comes first - confirm against the macro.
        CS_ENTER(root_producer_manager->lock);
        COND_WAIT_TIME(root_producer_manager->triger_cond,
                       root_producer_manager->lock,
                       interval);
        CS_LEAVE(root_producer_manager->lock);

        // try read queue
        do
        {
            // if send queue is full, skip pack and send data
            // (write - read is the number of occupied ring slots)
            if (root_producer_manager->send_param_queue_write - root_producer_manager->send_param_queue_read >= root_producer_manager->send_param_queue_size)
            {
                break;
            }
            void * data = log_queue_trypop(root_producer_manager->loggroup_queue);
            if (data != NULL)
            {
                // process data
                log_group_builder * builder = (log_group_builder*)data;

                // The builder carries its owning manager in private_value; the
                // buffer accounting is updated on that manager while holding
                // the root manager's lock.
                log_producer_manager * producer_manager = (log_producer_manager *)builder->private_value;
                CS_ENTER(root_producer_manager->lock);
                producer_manager->totalBufferSize -= builder->loggroup_size;
                CS_LEAVE(root_producer_manager->lock);


                // attach the configured metadata before serializing
                log_producer_config * config = producer_manager->producer_config;
                int i = 0;
                for (i = 0; i < config->tagCount; ++i)
                {
                    add_tag(builder, config->tags[i].key, strlen(config->tags[i].key), config->tags[i].value, strlen(config->tags[i].value));
                }
                if (config->topic != NULL)
                {
                    add_topic(builder, config->topic, strlen(config->topic));
                }
                if (producer_manager->source != NULL)
                {
                    add_source(builder, producer_manager->source, strlen(producer_manager->source));
                }
                if (producer_manager->pack_prefix != NULL)
                {
                    add_pack_id(builder, producer_manager->pack_prefix, strlen(producer_manager->pack_prefix), producer_manager->pack_index++);
                }

                lz4_log_buf * lz4_buf = NULL;
                // check compress type
                if (config->compressType == 1)
                {
                  lz4_buf = serialize_to_proto_buf_with_malloc_lz4(builder);
                }
                else
                {
                  lz4_buf = serialize_to_proto_buf_with_malloc_no_lz4(builder);
                }

                if (lz4_buf == NULL)
                {
                    // Serialization failed: the loggroup is dropped. Both
                    // callbacks are told the same result code and message so
                    // listeners see one consistent failure reason.
                    aos_error_log("[flusher] serialize loggroup to proto buf with lz4 failed");
                    if (producer_manager->send_done_function)
                    {
                          producer_manager->send_done_function(producer_manager->producer_config->logstore, LOG_PRODUCER_DROP_ERROR, builder->loggroup_size, 0,
                                                               NULL, "serialize loggroup to proto buf with lz4 failed", NULL, producer_manager->user_param);
                    }
                    if (producer_manager->uuid_send_done_function != NULL)
                    {
                        producer_manager->uuid_send_done_function(producer_manager->producer_config->logstore,
                                                                  LOG_PRODUCER_DROP_ERROR,
                                                                  builder->loggroup_size,
                                                                  0,
                                                                  NULL,
                                                                  "serialize loggroup to proto buf with lz4 failed",
                                                                  NULL,
                                                                  producer_manager->uuid_user_param,
                                                                  builder->start_uuid,
                                                                  builder->end_uuid);
                    }
                }
                else
                {
                    // the serialized buffer now counts against the buffer size
                    CS_ENTER(root_producer_manager->lock);
                    producer_manager->totalBufferSize += lz4_buf->length;
                    CS_LEAVE(root_producer_manager->lock);

                    aos_debug_log("[flusher] push loggroup to sender, config %s, loggroup size %d, lz4 size %d, now buffer size %d",
                                  config->logstore, (int)lz4_buf->raw_length, (int)lz4_buf->length, (int)producer_manager->totalBufferSize);
                    // the full-queue check at the top of this loop guarantees a
                    // free ring slot here
                    log_producer_send_param * send_param = create_log_producer_send_param(config, producer_manager, lz4_buf, builder);
                    root_producer_manager->send_param_queue[root_producer_manager->send_param_queue_write++ % root_producer_manager->send_param_queue_size] = send_param;
                }
                // the builder is released on both the success and failure paths
                log_group_destroy(builder);
                continue;
            }
            // queue is empty
            break;
        }while(1);

        // if no job, check now loggroup
        _try_flush_loggroup(root_producer_manager);

        // send data
        if (root_producer_manager->send_threads != NULL)
        {
            // if send thread count > 0, we just push send_param to sender queue
            while (root_producer_manager->send_param_queue_write > root_producer_manager->send_param_queue_read && !log_queue_isfull(root_producer_manager->sender_data_queue))
            {
                log_producer_send_param * send_param = root_producer_manager->send_param_queue[root_producer_manager->send_param_queue_read++ % root_producer_manager->send_param_queue_size];
                // push always success
                log_queue_push(root_producer_manager->sender_data_queue, send_param);
            }
        }
        else if (root_producer_manager->send_param_queue_write > root_producer_manager->send_param_queue_read)
        {
            // if no sender thread, we send this packet out in flush thread
            // (at most one packet per loop iteration)
            log_producer_send_param * send_param = root_producer_manager->send_param_queue[root_producer_manager->send_param_queue_read++ % root_producer_manager->send_param_queue_size];
            log_producer_send_data(send_param);
        }
    }
    aos_info_log("[flusher] exit flusher thread, config : %s", root_producer_manager->producer_config->logstore);
    return 0;
}