static STATUS onSignalingMessageReceived()

in source/src/AppCommon.c [401:505]


static STATUS onSignalingMessageReceived(UINT64 userData, PReceivedSignalingMessage pReceivedSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    PAppConfiguration pAppConfiguration = (PAppConfiguration) userData;
    BOOL peerConnectionFound = FALSE, locked = FALSE, startStats = FALSE;
    UINT32 clientIdHashKey;
    UINT64 hashValue = 0;
    PPendingMessageQueue pPendingMsgQ = NULL;
    PStreamingSession pStreamingSession = NULL;

    CHK(pAppConfiguration != NULL, STATUS_APP_COMMON_NULL_ARG);
    MUTEX_LOCK(pAppConfiguration->appConfigurationObjLock);
    locked = TRUE;
    // find the corresponding streaming session.
    clientIdHashKey = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId,
                                    (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId));
    CHK_STATUS((appHashTableContains(pAppConfiguration->pRemoteRtcPeerConnections, clientIdHashKey, &peerConnectionFound)));

    if (peerConnectionFound) {
        CHK_STATUS((appHashTableGet(pAppConfiguration->pRemoteRtcPeerConnections, clientIdHashKey, &hashValue)));
        pStreamingSession = (PStreamingSession) hashValue;
    }

    switch (pReceivedSignalingMessage->signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_OFFER:
            // Check if we already have an ongoing master session with the same peer
            CHK_ERR(!peerConnectionFound, STATUS_INVALID_OPERATION, "Peer connection %s is in progress",
                    pReceivedSignalingMessage->signalingMessage.peerClientId);
            /*
             * Create new streaming session for each offer, then insert the client id and streaming session into
             * pRemoteRtcPeerConnections for subsequent ice candidate messages. Lastly check if there is
             * any ice candidate messages queued in pRemotePeerPendingSignalingMessages. If so then submit
             * all of them.
             */
            if (pAppConfiguration->streamingSessionCount == ARRAY_SIZE(pAppConfiguration->streamingSessionList)) {
                DLOGW("Max simultaneous streaming session count reached.");

                // Need to remove the pending queue if any.
                // This is a simple optimization as the session cleanup will
                // handle the cleanup of pending message queue after a while
                CHK_STATUS((getPendingMsgQByHashVal(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, TRUE, &pPendingMsgQ)));
                CHK(FALSE, retStatus);
            }
            CHK_STATUS((createStreamingSession(pAppConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, &pStreamingSession)));
            pStreamingSession->offerReceiveTime = GETTIME();
            MUTEX_LOCK(pAppConfiguration->streamingSessionListReadLock);
            pAppConfiguration->streamingSessionList[pAppConfiguration->streamingSessionCount++] = pStreamingSession;
            MUTEX_UNLOCK(pAppConfiguration->streamingSessionListReadLock);

            CHK_STATUS((handleOffer(pAppConfiguration, pStreamingSession, &pReceivedSignalingMessage->signalingMessage)));
            CHK_STATUS((appHashTablePut(pAppConfiguration->pRemoteRtcPeerConnections, clientIdHashKey, (UINT64) pStreamingSession)));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS((getPendingMsgQByHashVal(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, TRUE, &pPendingMsgQ)));
            CHK_STATUS((handlePendingMsgQ(pPendingMsgQ, handleRemoteCandidate, pStreamingSession)));

            startStats = pAppConfiguration->iceCandidatePairStatsTimerId == MAX_UINT32;
            break;

        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            /*
             * if peer connection hasn't been created, create an queue to store the ice candidate message. Otherwise
             * submit the signaling message into the corresponding streaming session.
             */
            if (!peerConnectionFound) {
                CHK_STATUS((getPendingMsgQByHashVal(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, FALSE, &pPendingMsgQ)));

                if (pPendingMsgQ == NULL) {
                    CHK_STATUS((createPendingMsgQ(pAppConfiguration->pRemotePeerPendingSignalingMessages, clientIdHashKey, &pPendingMsgQ)));
                }
                CHK_STATUS((pushMsqIntoPendingMsgQ(pPendingMsgQ, pReceivedSignalingMessage)));
            } else {
                CHK_STATUS((handleRemoteCandidate(pStreamingSession, &pReceivedSignalingMessage->signalingMessage)));
            }
            break;

        default:
            DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType);
            break;
    }

    MUTEX_UNLOCK(pAppConfiguration->appConfigurationObjLock);
    locked = FALSE;

    if (startStats &&
        STATUS_FAILED(retStatus = appTimeQueueAdd(pAppConfiguration->timerQueueHandle, APP_STATS_DURATION, APP_STATS_DURATION,
                                                  getIceCandidatePairStatsCallback, (UINT64) pAppConfiguration,
                                                  &pAppConfiguration->iceCandidatePairStatsTimerId))) {
        DLOGW("Failed to add getIceCandidatePairStatsCallback to add to timer queue (code 0x%08x). "
              "Cannot pull ice candidate pair metrics periodically",
              retStatus);

        // Reset the returned status
        retStatus = STATUS_SUCCESS;
    }

CleanUp:

    if (locked) {
        MUTEX_UNLOCK(pAppConfiguration->appConfigurationObjLock);
    }

    CHK_LOG_ERR((retStatus));
    return retStatus;
}