in canary/webrtc-c/src/CanarySignaling.cpp [260:462]
/**
 * Runs the signaling canary: creates a master and a viewer signaling client on the
 * same channel, connects both, schedules the periodic viewer-offer callback and then
 * blocks until the global exit flag (gExitCanary) is set.
 *
 * @param pConfig Canary configuration (credentials, channel name, region, log level,
 *                total duration and per-iteration duration are read from it).
 *
 * @return The status of the setup/run sequence (retStatus). The status pushed to the
 *         monitoring object is canarySessionInfo.exitStatus when that one is a
 *         failure, otherwise retStatus.
 */
STATUS run(Canary::PConfig pConfig)
{
    STATUS retStatus = STATUS_SUCCESS;
    BOOL initialized = FALSE, channelNameGenerated = FALSE;
    TIMER_QUEUE_HANDLE timerQueueHandle = 0;
    UINT32 timeoutTimerId;
    ChannelInfo masterChannelInfo = {};
    ChannelInfo viewerChannelInfo = {};
    SignalingClientInfo masterClientInfo = {};
    SignalingClientInfo viewerClientInfo = {};
    SignalingClientCallbacks masterSignalingClientCallbacks = {};
    SignalingClientCallbacks viewerSignalingClientCallbacks = {};
    PAwsCredentialProvider pCredentialProvider = NULL;
    SIGNALING_CLIENT_HANDLE masterSignalingClientHandle = INVALID_SIGNALING_CLIENT_HANDLE_VALUE;
    SIGNALING_CLIENT_HANDLE viewerSignalingClientHandle = INVALID_SIGNALING_CLIENT_HANDLE_VALUE;
    CanarySessionInfo canarySessionInfo = {};
    MUTEX lock = INVALID_MUTEX_VALUE;
    CVAR terminateCv = INVALID_CVAR_VALUE;
    CHAR channelName[MAX_CHANNEL_NAME_LEN + 1];

    // Mark the session primitives as invalid up-front so that CleanUp only frees
    // the ones that were actually created.
    canarySessionInfo.roundtripLock = INVALID_MUTEX_VALUE;
    canarySessionInfo.roundtripCv = INVALID_CVAR_VALUE;

    // Create the lock and the cvar for iteration
    CHK(IS_VALID_MUTEX_VALUE(lock = MUTEX_CREATE(FALSE)), STATUS_NOT_ENOUGH_MEMORY);
    CHK(IS_VALID_CVAR_VALUE(terminateCv = CVAR_CREATE()), STATUS_NOT_ENOUGH_MEMORY);
    CHK(IS_VALID_MUTEX_VALUE(canarySessionInfo.roundtripLock = MUTEX_CREATE(FALSE)), STATUS_NOT_ENOUGH_MEMORY);
    CHK(IS_VALID_CVAR_VALUE(canarySessionInfo.roundtripCv = CVAR_CREATE()), STATUS_NOT_ENOUGH_MEMORY);

    canarySessionInfo.exitStatus = STATUS_SUCCESS;
    canarySessionInfo.iterationFailCount = 0;

    CHK_STATUS(Canary::Cloudwatch::init(pConfig));
    CHK_STATUS(initKvsWebRtc());
    // From here on the exit status can be pushed to the monitoring object in CleanUp
    initialized = TRUE;

    SET_LOGGER_LOG_LEVEL(pConfig->logLevel.value);
    pConfig->print();

    // The timer loop for iteration
    CHK_STATUS(timerQueueCreate(&timerQueueHandle));

    // Create the credential provider: IoT based when requested by the config,
    // a static one otherwise. CleanUp frees it with the matching free function.
    if (pConfig->useIotCredentialProvider.value) {
        CHK_STATUS(createLwsIotCredentialProvider((PCHAR) pConfig->iotEndpoint,
                                                  (PCHAR) pConfig->iotCoreCert.value.c_str(),
                                                  (PCHAR) pConfig->iotCorePrivateKey.value.c_str(),
                                                  (PCHAR) pConfig->caCertPath.value.c_str(),
                                                  (PCHAR) pConfig->iotCoreRoleAlias.value.c_str(),
                                                  (PCHAR) pConfig->channelName.value.c_str(),
                                                  &pCredentialProvider));
    } else {
        CHK_STATUS(createStaticCredentialProvider((PCHAR) pConfig->accessKey.value.c_str(), 0, (PCHAR) pConfig->secretKey.value.c_str(), 0,
                                                  (PCHAR) pConfig->sessionToken.value.c_str(), 0, MAX_UINT64, &pCredentialProvider));
    }

    // Generate a random channel name if not specified in the config.
    // In case we generate the random name we will follow-up with deleting
    // it upon exit to prevent the account from ever increasing channel count
    if (pConfig->channelName.value.empty()) {
        generateChannelName(channelName);
        channelNameGenerated = TRUE;
    } else {
        STRNCPY(channelName, pConfig->channelName.value.c_str(), MAX_CHANNEL_NAME_LEN);
        // The buffer is uninitialized and a bounded copy does not terminate the
        // destination when the source fills it, so terminate explicitly.
        channelName[MAX_CHANNEL_NAME_LEN] = '\0';
    }

    // Prepare the channel info structure
    masterChannelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
    masterChannelInfo.pChannelName = channelName;
    masterChannelInfo.pRegion = (PCHAR) pConfig->region.value.c_str();
    masterChannelInfo.pKmsKeyId = NULL;
    masterChannelInfo.tagCount = 0;
    masterChannelInfo.pTags = NULL;
    masterChannelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
    masterChannelInfo.channelRoleType = SIGNALING_CHANNEL_ROLE_TYPE_MASTER;
    masterChannelInfo.cachingPolicy = SIGNALING_API_CALL_CACHE_TYPE_FILE;
    masterChannelInfo.cachingPeriod = SIGNALING_API_CALL_CACHE_TTL_SENTINEL_VALUE;
    masterChannelInfo.asyncIceServerConfig = FALSE;
    masterChannelInfo.retry = TRUE;
    masterChannelInfo.reconnect = TRUE;
    masterChannelInfo.pCertPath = (PCHAR) DEFAULT_KVS_CACERT_PATH;
    masterChannelInfo.messageTtl = 0; // Default is 60 seconds

    // Both clients report into the same session object through customData
    masterSignalingClientCallbacks.version = SIGNALING_CLIENT_CALLBACKS_CURRENT_VERSION;
    masterSignalingClientCallbacks.errorReportFn = signalingClientError;
    masterSignalingClientCallbacks.stateChangeFn = signalingClientStateChanged;
    masterSignalingClientCallbacks.messageReceivedFn = signalingMessageReceived;
    masterSignalingClientCallbacks.customData = (UINT64) &canarySessionInfo;

    masterClientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION;
    masterClientInfo.loggingLevel = pConfig->logLevel.value;
    STRCPY(masterClientInfo.clientId, SIGNALING_CANARY_MASTER_CLIENT_ID);

    // Create the master signaling client
    CHK_STATUS(createSignalingClientSync(&masterClientInfo, &masterChannelInfo, &masterSignalingClientCallbacks, pCredentialProvider,
                                         &masterSignalingClientHandle));
    CHK_STATUS(signalingClientFetchSync(masterSignalingClientHandle));

    canarySessionInfo.pMasterChannelInfo = &masterChannelInfo;
    canarySessionInfo.pMasterClientInfo = &masterClientInfo;
    canarySessionInfo.pMasterCallbacks = &masterSignalingClientCallbacks;
    canarySessionInfo.masterHandle = masterSignalingClientHandle;

    // Prepare the structs and create the viewer signaling client.
    // The viewer starts as a copy of the master settings and only differs
    // in the role type and the client id.
    viewerChannelInfo = masterChannelInfo;
    viewerChannelInfo.channelRoleType = SIGNALING_CHANNEL_ROLE_TYPE_VIEWER;

    viewerSignalingClientCallbacks = masterSignalingClientCallbacks;
    viewerSignalingClientCallbacks.customData = (UINT64) &canarySessionInfo;

    viewerClientInfo = masterClientInfo;
    STRCPY(viewerClientInfo.clientId, SIGNALING_CANARY_VIEWER_CLIENT_ID);

    CHK_STATUS(createSignalingClientSync(&viewerClientInfo, &viewerChannelInfo, &viewerSignalingClientCallbacks, pCredentialProvider,
                                         &viewerSignalingClientHandle));
    CHK_STATUS(signalingClientFetchSync(viewerSignalingClientHandle));

    canarySessionInfo.pViewerChannelInfo = &viewerChannelInfo;
    canarySessionInfo.pViewerClientInfo = &viewerClientInfo;
    canarySessionInfo.pViewerCallbacks = &viewerSignalingClientCallbacks;
    canarySessionInfo.viewerHandle = viewerSignalingClientHandle;

    // Set it to a non-terminated state and iterate
    ATOMIC_STORE_BOOL(&gExitCanary, FALSE);

    // Connect the signaling clients
    CHK_STATUS(signalingClientConnectSync(canarySessionInfo.masterHandle));
    CHK_STATUS(signalingClientConnectSync(canarySessionInfo.viewerHandle));

    // A zero duration means no termination timer is scheduled
    if (pConfig->duration.value != 0) {
        CHK_STATUS(timerQueueAddTimer(timerQueueHandle, pConfig->duration.value, TIMER_QUEUE_SINGLE_INVOCATION_PERIOD, terminateCanaryCallback,
                                      (UINT64) &canarySessionInfo, &timeoutTimerId));
    }

    // Set the duration to iterate
    CHK_STATUS(timerQueueAddTimer(timerQueueHandle, SIGNALING_CANARY_START_DELAY, pConfig->iterationDuration.value, sendViewerOfferCallback,
                                  (UINT64) &canarySessionInfo, &timeoutTimerId));

    MUTEX_LOCK(lock);
    while (!ATOMIC_LOAD_BOOL(&gExitCanary)) {
        // Having the cvar waking up often allows for Ctrl+C cancellation to be more responsive
        CVAR_WAIT(terminateCv, lock, 1 * HUNDREDS_OF_NANOS_IN_A_SECOND);
    }
    MUTEX_UNLOCK(lock);

CleanUp:

    // Free the timer queue first so that no timer callback runs against the
    // session while the rest of the resources are being torn down.
    if (IS_VALID_TIMER_QUEUE_HANDLE(timerQueueHandle)) {
        timerQueueFree(&timerQueueHandle);
    }

    // A failure recorded in the session takes precedence over the local status
    STATUS combinedStatus = STATUS_FAILED(canarySessionInfo.exitStatus) ? canarySessionInfo.exitStatus : retStatus;
    DLOGI("Exiting with 0x%08x", combinedStatus);
    if (initialized) {
        Canary::Cloudwatch::getInstance().monitoring.pushExitStatus(combinedStatus);
    }

    if (IS_VALID_SIGNALING_CLIENT_HANDLE(masterSignalingClientHandle)) {
        freeSignalingClient(&masterSignalingClientHandle);
    }

    if (IS_VALID_SIGNALING_CLIENT_HANDLE(viewerSignalingClientHandle)) {
        // As we are freeing the viewer (the last of the clients),
        // we need to check whether we generated the
        // channel name and if so delete it.
        // The delete must go through the viewer handle: the master client
        // has already been freed above and its handle is no longer usable.
        if (channelNameGenerated) {
            signalingClientDeleteSync(viewerSignalingClientHandle);
        }

        freeSignalingClient(&viewerSignalingClientHandle);
    }

    if (pCredentialProvider != NULL) {
        if (pConfig->useIotCredentialProvider.value) {
            freeIotCredentialProvider(&pCredentialProvider);
        } else {
            freeStaticCredentialProvider(&pCredentialProvider);
        }
    }

    if (IS_VALID_MUTEX_VALUE(lock)) {
        MUTEX_FREE(lock);
    }

    if (IS_VALID_CVAR_VALUE(terminateCv)) {
        CVAR_FREE(terminateCv);
    }

    if (IS_VALID_MUTEX_VALUE(canarySessionInfo.roundtripLock)) {
        MUTEX_FREE(canarySessionInfo.roundtripLock);
    }

    if (IS_VALID_CVAR_VALUE(canarySessionInfo.roundtripCv)) {
        CVAR_FREE(canarySessionInfo.roundtripCv);
    }

    deinitKvsWebRtc();
    Canary::Cloudwatch::deinit();

    return retStatus;
}