in src/source/Signaling/LwsApiCalls.c [288:528]
/**
 * libwebsockets callback servicing the signaling client's WSS connection.
 *
 * Only the connection-error, established, closed, peer-initiated-close, receive and
 * writeable reasons are processed; every other reason returns 0 immediately without
 * touching the opaque user data attached to the wsi.
 *
 * @param wsi      - libwebsockets connection handle; its opaque user data must be a PLwsCallInfo.
 * @param reason   - Callback reason supplied by libwebsockets.
 * @param user     - Unused.
 * @param pDataIn  - Reason-specific payload (error string, close payload or received fragment). May be NULL.
 * @param dataSize - Size of pDataIn in bytes.
 *
 * @return 0 to keep the connection going. Non-zero is returned when:
 *          1 - the ongoing call has been cancelled, or a writeable callback arrives while
 *              the client is disconnected with messageResult == SERVICE_CALL_UNKNOWN;
 *         -1 - the peer initiated a close, lws_write failed, or any step failed with an
 *              error status (in which case the request is also marked as terminating and
 *              the lws service is cancelled).
 */
INT32 lwsWssCallbackRoutine(struct lws* wsi, enum lws_callback_reasons reason, PVOID user, PVOID pDataIn, size_t dataSize)
{
    UNUSED_PARAM(user);
    STATUS retStatus = STATUS_SUCCESS;
    PVOID customData;
    INT32 status, size, writeSize, retValue = 0;
    INT16 closeCode;
    PCHAR pCurPtr;
    PLwsCallInfo pLwsCallInfo;
    PRequestInfo pRequestInfo = NULL;
    PSignalingClient pSignalingClient = NULL;
    SIZE_T offset, bufferSize, receiveCapacity;
    BOOL connected, locked = FALSE;

    DLOGV("WSS callback with reason %d", reason);

    // Early check before accessing custom field to see if we are interested in the message
    switch (reason) {
        case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
        case LWS_CALLBACK_CLIENT_ESTABLISHED:
        case LWS_CALLBACK_CLIENT_CLOSED:
        case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
        case LWS_CALLBACK_CLIENT_RECEIVE:
        case LWS_CALLBACK_CLIENT_WRITEABLE:
            break;
        default:
            // Not an error: retStatus is still STATUS_SUCCESS so this simply returns 0
            CHK(FALSE, retStatus);
    }

    customData = lws_get_opaque_user_data(wsi);
    pLwsCallInfo = (PLwsCallInfo) customData;

    lws_set_log_level(LLL_NOTICE | LLL_WARN | LLL_ERR, NULL);

    // Silently bail out (returning 0) unless the whole object chain is in place and
    // the call info belongs to the WSS protocol
    CHK(pLwsCallInfo != NULL && pLwsCallInfo->pSignalingClient != NULL && pLwsCallInfo->pSignalingClient->pOngoingCallInfo != NULL &&
            pLwsCallInfo->pSignalingClient->pLwsContext != NULL && pLwsCallInfo->pSignalingClient->pOngoingCallInfo->callInfo.pRequestInfo != NULL &&
            pLwsCallInfo->protocolIndex == PROTOCOL_INDEX_WSS,
        retStatus);

    // From here on operate on the client's ongoing call info rather than the one attached to the wsi
    pSignalingClient = pLwsCallInfo->pSignalingClient;
    pLwsCallInfo = pSignalingClient->pOngoingCallInfo;
    pRequestInfo = pLwsCallInfo->callInfo.pRequestInfo;

    // Quick check whether we need to exit
    if (ATOMIC_LOAD(&pLwsCallInfo->cancelService)) {
        retValue = 1;
        ATOMIC_STORE_BOOL(&pRequestInfo->terminating, TRUE);
        CHK(FALSE, retStatus);
    }

    MUTEX_LOCK(pSignalingClient->lwsServiceLock);
    locked = TRUE;

    switch (reason) {
        case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
            pCurPtr = pDataIn == NULL ? "(None)" : (PCHAR) pDataIn;
            DLOGW("Client connection failed. Connection error string: %s", pCurPtr);
            // NOTE(review): if STRNCPY maps to strncpy it will not NUL-terminate when the error string is
            // CALL_INFO_ERROR_BUFFER_LEN chars or longer; this relies on errorBuffer having a spare,
            // already-zeroed trailing byte - confirm against the CallInfo declaration.
            STRNCPY(pLwsCallInfo->callInfo.errorBuffer, pCurPtr, CALL_INFO_ERROR_BUFFER_LEN);

            // TODO: Attempt to get more meaningful service return code

            ATOMIC_STORE_BOOL(&pRequestInfo->terminating, TRUE);
            connected = ATOMIC_EXCHANGE_BOOL(&pSignalingClient->connected, FALSE);

            // Wake up any waiters on the receive/send condition variables
            CVAR_BROADCAST(pSignalingClient->receiveCvar);
            CVAR_BROADCAST(pSignalingClient->sendCvar);
            ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_UNKNOWN);
            ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) SERVICE_CALL_UNKNOWN);

            if (connected && !ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
                // Handle re-connection in a reconnect handler thread. Set the terminated indicator before the thread
                // creation and the thread itself will reset it. NOTE: Need to check for a failure and reset.
                ATOMIC_STORE_BOOL(&pSignalingClient->reconnecterTracker.terminated, FALSE);
                retStatus = THREAD_CREATE(&pSignalingClient->reconnecterTracker.threadId, reconnectHandler, (PVOID) pSignalingClient);
                if (STATUS_FAILED(retStatus)) {
                    ATOMIC_STORE_BOOL(&pSignalingClient->reconnecterTracker.terminated, TRUE);
                    CHK(FALSE, retStatus);
                }

                CHK_STATUS(THREAD_DETACH(pSignalingClient->reconnecterTracker.threadId));
            }

            break;

        case LWS_CALLBACK_CLIENT_ESTABLISHED:
            DLOGD("Connection established");

            // Set the call result to succeeded
            ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) SERVICE_CALL_RESULT_OK);
            ATOMIC_STORE_BOOL(&pSignalingClient->connected, TRUE);

            // Store the time when we connect for diagnostics
            MUTEX_LOCK(pSignalingClient->diagnosticsLock);
            pSignalingClient->diagnostics.connectTime = SIGNALING_GET_CURRENT_TIME(pSignalingClient);
            MUTEX_UNLOCK(pSignalingClient->diagnosticsLock);

            // Notify the listener thread
            CVAR_BROADCAST(pSignalingClient->connectedCvar);

            break;

        case LWS_CALLBACK_CLIENT_CLOSED:
            DLOGD("Client WSS closed");

            ATOMIC_STORE_BOOL(&pRequestInfo->terminating, TRUE);
            connected = ATOMIC_EXCHANGE_BOOL(&pSignalingClient->connected, FALSE);

            // Wake up any waiters on the receive/send condition variables
            CVAR_BROADCAST(pSignalingClient->receiveCvar);
            CVAR_BROADCAST(pSignalingClient->sendCvar);
            ATOMIC_STORE(&pSignalingClient->messageResult, (SIZE_T) SERVICE_CALL_UNKNOWN);

            // No reconnect is attempted when the stored result is SERVICE_CALL_RESULT_SIGNALING_RECONNECT_ICE
            // or when the client is shutting down
            if (connected && ATOMIC_LOAD(&pSignalingClient->result) != SERVICE_CALL_RESULT_SIGNALING_RECONNECT_ICE &&
                !ATOMIC_LOAD_BOOL(&pSignalingClient->shutdown)) {
                // Set the result failed
                ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) SERVICE_CALL_UNKNOWN);

                // Handle re-connection in a reconnect handler thread. Set the terminated indicator before the thread
                // creation and the thread itself will reset it. NOTE: Need to check for a failure and reset.
                ATOMIC_STORE_BOOL(&pSignalingClient->reconnecterTracker.terminated, FALSE);
                retStatus = THREAD_CREATE(&pSignalingClient->reconnecterTracker.threadId, reconnectHandler, (PVOID) pSignalingClient);
                if (STATUS_FAILED(retStatus)) {
                    ATOMIC_STORE_BOOL(&pSignalingClient->reconnecterTracker.terminated, TRUE);
                    CHK(FALSE, retStatus);
                }

                CHK_STATUS(THREAD_DETACH(pSignalingClient->reconnecterTracker.threadId));
            }

            break;

        case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
            // Defaults for a close payload that carries no status: status 0 and an empty
            // message, so the "%.*s" below is never handed a NULL pointer.
            status = 0;
            pCurPtr = "";
            size = 0;

            // A close payload is a two byte status optionally followed by a reason string,
            // so a status-only payload (exactly two bytes) must be parsed as well.
            if (pDataIn != NULL && dataSize >= SIZEOF(UINT16)) {
                // The status should be the first two bytes in network order. Copy it out
                // rather than dereferencing a cast pointer to avoid a misaligned read.
                MEMCPY(&closeCode, pDataIn, SIZEOF(closeCode));
                status = getInt16(closeCode);

                // Set the string past the status
                pCurPtr = (PCHAR) ((PBYTE) pDataIn + SIZEOF(UINT16));
                size = (INT32) (dataSize - SIZEOF(UINT16));
            }

            DLOGD("Peer initiated close with %d (0x%08x). Message: %.*s", status, (UINT32) status, size, pCurPtr);

            // Store the state as the result
            retValue = -1;

            ATOMIC_STORE(&pSignalingClient->result, (SIZE_T) status);

            break;

        case LWS_CALLBACK_CLIENT_RECEIVE:
            // Check if it's a binary data
            CHK(!lws_frame_is_binary(wsi), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED);

            // Skip if it's the first and last fragment and the size is 0
            CHK(!(lws_is_first_fragment(wsi) && lws_is_final_fragment(wsi) && dataSize == 0), retStatus);

            // Check what type of a message it is. We will set the size to 0 on first and flush on last
            if (lws_is_first_fragment(wsi)) {
                pLwsCallInfo->receiveBufferSize = 0;
            }

            // Payload is accumulated past the first LWS_PRE bytes of the receive buffer.
            // Compare against the remaining capacity using subtraction only, so that neither
            // a 32-bit truncation of dataSize nor an additive wrap-around can defeat the check.
            receiveCapacity = SIZEOF(pLwsCallInfo->receiveBuffer) > (SIZE_T) LWS_PRE ? SIZEOF(pLwsCallInfo->receiveBuffer) - (SIZE_T) LWS_PRE : 0;
            CHK(pLwsCallInfo->receiveBufferSize <= receiveCapacity && dataSize <= receiveCapacity - pLwsCallInfo->receiveBufferSize,
                STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN);

            // Store the data in the buffer (nothing to copy for an empty fragment)
            if (dataSize != 0) {
                MEMCPY(&pLwsCallInfo->receiveBuffer[LWS_PRE + pLwsCallInfo->receiveBufferSize], pDataIn, dataSize);
                pLwsCallInfo->receiveBufferSize += (UINT32) dataSize;
            }

            // Flush on last
            if (lws_is_final_fragment(wsi)) {
                CHK_STATUS(receiveLwsMessage(pLwsCallInfo->pSignalingClient, (PCHAR) &pLwsCallInfo->receiveBuffer[LWS_PRE],
                                             pLwsCallInfo->receiveBufferSize / SIZEOF(CHAR)));
            }

            lws_callback_on_writable(wsi);

            break;

        case LWS_CALLBACK_CLIENT_WRITEABLE:
            DLOGD("Client is writable");

            // Check if we are attempting to terminate the connection
            if (!ATOMIC_LOAD_BOOL(&pSignalingClient->connected) && ATOMIC_LOAD(&pSignalingClient->messageResult) == SERVICE_CALL_UNKNOWN) {
                retValue = 1;
                CHK(FALSE, retStatus);
            }

            // Take a single snapshot of the send window and use it for both the length
            // and the start position of the write.
            offset = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendOffset);
            bufferSize = (UINT32) ATOMIC_LOAD(&pLwsCallInfo->sendBufferSize);

            // Check if we need to do anything. Guard the unsigned subtraction first.
            CHK(bufferSize > offset, retStatus);
            writeSize = (INT32) (bufferSize - offset);
            CHK(writeSize > 0, retStatus);

            // Send data and notify on completion
            size = lws_write(wsi, &(pLwsCallInfo->sendBuffer[offset]), (SIZE_T) writeSize, LWS_WRITE_TEXT);
            if (size < 0) {
                DLOGW("Write failed. Returned write size is %d", size);
                // Quit
                retValue = -1;
                CHK(FALSE, retStatus);
            }

            if (size == writeSize) {
                // Notify the listener
                ATOMIC_STORE(&pLwsCallInfo->sendOffset, 0);
                ATOMIC_STORE(&pLwsCallInfo->sendBufferSize, 0);
                CVAR_BROADCAST(pLwsCallInfo->pSignalingClient->sendCvar);
            } else {
                // Partial write
                DLOGV("Failed to write out the data entirely. Wrote %d out of %d", size, writeSize);

                // Schedule again
                lws_callback_on_writable(wsi);
            }

            break;

        default:
            break;
    }

CleanUp:

    if (STATUS_FAILED(retStatus)) {
        DLOGW("Failed in LWS handling routine with 0x%08x", retStatus);
        if (pRequestInfo != NULL) {
            ATOMIC_STORE_BOOL(&pRequestInfo->terminating, TRUE);
        }

        lws_cancel_service(lws_get_context(wsi));

        retValue = -1;
    }

    // The lock is only held once the cancellation check has passed
    if (locked) {
        MUTEX_UNLOCK(pSignalingClient->lwsServiceLock);
    }

    return retValue;
}