KinesisManagerStatus KinesisStreamManager::FetchRekognitionResults()

in kinesis_manager/src/kinesis_stream_manager.cpp [409:449]


KinesisManagerStatus KinesisStreamManager::FetchRekognitionResults(
  const std::string & stream_name, Aws::Vector<Model::Record> * records)
{
  KinesisManagerStatus status = KINESIS_MANAGER_STATUS_SUCCESS;
  if (0 == rekognition_config_.count(stream_name)) {
    AWS_LOG_WARN(__func__, "AWS Rekognition configuration is missing for this stream. Skipping");
    return status;
  }
  if (rekognition_config_.at(stream_name).shard_iterator.empty()) {
    status = UpdateShardIterator(stream_name);
    if (KINESIS_MANAGER_STATUS_FAILED(status)) {
      return status;
    }
  }
  Model::GetRecordsRequest get_records_request;
  get_records_request.SetShardIterator(rekognition_config_.at(stream_name).shard_iterator);
  get_records_request.SetLimit(kDefaultRecordsLimitForRekognitionResults);
  auto get_records_outcome = kinesis_client_->GetRecords(get_records_request);
  if (get_records_outcome.IsSuccess()) {
    rekognition_config_.at(stream_name).shard_iterator =
      get_records_outcome.GetResult().GetNextShardIterator();
    *records = get_records_outcome.GetResult().GetRecords();
  } else {
    if (KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED ==
        get_records_outcome.GetError().GetErrorType()) {
      return KINESIS_MANAGER_STATUS_GET_RECORDS_THROTTLED;
    } else if (KinesisErrors::EXPIRED_ITERATOR == get_records_outcome.GetError().GetErrorType()) {
      rekognition_config_.at(stream_name).shard_iterator.clear();
      AWS_LOG_WARN(
        __func__,
        "GetRecords failed due to expired iterator. A new one will be fetched at the next run.");
    } else {
      AWS_LOGSTREAM_ERROR(
        __func__, "GetRecords failed with code "
                    << static_cast<int>(get_records_outcome.GetError().GetErrorType()) << ": "
                    << get_records_outcome.GetError().GetMessage());
    }
    return KINESIS_MANAGER_STATUS_GET_RECORDS_FAILED;
  }
  return status;
}