in Sources/aliyun-log-c-sdk/log_producer_manager.c [110:240]
DWORD WINAPI log_producer_flush_thread(LPVOID param)
#else
void * log_producer_flush_thread(void * param)
#endif
{
log_producer_manager * root_producer_manager = (log_producer_manager*)param;
int32_t interval = root_producer_manager->producer_config->flushIntervalInMS;
aos_info_log("[flusher] start run flusher thread, config : %s, flush interval: %d", root_producer_manager->producer_config->logstore, interval);
while (root_producer_manager->shutdown == 0)
{
CS_ENTER(root_producer_manager->lock);
COND_WAIT_TIME(root_producer_manager->triger_cond,
root_producer_manager->lock,
interval);
CS_LEAVE(root_producer_manager->lock);
// try read queue
do
{
// if send queue is full, skip pack and send data
if (root_producer_manager->send_param_queue_write - root_producer_manager->send_param_queue_read >= root_producer_manager->send_param_queue_size)
{
break;
}
void * data = log_queue_trypop(root_producer_manager->loggroup_queue);
if (data != NULL)
{
// process data
log_group_builder * builder = (log_group_builder*)data;
log_producer_manager * producer_manager = (log_producer_manager *)builder->private_value;
CS_ENTER(root_producer_manager->lock);
producer_manager->totalBufferSize -= builder->loggroup_size;
CS_LEAVE(root_producer_manager->lock);
log_producer_config * config = producer_manager->producer_config;
int i = 0;
for (i = 0; i < config->tagCount; ++i)
{
add_tag(builder, config->tags[i].key, strlen(config->tags[i].key), config->tags[i].value, strlen(config->tags[i].value));
}
if (config->topic != NULL)
{
add_topic(builder, config->topic, strlen(config->topic));
}
if (producer_manager->source != NULL)
{
add_source(builder, producer_manager->source, strlen(producer_manager->source));
}
if (producer_manager->pack_prefix != NULL)
{
add_pack_id(builder, producer_manager->pack_prefix, strlen(producer_manager->pack_prefix), producer_manager->pack_index++);
}
lz4_log_buf * lz4_buf = NULL;
// check compress type
if (config->compressType == 1)
{
lz4_buf = serialize_to_proto_buf_with_malloc_lz4(builder);
}
else
{
lz4_buf = serialize_to_proto_buf_with_malloc_no_lz4(builder);
}
if (lz4_buf == NULL)
{
aos_error_log("[flusher] serialize loggroup to proto buf with lz4 failed");
if (producer_manager->send_done_function)
{
producer_manager->send_done_function(producer_manager->producer_config->logstore, LOG_PRODUCER_DROP_ERROR, builder->loggroup_size, 0,
NULL, "serialize loggroup to proto buf with lz4 failed", NULL, producer_manager->user_param);
}
if (producer_manager->uuid_send_done_function != NULL)
{
producer_manager->uuid_send_done_function(producer_manager->producer_config->logstore,
LOG_PRODUCER_INVALID,
builder->loggroup_size,
0,
NULL,
"invalid send param, magic num not found",
NULL,
producer_manager->uuid_user_param,
builder->start_uuid,
builder->end_uuid);
}
}
else
{
CS_ENTER(root_producer_manager->lock);
producer_manager->totalBufferSize += lz4_buf->length;
CS_LEAVE(root_producer_manager->lock);
aos_debug_log("[flusher] push loggroup to sender, config %s, loggroup size %d, lz4 size %d, now buffer size %d",
config->logstore, (int)lz4_buf->raw_length, (int)lz4_buf->length, (int)producer_manager->totalBufferSize);
// if use multi thread, should change producer_manager->send_pool to NULL
//apr_pool_t * pool = config->sendThreadCount == 1 ? producer_manager->send_pool : NULL;
log_producer_send_param * send_param = create_log_producer_send_param(config, producer_manager, lz4_buf, builder);
root_producer_manager->send_param_queue[root_producer_manager->send_param_queue_write++ % root_producer_manager->send_param_queue_size] = send_param;
}
log_group_destroy(builder);
continue;
}
break;
}while(1);
// if no job, check now loggroup
_try_flush_loggroup(root_producer_manager);
// send data
if (root_producer_manager->send_threads != NULL)
{
// if send thread count > 0, we just push send_param to sender queue
while (root_producer_manager->send_param_queue_write > root_producer_manager->send_param_queue_read && !log_queue_isfull(root_producer_manager->sender_data_queue))
{
log_producer_send_param * send_param = root_producer_manager->send_param_queue[root_producer_manager->send_param_queue_read++ % root_producer_manager->send_param_queue_size];
// push always success
log_queue_push(root_producer_manager->sender_data_queue, send_param);
}
}
else if (root_producer_manager->send_param_queue_write > root_producer_manager->send_param_queue_read)
{
// if no sender thread, we send this packet out in flush thread
log_producer_send_param * send_param = root_producer_manager->send_param_queue[root_producer_manager->send_param_queue_read++ % root_producer_manager->send_param_queue_size];
log_producer_send_data(send_param);
}
}
aos_info_log("[flusher] exit flusher thread, config : %s", root_producer_manager->producer_config->logstore);
return 0;
}