in kinesis_manager/src/kinesis_stream_manager.cpp [60:114]
/**
 * @brief Populate a stream subscription descriptor from the parameters of one stream.
 *
 * Parameters are read through parameter_reader_ using paths produced by
 * GetStreamParameterPath(stream_idx, ...):
 *  - topic_name, stream_name and topic_type are mandatory; every read must return AWS_ERR_OK.
 *  - rekognition_data_stream and rekognition_topic_name must be provided together: both reads
 *    must return the same code, and that code must be AWS_ERR_OK or AWS_ERR_NOT_FOUND.
 *  - message_queue_size is optional; when it cannot be read, or when it is negative,
 *    kDefaultMessageQueueSize is used instead.
 *
 * @param stream_idx Index of the stream, forwarded to GetStreamParameterPath.
 * @param descriptor Output descriptor. Its fields are handed to ReadParam before validation, so
 *                   it may have been partially written when an error status is returned.
 * @return KINESIS_MANAGER_STATUS_SUCCESS on success, KINESIS_MANAGER_STATUS_INVALID_INPUT when a
 *         mandatory parameter could not be read or the Rekognition parameters are inconsistent.
 */
KinesisManagerStatus KinesisStreamManagerInterface::GenerateStreamSubscriptionDescriptor(
  int stream_idx, StreamSubscriptionDescriptor & descriptor)
{
  /* Mandatory parameters: the return codes are OR-ed so that any non-OK result is detected. */
  int param_status = AWS_ERR_OK;
  param_status |= parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.topic_name),
    descriptor.topic_name);
  param_status |= parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.stream_name),
    descriptor.stream_name);
  param_status |= parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.topic_type),
    descriptor.input_type);
  if (AWS_ERR_OK != param_status) {
    AWS_LOGSTREAM_ERROR(__func__, "Missing parameters - can't construct descriptor (topic: "
                                    << descriptor.topic_name
                                    << " stream: " << descriptor.stream_name
                                    << " type: " << descriptor.input_type << ") " << param_status);
    return KINESIS_MANAGER_STATUS_INVALID_INPUT;
  }

  /* Rekognition data stream and topic name - one cannot be provided without the other */
  AwsError data_stream_read_result = parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_data_stream),
    descriptor.rekognition_data_stream);
  AwsError rekognition_topic_read_result = parameter_reader_->ReadParam(
    GetStreamParameterPath(stream_idx, kStreamParameters.rekognition_topic_name),
    descriptor.rekognition_topic_name);
  /* Accept only "both read OK" or "both not found"; any mismatch or other error is invalid. */
  if (data_stream_read_result != rekognition_topic_read_result ||
      (data_stream_read_result != AWS_ERR_OK && data_stream_read_result != AWS_ERR_NOT_FOUND)) {
    AWS_LOGSTREAM_ERROR(
      __func__, "Invalid input: error reading parameters for AWS Rekognition support (data stream: "
                  << descriptor.rekognition_data_stream << " code: " << data_stream_read_result
                  << " Rekognition topic: " << descriptor.rekognition_topic_name
                  << " code: " << rekognition_topic_read_result << ")");
    return KINESIS_MANAGER_STATUS_INVALID_INPUT;
  }

  /* Optional message queue size: fall back to the default when absent or negative. */
  uint32_t message_queue_size = kDefaultMessageQueueSize;
  /* Zero-initialized so the value is never indeterminate, whatever ReadParam does with it. */
  int message_queue_size_input = 0;
  if (AWS_ERR_OK ==
      parameter_reader_->ReadParam(
        GetStreamParameterPath(stream_idx, kStreamParameters.message_queue_size),
        message_queue_size_input)) {
    if (0 > message_queue_size_input) {
      AWS_LOGSTREAM_WARN(__func__, descriptor.stream_name << " Message queue size provided ("
                                                          << message_queue_size_input << ")"
                                                          << " is invalid. Using the default of "
                                                          << message_queue_size);
    } else {
      /* Non-negative at this point, so the conversion to unsigned preserves the value. */
      message_queue_size = static_cast<uint32_t>(message_queue_size_input);
    }
  }
  descriptor.message_queue_size = message_queue_size;
  return KINESIS_MANAGER_STATUS_SUCCESS;
}