STATUS signalingClientMessageReceivedFn()

in gst/gst-kvs-plugin/src/KvsWebRtc.c [82:218]


/**
 * Signaling client callback invoked for every message received from a remote peer.
 *
 * The peer client id carried by the message is hashed (CRC32) and used as the key into
 * pRtcPeerConnectionForRemoteClient to locate an existing streaming session. The message is
 * then dispatched on its type:
 *  - OFFER:         rejected if a session for this peer already exists; otherwise a new streaming
 *                   session is created, appended to streamingSessionList, handed the offer and
 *                   registered in the hash table. Any ICE candidates queued for this peer are
 *                   then submitted.
 *  - ANSWER:        handed to streamingSessionList[0], which is then registered in the hash
 *                   table. Any ICE candidates queued for this peer are then submitted.
 *  - ICE_CANDIDATE: forwarded to the session if one exists; otherwise a copy of the message is
 *                   queued in pPendingSignalingMessageForRemoteClient until a session appears.
 *  - anything else: logged at debug level and ignored.
 *
 * The whole body runs with pGstKvsPlugin->sessionLock held.
 *
 * @param customData                 - IN - PGstKvsPlugin passed through as an opaque 64-bit value
 * @param pReceivedSignalingMessage  - IN - message delivered by the signaling client
 *
 * @return STATUS_SUCCESS on success, STATUS_NULL_ARG if either argument is NULL,
 *         STATUS_INVALID_OPERATION for an offer from a peer that already has a session,
 *         STATUS_NOT_ENOUGH_MEMORY if the ICE candidate copy cannot be allocated, or the
 *         failure status of any callee.
 */
STATUS signalingClientMessageReceivedFn(UINT64 customData, PReceivedSignalingMessage pReceivedSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    PGstKvsPlugin pGstKvsPlugin = (PGstKvsPlugin) customData;
    BOOL peerConnectionFound = FALSE;
    // Must start as FALSE: the argument check below can jump to CleanUp before the lock is taken,
    // and CleanUp dereferences pGstKvsPlugin to unlock when this flag is set.
    BOOL locked = FALSE;
    UINT32 clientIdHash;
    UINT64 hashValue = 0;
    // Owning pointer: whatever is left in here at CleanUp is freed.
    PPendingMessageQueue pPendingMessageQueue = NULL;
    // Non-owning alias used once the queue is referenced by pPendingSignalingMessageForRemoteClient.
    PPendingMessageQueue pTargetQueue = NULL;
    PWebRtcStreamingSession pStreamingSession = NULL;
    PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL;

    CHK(pGstKvsPlugin != NULL && pReceivedSignalingMessage != NULL, STATUS_NULL_ARG);

    MUTEX_LOCK(pGstKvsPlugin->sessionLock);
    locked = TRUE;

    clientIdHash = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId,
                                 (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId));
    CHK_STATUS(hashTableContains(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound));
    if (peerConnectionFound) {
        CHK_STATUS(hashTableGet(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, &hashValue));
        pStreamingSession = (PWebRtcStreamingSession) hashValue;
    }

    switch (pReceivedSignalingMessage->signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_OFFER:
            // Check if we already have an ongoing master session with the same peer
            CHK_ERR(!peerConnectionFound, STATUS_INVALID_OPERATION, "Peer connection %s is in progress",
                    pReceivedSignalingMessage->signalingMessage.peerClientId);

            /*
             * Create new streaming session for each offer, then insert the client id and streaming session into
             * pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages. Lastly check if there is
             * any ice candidate messages queued in pPendingSignalingMessageForRemoteClient. If so then submit
             * all of them.
             */
            if (pGstKvsPlugin->streamingSessionCount == ARRAY_SIZE(pGstKvsPlugin->streamingSessionList)) {
                DLOGW("Max simultaneous streaming session count reached.");

                // Need to remove the pending queue if any.
                // This is a simple optimization as the session cleanup will
                // handle the cleanup of pending message queue after a while
                CHK_STATUS(
                    getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue));

                // Bail out to CleanUp without altering retStatus; the removed queue (if any)
                // is still held in pPendingMessageQueue and gets freed there.
                CHK(FALSE, retStatus);
            }
            CHK_STATUS(
                createWebRtcStreamingSession(pGstKvsPlugin, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE, &pStreamingSession));
            pStreamingSession->offerReceiveTime = GETTIME();
            MUTEX_LOCK(pGstKvsPlugin->sessionListReadLock);
            pGstKvsPlugin->streamingSessionList[pGstKvsPlugin->streamingSessionCount++] = pStreamingSession;
            MUTEX_UNLOCK(pGstKvsPlugin->sessionListReadLock);

            CHK_STATUS(handleOffer(pGstKvsPlugin, pStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS(
                getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue));
            if (pPendingMessageQueue != NULL) {
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pStreamingSession));

                // NULL the pointer to avoid it being freed in the cleanup
                pPendingMessageQueue = NULL;
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ANSWER:
            /*
             * for viewer, pStreamingSession should've already been created. insert the client id and
             * streaming session into pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages.
             * Lastly check if there is any ice candidate messages queued in pPendingSignalingMessageForRemoteClient.
             * If so then submit all of them.
             */
            // NOTE(review): slot 0 is read without consulting streamingSessionCount; presumably
            // handleAnswer rejects a NULL session - confirm.
            pStreamingSession = pGstKvsPlugin->streamingSessionList[0];
            CHK_STATUS(handleAnswer(pGstKvsPlugin, pStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pGstKvsPlugin->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            CHK_STATUS(
                getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, TRUE, &pPendingMessageQueue));
            if (pPendingMessageQueue != NULL) {
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pStreamingSession));

                // NULL the pointer to avoid it being freed in the cleanup
                pPendingMessageQueue = NULL;
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            /*
             * if peer connection hasn't been created, create an queue to store the ice candidate message. Otherwise
             * submit the signaling message into the corresponding streaming session.
             */
            if (!peerConnectionFound) {
                // Look up without removing: an existing queue stays in the pending list.
                CHK_STATUS(getPendingMessageQueueForHash(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, clientIdHash, FALSE,
                                                         &pPendingMessageQueue));
                if (pPendingMessageQueue == NULL) {
                    // If the enqueue below fails the new queue is still only ours and CleanUp frees it.
                    CHK_STATUS(createMessageQueue(clientIdHash, &pPendingMessageQueue));
                    CHK_STATUS(stackQueueEnqueue(pGstKvsPlugin->pPendingSignalingMessageForRemoteClient, (UINT64) pPendingMessageQueue));
                }

                // From here on the queue is referenced by the pending list, so it must not be freed
                // in CleanUp if a later step fails - that would leave a dangling entry in the list.
                pTargetQueue = pPendingMessageQueue;
                pPendingMessageQueue = NULL;

                pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage));
                CHK(pReceivedSignalingMessageCopy != NULL, STATUS_NOT_ENOUGH_MEMORY);

                *pReceivedSignalingMessageCopy = *pReceivedSignalingMessage;

                CHK_STATUS(stackQueueEnqueue(pTargetQueue->messageQueue, (UINT64) pReceivedSignalingMessageCopy));

                // The copy now belongs to the message queue; NULL the pointer to not free it any longer
                pReceivedSignalingMessageCopy = NULL;
            } else {
                CHK_STATUS(handleRemoteCandidate(pStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            }
            break;

        default:
            DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType);
            break;
    }

CleanUp:

    SAFE_MEMFREE(pReceivedSignalingMessageCopy);
    if (pPendingMessageQueue != NULL) {
        freeMessageQueue(pPendingMessageQueue);
    }

    if (locked) {
        MUTEX_UNLOCK(pGstKvsPlugin->sessionLock);
    }

    CHK_LOG_ERR(retStatus);
    return retStatus;
}