KinesisManagerStatus KinesisStreamManagerInterface::KinesisVideoStreamerSetup()

in kinesis_manager/src/kinesis_stream_manager.cpp [116:167]


/**
 * Sets up every configured video stream and subscribes to its input topic.
 *
 * Reads the stream count from the parameter reader, then for each stream index:
 *   1. loads the codec private data through the stream definition provider,
 *   2. creates the stream via KinesisVideoStreamSetup,
 *   3. builds the subscription descriptor, and
 *   4. installs the subscription.
 * A stream that fails at any step is skipped (after releasing what was acquired for it in this
 * function) and the remaining streams are still attempted.
 *
 * @return KINESIS_MANAGER_STATUS_SUCCESS if at least one stream was set up and subscribed,
 *         KINESIS_MANAGER_STATUS_ERROR_BASE otherwise (including a non-positive stream count).
 */
KinesisManagerStatus KinesisStreamManagerInterface::KinesisVideoStreamerSetup()
{
  KinesisManagerStatus status = KINESIS_MANAGER_STATUS_ERROR_BASE;
  int video_stream_count = 0;
  /* The result of ReadParam is not checked here; presumably a failed read leaves the count at its
   * default of 0, which the validity check below rejects - TODO confirm. */
  parameter_reader_->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_count),
                             video_stream_count);
  if (0 >= video_stream_count) {
    AWS_LOGSTREAM_WARN(__func__, "Stream count " << video_stream_count << " is invalid. Aborting");
    /* Nothing to set up: return explicitly so the code matches the "Aborting" message. */
    return status;
  }
  for (int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) {
    /* Load stream definition */
    PBYTE codec_private_data = nullptr;
    uint32_t codec_private_data_size = 0;
    const KinesisManagerStatus get_codec_private_data_result =
      stream_definition_provider_->GetCodecPrivateData(GetStreamParameterPrefix(stream_idx),
                                                       *parameter_reader_, &codec_private_data,
                                                       &codec_private_data_size);
    if (KINESIS_MANAGER_STATUS_FAILED(get_codec_private_data_result)) {
      AWS_LOGSTREAM_ERROR(__func__, "Skipping stream id "
                                      << stream_idx
                                      << " due to failure to load codec private data. Error code: "
                                      << get_codec_private_data_result);
      continue;
    }
    /* Create the stream itself */
    const auto stream_setup_result =
      KinesisVideoStreamSetup(stream_idx, codec_private_data, codec_private_data_size, nullptr);
    if (KINESIS_MANAGER_STATUS_FAILED(stream_setup_result)) {
      AWS_LOGSTREAM_ERROR(__func__, "Skipping stream id "
                                      << stream_idx
                                      << " due to failure to set up the video stream. Error code: "
                                      << stream_setup_result);
      SAFE_MEMFREE(codec_private_data);
      continue;
    }
    /* Subscribe to the specified topic */
    StreamSubscriptionDescriptor descriptor;
    const auto descriptor_generation_result =
      GenerateStreamSubscriptionDescriptor(stream_idx, descriptor);
    if (KINESIS_MANAGER_STATUS_FAILED(descriptor_generation_result)) {
      AWS_LOGSTREAM_ERROR(__func__,
                          "Skipping stream id "
                            << stream_idx
                            << " due to failure to generate the subscription descriptor."
                            << " Error code: " << descriptor_generation_result);
      /* NOTE(review): this relies on descriptor.stream_name having been filled in even though
       * descriptor generation failed; otherwise the stream created above is not released -
       * confirm against GenerateStreamSubscriptionDescriptor. */
      FreeStream(descriptor.stream_name);
      SAFE_MEMFREE(codec_private_data);
      continue;
    }
    const KinesisManagerStatus subscription_installation_result =
      InitializeStreamSubscription(descriptor);
    if (KINESIS_MANAGER_STATUS_FAILED(subscription_installation_result)) {
      AWS_LOGSTREAM_ERROR(__func__, "Failed to subscribe to '"
                                      << descriptor.topic_name << "' for stream '"
                                      << descriptor.stream_name
                                      << "'. Error code: " << subscription_installation_result);
      FreeStream(descriptor.stream_name);
      SAFE_MEMFREE(codec_private_data);
      continue;
    }
    /* NOTE(review): codec_private_data is not released on this success path; presumably it is
     * owned or copied downstream by KinesisVideoStreamSetup - confirm to rule out a leak. */
    /* A single fully configured stream is enough for the overall setup to report success. */
    status = KINESIS_MANAGER_STATUS_SUCCESS;
  }
  return status;
}