in gst/gst-kvs-plugin/src/KvsWebRtc.c [1266:1339]
/**
 * Periodic service routine for the plugin's WebRTC state.
 *
 * While holding sessionLock, each invocation:
 *  1. frees every streaming session whose terminateFlag is set, removing it from
 *     streamingSessionList and (if present) from pRtcPeerConnectionForRemoteClient;
 *  2. if recreateSignalingClient is set, frees, re-creates and fetches the signaling
 *     client, clearing the flag only when all three steps succeed;
 *  3. if connectWebRtc is set and the signaling handle is valid, connects the
 *     signaling client when it is in the READY state;
 *  4. calls removeExpiredMessageQueues on pPendingSignalingMessageForRemoteClient.
 *
 * @param timerId - not used by this function
 * @param currentTime - not used by this function
 * @param customData - the PGstKvsPlugin instance, passed as an integer
 *
 * @return STATUS_SUCCESS on success, STATUS_NULL_ARG when customData is NULL,
 *         otherwise the status of the first failing call guarded by CHK_STATUS.
 */
STATUS sessionServiceHandler(UINT32 timerId, UINT64 currentTime, UINT64 customData)
{
    STATUS retStatus = STATUS_SUCCESS;
    PGstKvsPlugin pGstKvsPlugin = (PGstKvsPlugin) customData;
    PWebRtcStreamingSession pStreamingSession = NULL;
    UINT32 i, clientIdHash;
    BOOL locked = FALSE, sessionListReadLocked = FALSE, peerConnectionFound = FALSE;
    SIGNALING_CLIENT_STATE signalingClientState;

    CHK(pGstKvsPlugin != NULL, STATUS_NULL_ARG);

    MUTEX_LOCK(pGstKvsPlugin->sessionLock);
    locked = TRUE;

    // Scan and clean up terminated streaming sessions. The index only advances when
    // the current slot is kept: after a removal the slot holds the former last
    // element, which must be examined too.
    i = 0;
    while (i < pGstKvsPlugin->streamingSessionCount) {
        if (!ATOMIC_LOAD_BOOL(&pGstKvsPlugin->streamingSessionList[i]->terminateFlag)) {
            i++;
            continue;
        }

        pStreamingSession = pGstKvsPlugin->streamingSessionList[i];

        // Track the read lock so that a failure below releases it in CleanUp
        MUTEX_LOCK(pGstKvsPlugin->sessionListReadLock);
        sessionListReadLocked = TRUE;

        // Swap with last element and decrement count
        pGstKvsPlugin->streamingSessionCount--;
        pGstKvsPlugin->streamingSessionList[i] = pGstKvsPlugin->streamingSessionList[pGstKvsPlugin->streamingSessionCount];

        // Remove from the hash table, which is keyed by the CRC32 of the peer id
        clientIdHash = COMPUTE_CRC32((PBYTE) pStreamingSession->peerId, (UINT32) STRLEN(pStreamingSession->peerId));
        CHK_STATUS(hashTableContains(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound));
        if (peerConnectionFound) {
            CHK_STATUS(hashTableRemove(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash));
        }

        MUTEX_UNLOCK(pGstKvsPlugin->sessionListReadLock);
        sessionListReadLocked = FALSE;

        CHK_STATUS(freeWebRtcStreamingSession(&pStreamingSession));
    }

    // Check if we need to re-create the signaling client on-the-fly.
    // The && chain short-circuits, so the flag stays set (and the sequence is
    // attempted again on a later invocation) if any step fails.
    if (ATOMIC_LOAD_BOOL(&pGstKvsPlugin->recreateSignalingClient) &&
        STATUS_SUCCEEDED(freeSignalingClient(&pGstKvsPlugin->kvsContext.signalingHandle)) &&
        STATUS_SUCCEEDED(createSignalingClientSync(&pGstKvsPlugin->kvsContext.signalingClientInfo, &pGstKvsPlugin->kvsContext.channelInfo,
                                                   &pGstKvsPlugin->kvsContext.signalingClientCallbacks,
                                                   pGstKvsPlugin->kvsContext.pCredentialProvider,
                                                   &pGstKvsPlugin->kvsContext.signalingHandle)) &&
        STATUS_SUCCEEDED(signalingClientFetchSync(pGstKvsPlugin->kvsContext.signalingHandle))) {
        // Re-set the variable again
        ATOMIC_STORE_BOOL(&pGstKvsPlugin->recreateSignalingClient, FALSE);
    }

    // Check the signaling client state and connect if needed
    if (ATOMIC_LOAD_BOOL(&pGstKvsPlugin->connectWebRtc) && IS_VALID_SIGNALING_CLIENT_HANDLE(pGstKvsPlugin->kvsContext.signalingHandle)) {
        CHK_STATUS(signalingClientGetCurrentState(pGstKvsPlugin->kvsContext.signalingHandle, &signalingClientState));
        if (signalingClientState == SIGNALING_CLIENT_STATE_READY) {
            // The connect result is deliberately ignored here
            UNUSED_PARAM(signalingClientConnectSync(pGstKvsPlugin->kvsContext.signalingHandle));
        }
    }

    // Check if any lingering pending message queues
    CHK_STATUS(removeExpiredMessageQueues(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient));

    MUTEX_UNLOCK(pGstKvsPlugin->sessionLock);
    locked = FALSE;

CleanUp:

    CHK_LOG_ERR(retStatus);

    // Release in reverse order of acquisition. Either flag being set implies
    // pGstKvsPlugin passed the NULL check above.
    if (sessionListReadLocked) {
        MUTEX_UNLOCK(pGstKvsPlugin->sessionListReadLock);
    }

    if (locked) {
        MUTEX_UNLOCK(pGstKvsPlugin->sessionLock);
    }

    return retStatus;
}