in kinesis_manager/src/kinesis_stream_manager.cpp [116:167]
/**
 * Sets up every configured video stream and subscribes each one to its topic.
 *
 * Reads the stream count from the parameter reader and then, for each stream index:
 * loads the codec private data, sets up the video stream, generates the subscription
 * descriptor and installs the subscription. A stream that fails at any step is skipped
 * and processing continues with the next index.
 *
 * @return KINESIS_MANAGER_STATUS_SUCCESS if at least one stream completed every step,
 *         KINESIS_MANAGER_STATUS_ERROR_BASE otherwise (including a non-positive stream count).
 */
KinesisManagerStatus KinesisStreamManagerInterface::KinesisVideoStreamerSetup()
{
  KinesisManagerStatus overall_status = KINESIS_MANAGER_STATUS_ERROR_BASE;

  int video_stream_count = 0;
  parameter_reader_->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_count),
                               video_stream_count);
  if (video_stream_count <= 0) {
    AWS_LOGSTREAM_WARN(__func__, "Stream count " << video_stream_count << " is invalid. Aborting");
    /* Nothing to iterate over; the status is still the initial error value. */
    return overall_status;
  }

  for (int idx = 0; idx < video_stream_count; ++idx) {
    /* Step 1: fetch the codec private data for this stream definition. */
    PBYTE codec_private_data = nullptr;
    uint32_t codec_private_data_size = 0;
    const KinesisManagerStatus codec_status = stream_definition_provider_->GetCodecPrivateData(
      GetStreamParameterPrefix(idx), *parameter_reader_, &codec_private_data,
      &codec_private_data_size);
    if (KINESIS_MANAGER_STATUS_FAILED(codec_status)) {
      AWS_LOGSTREAM_ERROR(__func__, "Skipping stream id "
                                      << idx
                                      << " due to failure to load codec private data. Error code: "
                                      << codec_status);
      continue;
    }

    /* Step 2: set up the video stream itself. */
    const KinesisManagerStatus setup_status =
      KinesisVideoStreamSetup(idx, codec_private_data, codec_private_data_size, nullptr);
    if (KINESIS_MANAGER_STATUS_FAILED(setup_status)) {
      SAFE_MEMFREE(codec_private_data);
      continue;
    }

    /* Step 3: describe and install the topic subscription. */
    StreamSubscriptionDescriptor descriptor;
    /* Shared rollback for the two failure paths below: free the stream, then the codec buffer. */
    const auto discard_stream = [&]() {
      FreeStream(descriptor.stream_name);
      SAFE_MEMFREE(codec_private_data);
    };

    const KinesisManagerStatus descriptor_status =
      GenerateStreamSubscriptionDescriptor(idx, descriptor);
    if (KINESIS_MANAGER_STATUS_FAILED(descriptor_status)) {
      /* NOTE(review): descriptor.stream_name may not be populated when generation fails -
       * confirm that FreeStream handles that case. */
      discard_stream();
      continue;
    }

    const KinesisManagerStatus subscribe_status = InitializeStreamSubscription(descriptor);
    if (KINESIS_MANAGER_STATUS_FAILED(subscribe_status)) {
      AWS_LOGSTREAM_ERROR(__func__, "Failed to subscribe to '"
                                      << descriptor.topic_name << "' for stream '"
                                      << descriptor.stream_name
                                      << "'. Error code: " << subscribe_status);
      discard_stream();
      continue;
    }

    /* NOTE(review): codec_private_data is not released here on the success path -
     * confirm that its ownership is handled elsewhere. */
    overall_status = KINESIS_MANAGER_STATUS_SUCCESS;
  }

  return overall_status;
}