in gst/gst-kvs-plugin/src/KvsWebRtc.c [82:218]
/**
 * Signaling client callback that handles a message received from a remote peer.
 *
 * The peer client id is hashed with CRC32 and the hash is used as the key into
 * pRtcPeerConnectionForRemoteClient to look up an existing streaming session.
 * The message is then dispatched on its type:
 *  - OFFER: fails with STATUS_INVALID_OPERATION if a session already exists for the peer.
 *    If the session list is full, any pending queue for the peer is dropped and the
 *    current (successful) status is returned. Otherwise a new streaming session is
 *    created, appended to streamingSessionList, handed the offer, registered in the
 *    hash table, and any queued ICE candidates for the peer are submitted.
 *  - ANSWER: applied to streamingSessionList[0], which is then registered in the hash
 *    table; any queued ICE candidates for the peer are submitted.
 *  - ICE_CANDIDATE: forwarded to the peer's session when one was found; otherwise a
 *    copy of the message is appended to the peer's pending message queue (created on
 *    demand).
 *  - anything else: logged at debug level and ignored.
 *
 * The body runs with pGstKvsPlugin->sessionLock held.
 *
 * @param customData - the PGstKvsPlugin instance, passed as an opaque UINT64
 * @param pReceivedSignalingMessage - the received message; a struct copy is made when the
 *                                    message has to be queued
 *
 * @return STATUS_SUCCESS on success, STATUS_NULL_ARG when either argument is NULL,
 *         STATUS_INVALID_OPERATION for a duplicate offer or an answer without a session,
 *         STATUS_NOT_ENOUGH_MEMORY when the message copy cannot be allocated, or the
 *         failing status of any helper that was called.
 */
STATUS signalingClientMessageReceivedFn(UINT64 customData, PReceivedSignalingMessage pReceivedSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    PGstKvsPlugin pGstKvsPlugin = (PGstKvsPlugin) customData;
    BOOL peerConnectionFound = FALSE;
    // Must start as FALSE: the argument check below can jump to CleanUp before the
    // mutex is taken, and CleanUp dereferences pGstKvsPlugin to unlock it.
    BOOL locked = FALSE;
    UINT32 clientIdHash;
    UINT64 hashValue = 0;
    // Queue owned by this function; freed in CleanUp unless ownership was handed off.
    PPendingMessageQueue pPendingMessageQueue = NULL;
    // Queue that is owned by pPendingSignalingMessageForRemoteClient; never freed here.
    PPendingMessageQueue pCandidateQueue = NULL;
    PWebRtcStreamingSession pStreamingSession = NULL;
    PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL;

    CHK(pGstKvsPlugin != NULL && pReceivedSignalingMessage != NULL, STATUS_NULL_ARG);

    MUTEX_LOCK(pGstKvsPlugin->sessionLock);
    locked = TRUE;

    clientIdHash = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId,
                                 (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId));
    CHK_STATUS(hashTableContains(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound));
    if (peerConnectionFound) {
        CHK_STATUS(hashTableGet(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, &hashValue));
        pStreamingSession = (PWebRtcStreamingSession) hashValue;
    }

    switch (pReceivedSignalingMessage->signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_OFFER:
            // Check if we already have an ongoing master session with the same peer
            CHK_ERR(!peerConnectionFound, STATUS_INVALID_OPERATION, "Peer connection %s is in progress",
                    pReceivedSignalingMessage->signalingMessage.peerClientId);

            /*
             * Create new streaming session for each offer, then insert the client id and streaming session into
             * pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages. Lastly check if there is
             * any ice candidate messages queued in pPendingSignalingMessageForRemoteClient. If so then submit
             * all of them.
             */
            if (pGstKvsPlugin->streamingSessionCount == ARRAY_SIZE(pGstKvsPlugin->streamingSessionList)) {
                DLOGW("Max simultaneous streaming session count reached.");

                // Need to remove the pending queue if any.
                // This is a simple optimization as the session cleanup will
                // handle the cleanup of pending message queue after a while.
                // The removed queue (if any) is freed in CleanUp.
                CHK_STATUS(
                    getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue));

                // Bail out with the current status
                CHK(FALSE, retStatus);
            }

            CHK_STATUS(
                createWebRtcStreamingSession(pGstKvsPlugin, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE, &pStreamingSession));
            pStreamingSession->offerReceiveTime = GETTIME();

            MUTEX_LOCK(pGstKvsPlugin->sessionListReadLock);
            pGstKvsPlugin->streamingSessionList[pGstKvsPlugin->streamingSessionCount++] = pStreamingSession;
            MUTEX_UNLOCK(pGstKvsPlugin->sessionListReadLock);

            CHK_STATUS(handleOffer(pGstKvsPlugin, pStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS(
                getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue));
            if (pPendingMessageQueue != NULL) {
                // NOTE(review): if submitPendingIceCandidate releases the queue even when it fails,
                // the CleanUp path below would free it a second time - confirm against the helper.
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pStreamingSession));

                // NULL the pointer to avoid it being freed in the cleanup
                pPendingMessageQueue = NULL;
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ANSWER:
            /*
             * for viewer, pStreamingSession should've already been created. insert the client id and
             * streaming session into pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages.
             * Lastly check if there is any ice candidate messages queued in pPendingSignalingMessageForRemoteClient.
             * If so then submit all of them.
             */
            // Slot 0 is only meaningful when at least one session has been registered.
            CHK_ERR(pGstKvsPlugin->streamingSessionCount > 0, STATUS_INVALID_OPERATION, "No streaming session available for answer from peer %s",
                    pReceivedSignalingMessage->signalingMessage.peerClientId);
            pStreamingSession = pGstKvsPlugin->streamingSessionList[0];

            CHK_STATUS(handleAnswer(pGstKvsPlugin, pStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS(
                getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue));
            if (pPendingMessageQueue != NULL) {
                // NOTE(review): same ownership question on failure as in the offer path above.
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pStreamingSession));

                // NULL the pointer to avoid it being freed in the cleanup
                pPendingMessageQueue = NULL;
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            /*
             * if peer connection hasn't been created, create an queue to store the ice candidate message. Otherwise
             * submit the signaling message into the corresponding streaming session.
             */
            if (!peerConnectionFound) {
                // Looked up without removal, so the result stays owned by the pending list
                // and must not be freed by this function.
                CHK_STATUS(getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, FALSE,
                                                         &pCandidateQueue));
                if (pCandidateQueue == NULL) {
                    CHK_STATUS(createMessageQueue(clientIdHash, &pPendingMessageQueue));
                    CHK_STATUS(stackQueueEnqueue(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, (UINT64) pPendingMessageQueue));

                    // The pending list now references the new queue; give up local ownership
                    // so a later failure does not free it from under the list.
                    pCandidateQueue = pPendingMessageQueue;
                    pPendingMessageQueue = NULL;
                }

                pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage));
                CHK(pReceivedSignalingMessageCopy != NULL, STATUS_NOT_ENOUGH_MEMORY);
                *pReceivedSignalingMessageCopy = *pReceivedSignalingMessage;

                CHK_STATUS(stackQueueEnqueue(pCandidateQueue->messageQueue, (UINT64) pReceivedSignalingMessageCopy));

                // NULL the pointer to not free any longer
                pReceivedSignalingMessageCopy = NULL;
            } else {
                CHK_STATUS(handleRemoteCandidate(pStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            }
            break;

        default:
            DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType);
            break;
    }

CleanUp:

    // Release whatever is still owned locally
    SAFE_MEMFREE(pReceivedSignalingMessageCopy);
    if (pPendingMessageQueue != NULL) {
        freeMessageQueue(pPendingMessageQueue);
    }

    if (locked) {
        MUTEX_UNLOCK(pGstKvsPlugin->sessionLock);
    }

    CHK_LOG_ERR(retStatus);
    return retStatus;
}