in src/log_producer_manager.c [325:402]
void destroy_log_producer_manager(log_producer_manager * manager)
{
// when destroy instance, flush last loggroup
_push_last_loggroup(manager);
aos_info_log("flush out producer loggroup begin");
int32_t total_wait_count = manager->producer_config->destroyFlusherWaitTimeoutSec > 0 ? manager->producer_config->destroyFlusherWaitTimeoutSec * 100 : MAX_MANAGER_FLUSH_COUNT;
total_wait_count += manager->producer_config->destroySenderWaitTimeoutSec > 0 ? manager->producer_config->destroySenderWaitTimeoutSec * 100 : MAX_SENDER_FLUSH_COUNT;
usleep(10 * 1000);
int waitCount = 0;
while (log_queue_size(manager->loggroup_queue) > 0 ||
manager->send_param_queue_write - manager->send_param_queue_read > 0 ||
(manager->sender_data_queue != NULL && log_queue_size(manager->sender_data_queue) > 0) )
{
usleep(10 * 1000);
if (++waitCount == total_wait_count)
{
break;
}
}
if (waitCount == total_wait_count)
{
aos_error_log("try flush out producer loggroup error, force exit, now loggroup %d", (int)(log_queue_size(manager->loggroup_queue)));
}
else
{
aos_info_log("flush out producer loggroup success");
}
manager->shutdown = 1;
// destroy root resources
COND_SIGNAL(manager->triger_cond);
aos_info_log("join flush thread begin");
THREAD_JOIN(manager->flush_thread);
aos_info_log("join flush thread success");
if (manager->send_threads != NULL)
{
aos_info_log("join sender thread pool begin");
int32_t threadId = 0;
for (; threadId < manager->producer_config->sendThreadCount; ++threadId)
{
THREAD_JOIN(manager->send_threads[threadId]);
}
free(manager->send_threads);
aos_info_log("join sender thread pool success");
}
DeleteCond(manager->triger_cond);
log_queue_destroy(manager->loggroup_queue);
if (manager->sender_data_queue != NULL)
{
aos_info_log("flush out sender queue begin");
while (log_queue_size(manager->sender_data_queue) > 0)
{
void * send_param = log_queue_trypop(manager->sender_data_queue);
if (send_param != NULL)
{
log_producer_send_fun(send_param);
}
}
log_queue_destroy(manager->sender_data_queue);
aos_info_log("flush out sender queue success");
}
else if (g_sender_data_queue != NULL && g_send_threads != NULL)
{
// if use global send queue, let send thread destroy manager
log_producer_send_param * destroy_param = create_log_producer_destroy_param(manager->producer_config, manager);
// make sure push success
while(0 != log_queue_push(g_sender_data_queue, destroy_param))
{
;
}
return;
}
destroy_log_producer_manager_tail(manager);
}