KinesisManagerStatus KinesisStreamManagerInterface::GenerateStreamSubscriptionDescriptor()

in kinesis_manager/src/kinesis_stream_manager.cpp [60:114]


/**
 * Fills a stream subscription descriptor from the parameters of the stream at the given index.
 *
 * Every value is read through parameter_reader_ using a path built by GetStreamParameterPath:
 *  - topic name, stream name and topic type are required;
 *  - the Rekognition data stream and Rekognition topic name are optional, but both reads must
 *    yield the same result code (either both AWS_ERR_OK or both AWS_ERR_NOT_FOUND);
 *  - the message queue size is optional and falls back to kDefaultMessageQueueSize when it
 *    cannot be read or when the value read is negative.
 *
 * @param stream_idx index of the stream whose parameters should be read
 * @param descriptor [out] receives the values read; message_queue_size is only assigned when the
 *                   function succeeds
 * @return KINESIS_MANAGER_STATUS_SUCCESS when the descriptor was populated,
 *         KINESIS_MANAGER_STATUS_INVALID_INPUT when a required parameter could not be read or
 *         the Rekognition parameters are inconsistent or failed to load
 */
KinesisManagerStatus KinesisStreamManagerInterface::GenerateStreamSubscriptionDescriptor(
  int stream_idx, StreamSubscriptionDescriptor & descriptor)
{
  /* Required parameters. The result codes are OR-ed together so that any single failure leaves a
   * non-zero value; the combined value is only meaningful as "something failed". */
  int param_status = AWS_ERR_OK;
  param_status |= parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.topic_name),
    descriptor.topic_name);
  param_status |= parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.stream_name),
    descriptor.stream_name);
  param_status |= parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.topic_type),
    descriptor.input_type);
  if (AWS_ERR_OK != param_status) {
    AWS_LOGSTREAM_ERROR(__func__, "Missing parameters - can't construct descriptor (topic: "
                                    << descriptor.topic_name
                                    << " stream: " << descriptor.stream_name
                                    << " type: " << descriptor.input_type << ") " << param_status);
    return KINESIS_MANAGER_STATUS_INVALID_INPUT;
  }

  /* Rekognition data stream and topic name - one cannot be provided without the other */
  const AwsError data_stream_read_result = parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_data_stream),
    descriptor.rekognition_data_stream);
  const AwsError rekognition_topic_read_result = parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_topic_name),
    descriptor.rekognition_topic_name);
  /* Valid outcomes: both reads succeeded, or both parameters are absent. Anything else (only one
   * of them present, or any other error code) is rejected. */
  if (data_stream_read_result != rekognition_topic_read_result ||
      (data_stream_read_result != AWS_ERR_OK && data_stream_read_result != AWS_ERR_NOT_FOUND)) {
    AWS_LOGSTREAM_ERROR(
      __func__, "Invalid input: error reading parameters for AWS Rekognition support (data stream: "
                  << descriptor.rekognition_data_stream << " code: " << data_stream_read_result
                  << " Rekognition topic: " << descriptor.rekognition_topic_name
                  << " code: " << rekognition_topic_read_result << ")");
    return KINESIS_MANAGER_STATUS_INVALID_INPUT;
  }

  /* Optional message queue size. It is read as a signed int so that a negative configuration
   * value can be detected before it is converted to the unsigned descriptor field. */
  uint32_t message_queue_size = kDefaultMessageQueueSize;
  int message_queue_size_input{0};
  if (AWS_ERR_OK ==
      parameter_reader_->ReadParam(
        GetStreamParameterPath(stream_idx, kStreamParameters.message_queue_size),
        message_queue_size_input)) {
    if (0 > message_queue_size_input) {
      AWS_LOGSTREAM_WARN(__func__, descriptor.stream_name << " Message queue size provided ("
                                                          << message_queue_size_input
                                                          << ") is invalid. Using the default of "
                                                          << message_queue_size);
    } else {
      message_queue_size = static_cast<uint32_t>(message_queue_size_input);
    }
  }
  descriptor.message_queue_size = message_queue_size;
  return KINESIS_MANAGER_STATUS_SUCCESS;
}