void destroy_log_producer_manager()

in Sources/aliyun-log-c-sdk/log_producer_manager.c [339:426]


/* Sleeps for one 10 ms polling interval of the shutdown drain loop. */
static void destroy_manager_wait_tick(void)
{
#ifdef WIN32
    Sleep(10);
#else
    usleep(10 * 1000);
#endif
}

/* Returns non-zero while any of the queues polled during shutdown still holds work. */
static int destroy_manager_has_pending(log_producer_manager * manager)
{
    if (log_queue_size(manager->loggroup_queue) > 0)
    {
        return 1;
    }
    if (manager->send_param_queue_write - manager->send_param_queue_read > 0)
    {
        return 1;
    }
    // sender_data_queue is optional; it is only inspected when present
    return manager->sender_data_queue != NULL && log_queue_size(manager->sender_data_queue) > 0;
}

/**
 * Flushes, stops and frees a producer manager.
 *
 * Steps, in order:
 *   1. push the last pending loggroup;
 *   2. poll every 10 ms until the loggroup queue, the send-param ring and the
 *      (optional) sender data queue are empty, or the wait budget is spent;
 *   3. set the shutdown flag, signal the trigger condition and join the flush
 *      thread and every sender thread;
 *   4. destroy the queues, the condition, the lock and free all owned memory,
 *      including `manager` itself.
 *
 * The wait budget is counted in 10 ms ticks: destroyFlusherWaitTimeoutSec and
 * destroySenderWaitTimeoutSec are each converted with `* 100`; a non-positive
 * value falls back to MAX_MANAGER_FLUSH_COUNT / MAX_SENDER_FLUSH_COUNT.
 *
 * @param manager  manager to destroy; NULL is accepted and ignored. The pointer
 *                 is invalid after this call returns.
 */
void destroy_log_producer_manager(log_producer_manager * manager)
{
    if (manager == NULL)
    {
        return;
    }

    // when destroy instance, flush last loggroup
    _push_last_loggroup(manager);

    aos_info_log("flush out producer loggroup begin");
    int32_t total_wait_count = manager->producer_config->destroyFlusherWaitTimeoutSec > 0 ? manager->producer_config->destroyFlusherWaitTimeoutSec * 100 : MAX_MANAGER_FLUSH_COUNT;
    total_wait_count += manager->producer_config->destroySenderWaitTimeoutSec > 0 ? manager->producer_config->destroySenderWaitTimeoutSec * 100 : MAX_SENDER_FLUSH_COUNT;

    // one unconditional tick before the first check, then one tick per poll
    destroy_manager_wait_tick();

    int32_t wait_count = 0;
    int timed_out = 0;
    while (destroy_manager_has_pending(manager))
    {
        // The timeout verdict is taken right after observing pending work, so a
        // drain that completes during the last tick is not reported as a failure.
        // `>=` also terminates the loop if the budget is ever non-positive.
        if (wait_count >= total_wait_count)
        {
            timed_out = 1;
            break;
        }
        destroy_manager_wait_tick();
        ++wait_count;
    }

    if (timed_out)
    {
        aos_error_log("try flush out producer loggroup error, force exit, now loggroup %d", (int)(log_queue_size(manager->loggroup_queue)));
    }
    else
    {
        aos_info_log("flush out producer loggroup success");
    }
    manager->shutdown = 1;

    // destroy root resources
    // signal after setting shutdown; presumably this wakes the flush thread so it can exit
    COND_SIGNAL(manager->triger_cond);
    aos_info_log("join flush thread begin");
    if (manager->flush_thread)
    {
        THREAD_JOIN(manager->flush_thread);
    }
    aos_info_log("join flush thread success");

    if (manager->send_threads != NULL)
    {
        aos_info_log("join sender thread pool begin");
        int32_t threadId = 0;
        for (; threadId < manager->producer_config->sendThreadCount; ++threadId)
        {
            if (manager->send_threads[threadId])
            {
                THREAD_JOIN(manager->send_threads[threadId]);
            }
        }
        free(manager->send_threads);
        aos_info_log("join sender thread pool success");
    }

    DeleteCond(manager->triger_cond);
    log_queue_destroy(manager->loggroup_queue);

    if (manager->sender_data_queue != NULL)
    {
        aos_info_log("flush out sender queue begin");
        // Anything still queued after the sender threads were joined is handed to
        // the send function on this thread.
        // NOTE(review): presumably log_producer_send_fun releases the param once
        // shutdown is set - confirm against its implementation.
        while (log_queue_size(manager->sender_data_queue) > 0)
        {
            void * send_param = log_queue_trypop(manager->sender_data_queue);
            if (send_param != NULL)
            {
                log_producer_send_fun(send_param);
            }
        }
        log_queue_destroy(manager->sender_data_queue);
        aos_info_log("flush out sender queue success");
    }

    ReleaseCriticalSection(manager->lock);

    // free(NULL) is a no-op, so the optional buffers need no guard
    free(manager->pack_prefix);
    // NOTE(review): only the ring storage is freed here; if the wait timed out,
    // entries between the read and write indices may still be outstanding - verify.
    free(manager->send_param_queue);
    log_sdsfree(manager->source);
    free(manager);
}