in src/source/Signaling/LwsApiCalls.c [1773:2029]
/**
 * Parses a JSON signaling message received from the service and acts on it.
 *
 * The JSON is tokenized with jsmn and the recognized keys (senderClientId, messageType,
 * messagePayload, statusResponse and its nested fields, IceServerList) are copied into a
 * newly allocated SignalingMessageWrapper. Depending on the message type the function then:
 *  - STATUS_RESPONSE: stores the result in pSignalingClient->messageResult, broadcasts
 *    receiveCvar, frees the wrapper and returns.
 *  - GO_AWAY / RECONNECT_ICE_SERVER: terminates the connection with the matching result,
 *    frees the wrapper, steps the state machine and returns.
 *  - OFFER / ANSWER / ICE_CANDIDATE: validates the payload (and the peer client id for an
 *    offer) and hands the wrapper to receiveLwsMessageWrapper on a new detached thread.
 *
 * @param pSignalingClient - signaling client the message was received on. Must not be NULL.
 * @param pMessage - message body (not required to be NUL terminated); may be NULL for an empty message.
 * @param messageLen - length of pMessage in bytes; may be 0 for an empty message.
 *
 * @return STATUS_SUCCESS on success or when the message was empty. On failure the runtime error
 *         counter is incremented and, if an errorReportFn callback is registered, the value
 *         returned by that callback is returned instead of the original failure status.
 */
STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT32 messageLen)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    jsmn_parser parser;
    jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
    jsmntok_t* pToken;
    UINT32 i, strLen, outLen = MAX_SIGNALING_MESSAGE_LEN;
    UINT32 tokenCount;
    INT32 j, parseResult;
    PSignalingMessageWrapper pSignalingMessageWrapper = NULL;
    TID receivedTid = INVALID_TID_VALUE;
    BOOL parsedMessageType = FALSE, parsedStatusResponse = FALSE, jsonInIceServerList = FALSE;
    PSignalingMessage pOngoingMessage;
    UINT64 ttl;

    CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

    // If we have a signalingMessage and if there is a correlation id specified then the response should be non-empty
    if (pMessage == NULL || messageLen == 0) {
        if (BLOCK_ON_CORRELATION_ID) {
            // Get empty correlation id message from the ongoing if exists
            CHK_STATUS(signalingGetOngoingMessage(pSignalingClient, EMPTY_STRING, EMPTY_STRING, &pOngoingMessage));
            if (pOngoingMessage == NULL) {
                DLOGW("Received an empty body for a message with no correlation id which has been already removed from the queue. Warning 0x%08x",
                      STATUS_SIGNALING_RECEIVE_EMPTY_DATA_NOT_SUPPORTED);
            } else {
                CHK_STATUS(signalingRemoveOngoingMessage(pSignalingClient, EMPTY_STRING));
            }
        }

        // Check if anything needs to be done
        CHK_WARN(pMessage != NULL && messageLen != 0, retStatus, "Signaling received an empty message");
    }

    // Parse the response.
    // jsmn_parse returns a signed value which is negative on error, so it must be validated
    // as a signed integer before being used as an (unsigned) token count.
    jsmn_init(&parser);
    parseResult = jsmn_parse(&parser, pMessage, messageLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
    CHK(parseResult > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
    tokenCount = (UINT32) parseResult;
    CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);

    CHK(NULL != (pSignalingMessageWrapper = (PSignalingMessageWrapper) MEMCALLOC(1, SIZEOF(SignalingMessageWrapper))), STATUS_NOT_ENOUGH_MEMORY);

    pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;

    // Loop through the tokens and extract the stream description.
    // Every branch that reads the value token (index i + 1) first verifies that the token exists,
    // otherwise a key-like string in the last position would read past the parsed tokens.
    for (i = 1; i < tokenCount; i++) {
        if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "senderClientId")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_SIGNALING_CLIENT_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId, pMessage + tokens[i + 1].start, strLen);
            pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId[MAX_SIGNALING_CLIENT_ID_LEN] = '\0';
            i++;
        } else if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "messageType")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_SIGNALING_MESSAGE_TYPE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            CHK_STATUS(getMessageTypeFromString(pMessage + tokens[i + 1].start, strLen,
                                                &pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType));

            parsedMessageType = TRUE;
            i++;
        } else if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "messagePayload")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_SIGNALING_MESSAGE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);

            // Base64 decode the message
            CHK_STATUS(base64Decode(pMessage + tokens[i + 1].start, strLen,
                                    (PBYTE) (pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload), &outLen));
            pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload[MAX_SIGNALING_MESSAGE_LEN] = '\0';
            pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payloadLen = outLen;
            i++;
        } else if (!parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "statusResponse")) {
            // Skip over the value token; its nested fields are handled by the branches below
            parsedStatusResponse = TRUE;
            i++;
        } else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "correlationId")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_CORRELATION_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId, pMessage + tokens[i + 1].start, strLen);
            pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId[MAX_CORRELATION_ID_LEN] = '\0';
            i++;
        } else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "errorType")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_ERROR_TYPE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.errorType, pMessage + tokens[i + 1].start, strLen);
            pSignalingMessageWrapper->receivedSignalingMessage.errorType[MAX_ERROR_TYPE_STRING_LEN] = '\0';
            i++;
        } else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "statusCode")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_STATUS_CODE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);

            // Parse the status code
            CHK_STATUS(STRTOUI32(pMessage + tokens[i + 1].start, pMessage + tokens[i + 1].end, 10,
                                 &pSignalingMessageWrapper->receivedSignalingMessage.statusCode));
            i++;
        } else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "description")) {
            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_MESSAGE_DESCRIPTION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.description, pMessage + tokens[i + 1].start, strLen);
            pSignalingMessageWrapper->receivedSignalingMessage.description[MAX_MESSAGE_DESCRIPTION_LEN] = '\0';
            i++;
        } else if (!jsonInIceServerList &&
                   pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_OFFER &&
                   compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "IceServerList")) {
            jsonInIceServerList = TRUE;

            CHK(i + 1 < tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);
            CHK(tokens[i + 1].type == JSMN_ARRAY, STATUS_INVALID_API_CALL_RETURN_JSON);
            CHK(tokens[i + 1].size <= MAX_ICE_CONFIG_COUNT, STATUS_SIGNALING_MAX_ICE_CONFIG_COUNT);

            // Zero the ice configs
            MEMSET(&pSignalingClient->iceConfigs, 0x00, MAX_ICE_CONFIG_COUNT * SIZEOF(IceConfigInfo));
            pSignalingClient->iceConfigCount = 0;
        } else if (jsonInIceServerList) {
            pToken = &tokens[i];
            if (pToken->type == JSMN_OBJECT) {
                // The array size check above only bounds the direct children of the list;
                // guard the counter itself so that no object token can push it past the storage.
                CHK(pSignalingClient->iceConfigCount < MAX_ICE_CONFIG_COUNT, STATUS_SIGNALING_MAX_ICE_CONFIG_COUNT);
                pSignalingClient->iceConfigCount++;
            } else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Username")) {
                // A field is only valid inside an ICE server object (iceConfigCount - 1 is the index used)
                CHK(i + 1 < tokenCount && pSignalingClient->iceConfigCount > 0, STATUS_INVALID_API_CALL_RETURN_JSON);
                strLen = (UINT32) (pToken[1].end - pToken[1].start);
                CHK(strLen <= MAX_ICE_CONFIG_USER_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName, pMessage + pToken[1].start, strLen);
                pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName[MAX_ICE_CONFIG_USER_NAME_LEN] = '\0';
                i++;
            } else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Password")) {
                CHK(i + 1 < tokenCount && pSignalingClient->iceConfigCount > 0, STATUS_INVALID_API_CALL_RETURN_JSON);
                strLen = (UINT32) (pToken[1].end - pToken[1].start);
                CHK(strLen <= MAX_ICE_CONFIG_CREDENTIAL_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].password, pMessage + pToken[1].start, strLen);
                // Terminate the password buffer (the credential length applies to password, not userName)
                pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].password[MAX_ICE_CONFIG_CREDENTIAL_LEN] = '\0';
                i++;
            } else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Ttl")) {
                CHK(i + 1 < tokenCount && pSignalingClient->iceConfigCount > 0, STATUS_INVALID_API_CALL_RETURN_JSON);
                CHK_STATUS(STRTOUI64(pMessage + pToken[1].start, pMessage + pToken[1].end, 10, &ttl));

                // NOTE: Ttl value is in seconds
                pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].ttl = ttl * HUNDREDS_OF_NANOS_IN_A_SECOND;
                i++;
            } else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Uris")) {
                CHK(i + 1 < tokenCount && pSignalingClient->iceConfigCount > 0, STATUS_INVALID_API_CALL_RETURN_JSON);

                // Expect an array of elements
                CHK(pToken[1].type == JSMN_ARRAY, STATUS_INVALID_API_CALL_RETURN_JSON);
                CHK(pToken[1].size <= MAX_ICE_CONFIG_URI_COUNT, STATUS_SIGNALING_MAX_ICE_URI_COUNT);
                for (j = 0; j < pToken[1].size; j++) {
                    strLen = (UINT32) (pToken[j + 2].end - pToken[j + 2].start);
                    CHK(strLen <= MAX_ICE_CONFIG_URI_LEN, STATUS_SIGNALING_MAX_ICE_URI_LEN);
                    STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uris[j], pMessage + pToken[j + 2].start, strLen);
                    pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uris[j][MAX_ICE_CONFIG_URI_LEN] = '\0';
                    pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uriCount++;
                }

                // Skip the array token and its elements
                i += pToken[1].size + 1;
            }
        }
    }

    // Message type is a mandatory field.
    CHK(parsedMessageType, STATUS_SIGNALING_INVALID_MESSAGE_TYPE);
    pSignalingMessageWrapper->pSignalingClient = pSignalingClient;

    switch (pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_STATUS_RESPONSE:
            if (pSignalingMessageWrapper->receivedSignalingMessage.statusCode != SERVICE_CALL_RESULT_OK) {
                DLOGW("Failed to deliver message. Correlation ID: %s, Error Type: %s, Error Code: %u, Description: %s",
                      pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId,
                      pSignalingMessageWrapper->receivedSignalingMessage.errorType, pSignalingMessageWrapper->receivedSignalingMessage.statusCode,
                      pSignalingMessageWrapper->receivedSignalingMessage.description);

                // Store the response
                ATOMIC_STORE(&pSignalingClient->messageResult,
                             (SIZE_T) getServiceCallResultFromHttpStatus(pSignalingMessageWrapper->receivedSignalingMessage.statusCode));
            } else {
                // Success
                ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_RESULT_OK);
            }

            // Notify the awaiting send
            CVAR_BROADCAST(pSignalingClient->receiveCvar);
            // Delete the message wrapper and exit
            SAFE_MEMFREE(pSignalingMessageWrapper);
            CHK(FALSE, retStatus);
            break;

        case SIGNALING_MESSAGE_TYPE_GO_AWAY:
            // Move the describe state
            CHK_STATUS(terminateConnectionWithStatus(pSignalingClient, SERVICE_CALL_RESULT_SIGNALING_GO_AWAY));

            // Delete the message wrapper and exit
            SAFE_MEMFREE(pSignalingMessageWrapper);

            // Iterate the state machinery
            CHK_STATUS(signalingStateMachineIterator(pSignalingClient, SIGNALING_GET_CURRENT_TIME(pSignalingClient) + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_CONNECTED));

            CHK(FALSE, retStatus);
            break;

        case SIGNALING_MESSAGE_TYPE_RECONNECT_ICE_SERVER:
            // Move to get ice config state
            CHK_STATUS(terminateConnectionWithStatus(pSignalingClient, SERVICE_CALL_RESULT_SIGNALING_RECONNECT_ICE));

            // Delete the message wrapper and exit
            SAFE_MEMFREE(pSignalingMessageWrapper);

            // Iterate the state machinery
            CHK_STATUS(signalingStateMachineIterator(pSignalingClient, SIGNALING_GET_CURRENT_TIME(pSignalingClient) + SIGNALING_CONNECT_STATE_TIMEOUT, SIGNALING_STATE_CONNECTED));

            CHK(FALSE, retStatus);
            break;

        case SIGNALING_MESSAGE_TYPE_OFFER:
            CHK(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId[0] != '\0',
                STATUS_SIGNALING_NO_PEER_CLIENT_ID_IN_MESSAGE);
            // Explicit fall-through !!!
        case SIGNALING_MESSAGE_TYPE_ANSWER:
        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            CHK(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payloadLen > 0 &&
                    pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payloadLen <= MAX_SIGNALING_MESSAGE_LEN,
                STATUS_SIGNALING_INVALID_PAYLOAD_LEN_IN_MESSAGE);
            CHK(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload[0] != '\0', STATUS_SIGNALING_NO_PAYLOAD_IN_MESSAGE);
            break;

        default:
            break;
    }

    DLOGD("Client received message of type: %s",
          getMessageTypeInString(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType));

    // Validate and process the ice config
    if (jsonInIceServerList && STATUS_FAILED(validateIceConfiguration(pSignalingClient))) {
        DLOGW("Failed to validate the ICE server configuration received with an Offer");
    }

    // Issue the callback on a separate thread
    CHK_STATUS(THREAD_CREATE(&receivedTid, receiveLwsMessageWrapper, (PVOID) pSignalingMessageWrapper));
    CHK_STATUS(THREAD_DETACH(receivedTid));

CleanUp:

    CHK_LOG_ERR(retStatus);

    if (pSignalingClient != NULL && STATUS_FAILED(retStatus)) {
        ATOMIC_INCREMENT(&pSignalingClient->diagnostics.numberOfRuntimeErrors);
        if (pSignalingClient->signalingClientCallbacks.errorReportFn != NULL) {
            // NOTE: the callback's return value replaces the original failure status
            retStatus = pSignalingClient->signalingClientCallbacks.errorReportFn(pSignalingClient->signalingClientCallbacks.customData, retStatus,
                                                                                 pMessage, messageLen);
        }

        // Kill the receive thread on error
        if (IS_VALID_TID_VALUE(receivedTid)) {
            THREAD_CANCEL(receivedTid);
        }

        SAFE_MEMFREE(pSignalingMessageWrapper);
    }

    LEAVES();
    return retStatus;
}