in src/source/CurlApiCallbacks.c [683:787]
/**
 * Shuts down the curl activity associated with a stream and waits for it to drain.
 *
 * The stream handle is recorded in pStreamsShuttingDown for the duration of the call. Active
 * requests and active uploads for the stream are asked to shut down, the cached endpoints are
 * cleared unless the stream is being reset, and the call then polls until neither an active
 * request nor an active upload for the stream remains.
 *
 * If the handle is already recorded as shutting down, the call takes the CHK_WARN path
 * (expected to log the warning and jump to CleanUp with retStatus unchanged) and does no work.
 *
 * The handle is removed from pStreamsShuttingDown on every exit path of the call that added it,
 * including failures. Otherwise a failed shutdown would leave a stale marker behind and every
 * later call for the same stream would be skipped as "already in progress".
 *
 * @param customData - Custom data, cast to PCurlApiCallbacks
 * @param streamHandle - Handle of the stream to shut down
 * @param resetStream - When TRUE, the cached endpoints for the stream are left in place
 *
 * @return STATUS_SUCCESS on success, STATUS_INVALID_ARG when the callbacks object or its
 *         callbacks provider is NULL, otherwise the status of the first failing step
 */
STATUS shutdownStreamCurl(UINT64 customData, STREAM_HANDLE streamHandle, BOOL resetStream)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS, removeStatus = STATUS_SUCCESS;
    BOOL shutdownLocked = FALSE, activeRequestExists = FALSE, requestLocked = FALSE, uploadsLocked = FALSE, alreadyShutdown = FALSE;
    // TRUE only once THIS call has inserted streamHandle into pStreamsShuttingDown
    BOOL markedShuttingDown = FALSE;
    UINT32 activeUploadCount = 0;
    PDoubleListNode pCurNode = NULL;
    UINT64 data;
    PCurlRequest pCurlRequest = NULL;
    PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;

    CHK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL, STATUS_INVALID_ARG);

    // store the information that this stream is shutting down
    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                       pCurlApiCallbacks->shutdownLock);
    shutdownLocked = TRUE;

    CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &alreadyShutdown));

    // Another call owns the in-progress shutdown; markedShuttingDown stays FALSE so CleanUp leaves its marker alone
    CHK_WARN(!alreadyShutdown, retStatus, "shutdownStreamCurl called when already in progress of shutting down");

    CHK_STATUS(hashTablePut(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, streamHandle));
    markedShuttingDown = TRUE;

    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                         pCurlApiCallbacks->shutdownLock);
    shutdownLocked = FALSE;

    // Shutdown active requests, terminate curl session if thread is blocked in curl
    CHK_STATUS(curlApiCallbacksShutdownActiveRequests(pCurlApiCallbacks, streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, FALSE, FALSE));

    // Shutdown active uploads, terminate curl session if thread is blocked in curl
    CHK_STATUS(curlApiCallbacksShutdownActiveUploads(pCurlApiCallbacks, streamHandle, INVALID_UPLOAD_HANDLE_VALUE,
                                                     CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, FALSE, FALSE));

    // Clear out the cached endpoints only when not resetting
    if (!resetStream) {
        CHK_STATUS(curlApiCallbacksShutdownCachedEndpoints(pCurlApiCallbacks, streamHandle, TRUE));
    }

    // at this point all remaining threads should not be blocked and reach termination shortly. Thus spin wait for it
    // to remove itself from the pActiveRequests hashtable and pActiveUploads list
    do {
        // Sleep outside of the lock so the exiting threads can take it
        THREAD_SLEEP(CURL_API_DEFAULT_SHUTDOWN_POLLING_INTERVAL);

        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                           pCurlApiCallbacks->activeRequestsLock);
        requestLocked = TRUE;

        CHK_STATUS(hashTableContains(pCurlApiCallbacks->pActiveRequests, streamHandle, &activeRequestExists));

        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeRequestsLock);
        requestLocked = FALSE;
    } while (activeRequestExists);

    // spin until all active uploads for streamHandle have terminated
    do {
        THREAD_SLEEP(CURL_API_DEFAULT_SHUTDOWN_POLLING_INTERVAL);

        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                           pCurlApiCallbacks->activeUploadsLock);
        uploadsLocked = TRUE;

        // Re-count on every pass: the list is shared by all streams, only entries for this stream matter
        activeUploadCount = 0;
        CHK_STATUS(doubleListGetHeadNode(pCurlApiCallbacks->pActiveUploads, &pCurNode));
        while (pCurNode != NULL) {
            CHK_STATUS(doubleListGetNodeData(pCurNode, &data));
            pCurNode = pCurNode->pNext;

            pCurlRequest = (PCurlRequest) data;
            if (pCurlRequest->streamHandle == streamHandle) {
                activeUploadCount++;
            }
        }

        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeUploadsLock);
        uploadsLocked = FALSE;
    } while (activeUploadCount != 0);

CleanUp:

    // Release the polling locks first (held only when a check failed inside one of the loops)
    // so that shutdownLock below is never acquired while another lock is held.
    if (requestLocked) {
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeRequestsLock);
    }

    if (uploadsLocked) {
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->activeUploadsLock);
    }

    // Shutdown finished (successfully or not): remove streamHandle from pStreamsShuttingDown if this call added it.
    if (markedShuttingDown) {
        if (!shutdownLocked) {
            pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                               pCurlApiCallbacks->shutdownLock);
            shutdownLocked = TRUE;
        }

        removeStatus = hashTableRemove(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle);
        if (retStatus == STATUS_SUCCESS) {
            // Success path: a removal failure is the status of the call, as before
            retStatus = removeStatus;
        } else {
            // Failure path: keep the original error, only log the secondary one
            CHK_LOG_ERR(removeStatus);
        }
    }

    if (shutdownLocked) {
        pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                             pCurlApiCallbacks->shutdownLock);
    }

    CHK_LOG_ERR(retStatus);

    LEAVES();
    return retStatus;
}