in kinesis_manager/src/kinesis_stream_manager.cpp [409:449]
/**
 * Issues a single GetRecords call for the given stream using the shard iterator stored in
 * rekognition_config_, and hands the returned records back to the caller.
 *
 * @param stream_name key of the entry in rekognition_config_ to read the shard iterator from.
 * @param records output container; overwritten with the fetched records when GetRecords
 *   succeeds. It is dereferenced without a check, so it must not be null.
 * @return
 *   - KINESIS_MANAGER_STATUS_SUCCESS when no configuration exists for stream_name (a warning
 *     is logged and nothing is fetched);
 *   - the status of UpdateShardIterator when the stored iterator was empty and refreshing it
 *     failed;
 *   - KINESIS_MANAGER_STATUS_GET_RECORDS_THROTTLED when GetRecords reports
 *     PROVISIONED_THROUGHPUT_EXCEEDED;
 *   - KINESIS_MANAGER_STATUS_GET_RECORDS_FAILED for any other GetRecords error;
 *   - otherwise KINESIS_MANAGER_STATUS_SUCCESS, or the non-failing status returned by
 *     UpdateShardIterator if the iterator had to be refreshed during this call.
 */
KinesisManagerStatus KinesisStreamManager::FetchRekognitionResults(
  const std::string & stream_name, Aws::Vector<Model::Record> * records)
{
  const auto config_entry = rekognition_config_.find(stream_name);
  if (rekognition_config_.end() == config_entry) {
    AWS_LOG_WARN(__func__, "AWS Rekognition configuration is missing for this stream. Skipping");
    return KINESIS_MANAGER_STATUS_SUCCESS;
  }

  KinesisManagerStatus status = KINESIS_MANAGER_STATUS_SUCCESS;
  if (config_entry->second.shard_iterator.empty()) {
    // No iterator stored yet (or it was cleared by an earlier call): obtain one first.
    status = UpdateShardIterator(stream_name);
    if (KINESIS_MANAGER_STATUS_FAILED(status)) {
      return status;
    }
  }

  // Looked up again (rather than reusing config_entry) so that the reference is resolved
  // only after UpdateShardIterator has run.
  auto & shard_iterator = rekognition_config_.at(stream_name).shard_iterator;

  Model::GetRecordsRequest request;
  request.SetShardIterator(shard_iterator);
  request.SetLimit(kDefaultRecordsLimitForRekognitionResults);
  auto outcome = kinesis_client_->GetRecords(request);

  if (!outcome.IsSuccess()) {
    const auto & error = outcome.GetError();
    switch (error.GetErrorType()) {
      case KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED:
        // Throttling has its own status code and is not logged here.
        return KINESIS_MANAGER_STATUS_GET_RECORDS_THROTTLED;

      case KinesisErrors::EXPIRED_ITERATOR: {
        // Drop the stale iterator so that the next call takes the refresh path above.
        shard_iterator.clear();
        AWS_LOG_WARN(
          __func__,
          "GetRecords failed due to expired iterator. A new one will be fetched at the next run.");
        break;
      }

      default: {
        AWS_LOGSTREAM_ERROR(
          __func__, "GetRecords failed with code "
                      << static_cast<int>(error.GetErrorType()) << ": "
                      << error.GetMessage());
        break;
      }
    }
    return KINESIS_MANAGER_STATUS_GET_RECORDS_FAILED;
  }

  // Success: remember where to continue from, then publish the records to the caller.
  const auto & result = outcome.GetResult();
  shard_iterator = result.GetNextShardIterator();
  *records = result.GetRecords();
  return status;
}