in source/src/AppCommon.c [401:505]
/**
 * @brief Signaling callback: dispatches a received signaling message by its type.
 *
 * The peer is identified by the CRC32 of its client id, which is the key used for both
 * pRemoteRtcPeerConnections and pRemotePeerPendingSignalingMessages.
 *
 *  - OFFER: rejected if a session for this peer already exists. Otherwise a new streaming
 *    session is created, appended to streamingSessionList, handed the offer, registered in
 *    pRemoteRtcPeerConnections, and any ice candidates queued for the peer are submitted.
 *  - ICE_CANDIDATE: forwarded to the peer's streaming session if one exists, otherwise
 *    pushed into a per-peer pending queue (created on demand).
 *  - Any other type is logged and ignored.
 *
 * The dispatch runs with appConfigurationObjLock held. Registering the periodic ice
 * candidate pair stats timer happens after the lock is released and is best effort.
 *
 * @param[in] userData the PAppConfiguration, passed as UINT64.
 * @param[in] pReceivedSignalingMessage the received signaling message.
 *
 * @return STATUS_SUCCESS on success, STATUS_APP_COMMON_NULL_ARG if either argument is NULL,
 *         STATUS_INVALID_OPERATION for an offer from a peer that already has a session,
 *         or the failing status of any helper that was called.
 */
static STATUS onSignalingMessageReceived(UINT64 userData, PReceivedSignalingMessage pReceivedSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    PAppConfiguration pAppConfiguration = (PAppConfiguration) userData;
    BOOL peerConnectionFound = FALSE, locked = FALSE, startStats = FALSE;
    UINT32 clientIdHashKey;
    UINT64 hashValue = 0;
    PPendingMessageQueue pPendingMsgQ = NULL;
    PStreamingSession pStreamingSession = NULL;

    CHK(pAppConfiguration != NULL, STATUS_APP_COMMON_NULL_ARG);
    // The message is dereferenced unconditionally below, so validate it before taking the lock.
    CHK(pReceivedSignalingMessage != NULL, STATUS_APP_COMMON_NULL_ARG);

    MUTEX_LOCK(pAppConfiguration->appConfigurationObjLock);
    locked = TRUE;

    // find the corresponding streaming session.
    clientIdHashKey = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId,
                                    (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId));
    CHK_STATUS((appHashTableContains(pAppConfiguration->pRemoteRtcPeerConnections, clientIdHashKey, &peerConnectionFound)));
    if (peerConnectionFound) {
        CHK_STATUS((appHashTableGet(pAppConfiguration->pRemoteRtcPeerConnections, clientIdHashKey, &hashValue)));
        pStreamingSession = (PStreamingSession) hashValue;
    }

    switch (pReceivedSignalingMessage->signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_OFFER:
            // Check if we already have an ongoing master session with the same peer
            CHK_ERR(!peerConnectionFound, STATUS_INVALID_OPERATION, "Peer connection %s is in progress",
                    pReceivedSignalingMessage->signalingMessage.peerClientId);

            /*
             * Create new streaming session for each offer, then insert the client id and streaming session into
             * pRemoteRtcPeerConnections for subsequent ice candidate messages. Lastly check if there is
             * any ice candidate messages queued in pRemotePeerPendingSignalingMessages. If so then submit
             * all of them.
             */
            if (pAppConfiguration->streamingSessionCount == ARRAY_SIZE(pAppConfiguration->streamingSessionList)) {
                DLOGW("Max simultaneous streaming session count reached.");

                // Need to remove the pending queue if any.
                // This is a simple optimization as the session cleanup will
                // handle the cleanup of pending message queue after a while
                // NOTE(review): ownership of the queue returned here is not visible in this
                // function - confirm it is released elsewhere.
                CHK_STATUS((getPendingMsgQByHashVal(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, TRUE, &pPendingMsgQ)));

                // Drop the offer: jump to CleanUp carrying the current status.
                CHK(FALSE, retStatus);
            }

            CHK_STATUS((createStreamingSession(pAppConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, &pStreamingSession)));
            pStreamingSession->offerReceiveTime = GETTIME();

            // The session list is guarded by its own lock in addition to the configuration lock.
            MUTEX_LOCK(pAppConfiguration->streamingSessionListReadLock);
            pAppConfiguration->streamingSessionList[pAppConfiguration->streamingSessionCount++] = pStreamingSession;
            MUTEX_UNLOCK(pAppConfiguration->streamingSessionListReadLock);

            CHK_STATUS((handleOffer(pAppConfiguration, pStreamingSession, &pReceivedSignalingMessage->signalingMessage)));
            CHK_STATUS((appHashTablePut(pAppConfiguration->pRemoteRtcPeerConnections, clientIdHashKey, (UINT64) pStreamingSession)));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS((getPendingMsgQByHashVal(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, TRUE, &pPendingMsgQ)));
            CHK_STATUS((handlePendingMsgQ(pPendingMsgQ, handleRemoteCandidate, pStreamingSession)));

            // Only request the stats timer while the timer id still holds MAX_UINT32.
            startStats = pAppConfiguration->iceCandidatePairStatsTimerId == MAX_UINT32;
            break;

        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            /*
             * If the peer connection hasn't been created, create a queue to store the ice candidate message.
             * Otherwise submit the signaling message into the corresponding streaming session.
             */
            if (!peerConnectionFound) {
                CHK_STATUS((getPendingMsgQByHashVal(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, FALSE, &pPendingMsgQ)));
                if (pPendingMsgQ == NULL) {
                    CHK_STATUS((createPendingMsgQ(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, &pPendingMsgQ)));
                }
                CHK_STATUS((pushMsqIntoPendingMsgQ(pPendingMsgQ, pReceivedSignalingMessage)));
            } else {
                CHK_STATUS((handleRemoteCandidate(pStreamingSession, &pReceivedSignalingMessage->signalingMessage)));
            }
            break;

        default:
            DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType);
            break;
    }

    MUTEX_UNLOCK(pAppConfiguration->appConfigurationObjLock);
    locked = FALSE;

    // The timer is registered outside the configuration lock; failure here is logged, not propagated.
    if (startStats &&
        STATUS_FAILED(retStatus = appTimeQueueAdd(pAppConfiguration->timerQueueHandle, APP_STATS_DURATION, APP_STATS_DURATION,
                                                  getIceCandidatePairStatsCallback, (UINT64) pAppConfiguration,
                                                  &pAppConfiguration->iceCandidatePairStatsTimerId))) {
        DLOGW("Failed to add getIceCandidatePairStatsCallback to add to timer queue (code 0x%08x). "
              "Cannot pull ice candidate pair metrics periodically",
              retStatus);

        // Reset the returned status
        retStatus = STATUS_SUCCESS;
    }

CleanUp:

    // locked is only TRUE after pAppConfiguration was validated, so the dereference is safe.
    if (locked) {
        MUTEX_UNLOCK(pAppConfiguration->appConfigurationObjLock);
    }

    CHK_LOG_ERR((retStatus));
    return retStatus;
}