void KinesisVideoFrameTransportCallback()

in kinesis_video_streamer/src/subscriber_callbacks.cpp [49:109]


void KinesisVideoFrameTransportCallback(
  KinesisStreamManagerInterface & stream_manager, std::string stream_name,
  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg)
{
  if (!frame_msg->codec_private_data.empty()) {
    KinesisManagerStatus update_codec_data_result =
      stream_manager.ProcessCodecPrivateDataForStream(stream_name, frame_msg->codec_private_data);
    if (KINESIS_MANAGER_STATUS_FAILED(update_codec_data_result)) {
        AWS_LOGSTREAM_WARN(
        __func__, stream_name.c_str()
                    << " failed updating codec data, error code: " << update_codec_data_result
                    << ". Continuing streaming as a best effort, but you might not be able to "
                       "decode and render the stream.");
    } else {
      ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS,
                         "%s Updated codec data successfully. Frame index: %du",
                         stream_name.c_str(), frame_msg->index);
    }
  }

  Frame frame;
  frame.trackId = DEFAULT_TRACK_ID;
  frame.size = frame_msg->frame_data.size();
  frame.frameData = reinterpret_cast<PBYTE>((void *)(frame_msg->frame_data.data()));
  frame.duration = frame_msg->duration *
                   HUNDREDS_OF_NANOS_IN_A_MICROSECOND; /* Duration is specified in microseconds, but
                                                          Kinesis expects 100ns units. */
  frame.index = frame_msg->index;
  UINT64 generated_timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                 std::chrono::system_clock::now().time_since_epoch())
                                 .count() /
                               DEFAULT_TIME_UNIT_IN_NANOS;
  frame.decodingTs = frame_msg->decoding_ts ? frame_msg->decoding_ts : generated_timestamp;
  frame.presentationTs =
    frame_msg->presentation_ts ? frame_msg->presentation_ts : generated_timestamp;
  frame.flags = (FRAME_FLAGS)frame_msg->flags;

  KinesisManagerStatus status = stream_manager.PutFrame(stream_name, frame);
  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
    AWS_LOGSTREAM_WARN(__func__, stream_name.c_str() << " PutFrame failed. Error code: " << status);
  } else {
    ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS, "%s PutFrame succeeded. Frame index: %du",
                       stream_name.c_str(), frame.index);
  }

  if (!frame_msg->metadata.empty()) {
    status = KINESIS_MANAGER_STATUS_SUCCESS;
    for (auto iter = frame_msg->metadata.begin(); iter != frame_msg->metadata.end(); ++iter) {
      status = static_cast<KinesisManagerStatus>(
        status | stream_manager.PutMetadata(stream_name, iter->key, iter->value));
    }
    if (KINESIS_MANAGER_STATUS_FAILED(status)) {
      AWS_LOGSTREAM_WARN(__func__, stream_name.c_str()
                                     << " PutMetadata failed. Error code: " << status);
    } else {
      ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS,
                         "%s PutMetadata succeeded. Frame index: %du", stream_name.c_str(),
                         frame.index);
    }
  }
}