void destroy_log_producer_manager()

in src/log_producer_manager.c [325:402]


/**
 * Shut down a producer manager: give pending data a bounded chance to be
 * sent, stop the worker threads, and release the manager's resources.
 *
 * The statement order below matters:
 *   1. Push the last loggroup via _push_last_loggroup().
 *   2. Poll every 10 ms until the loggroup queue is empty, the
 *      send_param_queue write/read counters are equal, and the manager's own
 *      sender queue (if it has one) is empty -- or until the wait budget is
 *      used up. The budget is the sum of the flusher and sender timeouts from
 *      producer_config (seconds * 100 polls), falling back to
 *      MAX_MANAGER_FLUSH_COUNT / MAX_SENDER_FLUSH_COUNT when a timeout is
 *      not positive.
 *   3. Set manager->shutdown, signal triger_cond, join the flush thread, then
 *      join and free the manager's send threads (if any).
 *   4. Destroy the condition variable and the loggroup queue.
 *   5. Final teardown, depending on which sender queue is in use:
 *        - manager-owned sender queue: every entry still queued is passed to
 *          log_producer_send_fun() on the calling thread, the queue is
 *          destroyed, and destroy_log_producer_manager_tail() is called;
 *        - global sender queue and global send threads: a destroy param is
 *          pushed onto g_sender_data_queue and the function returns without
 *          calling destroy_log_producer_manager_tail() itself (the intent is
 *          that a global send thread finishes the teardown);
 *        - otherwise: destroy_log_producer_manager_tail() is called directly.
 *
 * @param manager  manager to destroy; must not be NULL. It must not be used
 *                 by the caller after this function returns.
 */
void destroy_log_producer_manager(log_producer_manager * manager)
{
    // when destroy instance, flush last loggroup
    _push_last_loggroup(manager);

    aos_info_log("flush out producer loggroup begin");
    // wait budget counted in 10 ms polls, i.e. 100 polls per configured second
    int32_t total_wait_count = manager->producer_config->destroyFlusherWaitTimeoutSec > 0 ? manager->producer_config->destroyFlusherWaitTimeoutSec * 100 : MAX_MANAGER_FLUSH_COUNT;
    total_wait_count += manager->producer_config->destroySenderWaitTimeoutSec > 0 ? manager->producer_config->destroySenderWaitTimeoutSec * 100 : MAX_SENDER_FLUSH_COUNT;

    usleep(10 * 1000);
    int32_t wait_count = 0;
    int has_pending = 0;
    for (;;)
    {
        has_pending = log_queue_size(manager->loggroup_queue) > 0 ||
                manager->send_param_queue_write - manager->send_param_queue_read > 0 ||
                (manager->sender_data_queue != NULL && log_queue_size(manager->sender_data_queue) > 0);
        // ">=" (not "==") so a non-positive budget ends the wait immediately
        if (!has_pending || wait_count >= total_wait_count)
        {
            break;
        }
        usleep(10 * 1000);
        ++wait_count;
    }
    // The queues are re-checked after the final sleep, so a timeout is only
    // reported when data is really still pending.
    if (has_pending)
    {
        aos_error_log("try flush out producer loggroup error, force exit, now loggroup %d", (int)(log_queue_size(manager->loggroup_queue)));
    }
    else
    {
        aos_info_log("flush out producer loggroup success");
    }
    manager->shutdown = 1;

    // destroy root resources
    // signal the condition after setting the shutdown flag, then wait for the flush thread
    COND_SIGNAL(manager->triger_cond);
    aos_info_log("join flush thread begin");
    THREAD_JOIN(manager->flush_thread);
    aos_info_log("join flush thread success");
    if (manager->send_threads != NULL)
    {
        aos_info_log("join sender thread pool begin");
        for (int32_t thread_index = 0; thread_index < manager->producer_config->sendThreadCount; ++thread_index)
        {
            THREAD_JOIN(manager->send_threads[thread_index]);
        }
        free(manager->send_threads);
        aos_info_log("join sender thread pool success");
    }
    DeleteCond(manager->triger_cond);
    log_queue_destroy(manager->loggroup_queue);
    if (manager->sender_data_queue != NULL)
    {
        // the manager's send threads have been joined above, so whatever is
        // still queued is handed to the send function on this thread
        aos_info_log("flush out sender queue begin");
        while (log_queue_size(manager->sender_data_queue) > 0)
        {
            void * send_param = log_queue_trypop(manager->sender_data_queue);
            if (send_param != NULL)
            {
                log_producer_send_fun(send_param);
            }
        }
        log_queue_destroy(manager->sender_data_queue);
        aos_info_log("flush out sender queue success");
    }
    else if (g_sender_data_queue != NULL && g_send_threads != NULL)
    {
        // if use global send queue, let send thread destroy manager
        log_producer_send_param * destroy_param = create_log_producer_destroy_param(manager->producer_config, manager);
        // make sure push success; sleep between attempts instead of
        // busy-spinning while the push keeps failing
        while (0 != log_queue_push(g_sender_data_queue, destroy_param))
        {
            usleep(10 * 1000);
        }
        // the manager now belongs to the queued destroy request; do not touch it
        return;
    }

    destroy_log_producer_manager_tail(manager);

}