STATUS shutdownStreamCurl()

in src/source/CurlApiCallbacks.c [683:787]


/**
 * Shuts down all curl activity (active requests and active uploads) belonging to a stream
 * and waits until the corresponding worker entries have been removed.
 *
 * While the shutdown is running, streamHandle is kept in pStreamsShuttingDown. The entry is
 * removed again before returning, both on success and when the shutdown fails part-way, so
 * that a failed shutdown does not permanently mark the stream as "shutting down" and a later
 * call can retry. If the handle is already present on entry (another shutdown is in progress),
 * a warning is logged and the function returns without doing any work and without touching
 * the existing entry.
 *
 * NOTE: CHK / CHK_STATUS / CHK_WARN are project macros; this function relies on them
 * transferring control to the CleanUp label when their check fails.
 *
 * @param customData - Custom data supplied by the caller; interpreted as a PCurlApiCallbacks.
 * @param streamHandle - Handle of the stream being shut down.
 * @param resetStream - When FALSE the cached endpoints for the stream are cleared as well;
 *                      when TRUE they are left untouched.
 *
 * @return STATUS_INVALID_ARG if the callbacks object or its callbacks provider is NULL,
 *         otherwise the status of the first failing operation, or the initial
 *         STATUS_SUCCESS value when nothing failed.
 */
STATUS shutdownStreamCurl(UINT64 customData, STREAM_HANDLE streamHandle, BOOL resetStream)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS, cleanupStatus = STATUS_SUCCESS;
    BOOL shutdownLocked = FALSE, activeRequestExists = FALSE, requestLocked = FALSE, uploadsLocked = FALSE, alreadyShutdown = FALSE;
    // TRUE while this invocation owns the streamHandle entry in pStreamsShuttingDown
    BOOL markedShuttingDown = FALSE;
    UINT32 activeUploadCount = 0;
    PDoubleListNode pCurNode = NULL;
    UINT64 data;
    PCurlRequest pCurlRequest = NULL;

    PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;

    CHK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL, STATUS_INVALID_ARG);

    // store the information that this stream is shutting down
    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                       pCurlApiCallbacks->shutdownLock);
    shutdownLocked = TRUE;
    CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &alreadyShutdown));

    // Another shutdown for this stream is already running: bail out with the current status.
    // markedShuttingDown is still FALSE here so CleanUp will not remove the other caller's entry.
    CHK_WARN(!alreadyShutdown, retStatus, "shutdownStreamCurl called when already in progress of shutting down");

    CHK_STATUS(hashTablePut(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, streamHandle));
    markedShuttingDown = TRUE;
    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                         pCurlApiCallbacks->shutdownLock);
    shutdownLocked = FALSE;

    // Shutdown active requests, terminate curl session if thread is blocked in curl
    CHK_STATUS(curlApiCallbacksShutdownActiveRequests(pCurlApiCallbacks, streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, FALSE, FALSE));

    // Shutdown active uploads, terminate curl session if thread is blocked in curl
    CHK_STATUS(curlApiCallbacksShutdownActiveUploads(pCurlApiCallbacks, streamHandle, INVALID_UPLOAD_HANDLE_VALUE,
                                                     CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, FALSE, FALSE));

    // Clear out the cached endpoints only when not resetting
    if (!resetStream) {
        CHK_STATUS(curlApiCallbacksShutdownCachedEndpoints(pCurlApiCallbacks, streamHandle, TRUE));
    }

    // At this point all remaining threads should not be blocked and should reach termination shortly. Thus spin wait
    // for them to remove themselves from the pActiveRequests hashtable and the pActiveUploads list.
    // NOTE(review): neither polling loop below has an upper bound on the number of iterations.
    do {
        THREAD_SLEEP(CURL_API_DEFAULT_SHUTDOWN_POLLING_INTERVAL);
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                           pCurlApiCallbacks->activeRequestsLock);
        requestLocked = TRUE;
        CHK_STATUS(hashTableContains(pCurlApiCallbacks->pActiveRequests, streamHandle, &activeRequestExists));
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeRequestsLock);
        requestLocked = FALSE;
    } while (activeRequestExists);

    // spin until all active uploads for streamHandle have terminated
    do {
        THREAD_SLEEP(CURL_API_DEFAULT_SHUTDOWN_POLLING_INTERVAL);
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                           pCurlApiCallbacks->activeUploadsLock);
        uploadsLocked = TRUE;

        // Count the entries in the active uploads list that still belong to this stream
        activeUploadCount = 0;
        CHK_STATUS(doubleListGetHeadNode(pCurlApiCallbacks->pActiveUploads, &pCurNode));
        while (pCurNode != NULL) {
            CHK_STATUS(doubleListGetNodeData(pCurNode, &data));
            pCurNode = pCurNode->pNext;

            pCurlRequest = (PCurlRequest) data;
            if (pCurlRequest->streamHandle == streamHandle) {
                activeUploadCount++;
            }
        }
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeUploadsLock);
        uploadsLocked = FALSE;
    } while (activeUploadCount != 0);

    // shutdown completed, remove streamHandle from pStreamsShuttingDown.
    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                       pCurlApiCallbacks->shutdownLock);
    shutdownLocked = TRUE;
    // The removal is attempted right here; clear the flag first so CleanUp does not attempt it a second time
    // should the removal itself fail.
    markedShuttingDown = FALSE;
    CHK_STATUS(hashTableRemove(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle));
    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                         pCurlApiCallbacks->shutdownLock);
    shutdownLocked = FALSE;

CleanUp:

    CHK_LOG_ERR(retStatus);

    // Release the polling locks first so the shutdown lock below is never acquired while holding one of them.
    if (requestLocked) {
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeRequestsLock);
    }

    if (uploadsLocked) {
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeUploadsLock);
    }

    // An error path was taken after this call registered the stream as shutting down. Roll the registration back,
    // otherwise every subsequent shutdown attempt for this stream would return early as "already in progress".
    if (markedShuttingDown) {
        if (!shutdownLocked) {
            pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                               pCurlApiCallbacks->shutdownLock);
            shutdownLocked = TRUE;
        }

        // Best effort: log a failure but keep the original error in retStatus.
        cleanupStatus = hashTableRemove(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle);
        CHK_LOG_ERR(cleanupStatus);
    }

    if (shutdownLocked) {
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->shutdownLock);
    }

    LEAVES();
    return retStatus;
}