in kinesis_video_streamer/src/subscriber_callbacks.cpp [49:109]
/**
 * Subscriber callback that forwards a KinesisVideoFrame message to a Kinesis video stream.
 *
 * Steps, in order:
 *  1. If the message carries codec private data, hand it to the stream manager. A failure is
 *     logged and streaming continues as a best effort.
 *  2. Build a Frame from the message and call PutFrame. The Frame only points at the message's
 *     frame_data buffer; no copy is made here.
 *  3. If the message carries metadata, call PutMetadata once per entry and log the combined
 *     result.
 *
 * Failures are only logged; nothing is returned or thrown by this function itself.
 *
 * @param stream_manager Stream manager used for codec data, frame and metadata submission.
 * @param stream_name    Name of the target stream, also used as the log prefix.
 * @param frame_msg      Incoming frame message.
 */
void KinesisVideoFrameTransportCallback(
  KinesisStreamManagerInterface & stream_manager, std::string stream_name,
  const kinesis_video_msgs::KinesisVideoFrame::ConstPtr & frame_msg)
{
  if (!frame_msg->codec_private_data.empty()) {
    KinesisManagerStatus update_codec_data_result =
      stream_manager.ProcessCodecPrivateDataForStream(stream_name, frame_msg->codec_private_data);
    if (KINESIS_MANAGER_STATUS_FAILED(update_codec_data_result)) {
      AWS_LOGSTREAM_WARN(
        __func__, stream_name.c_str()
                    << " failed updating codec data, error code: " << update_codec_data_result
                    << ". Continuing streaming as a best effort, but you might not be able to "
                       "decode and render the stream.");
    } else {
      /* "%u" (not "%du"): "%du" is parsed as "%d" followed by a literal 'u', which appended a
         stray character to the index and formatted it as a signed value. */
      ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS,
                         "%s Updated codec data successfully. Frame index: %u",
                         stream_name.c_str(), static_cast<unsigned int>(frame_msg->index));
    }
  }

  Frame frame;
  frame.trackId = DEFAULT_TRACK_ID;
  frame.size = frame_msg->frame_data.size();
  /* The Frame struct takes a mutable byte pointer while the message is const, so constness is
     removed explicitly here. NOTE(review): this assumes PutFrame does not write through
     frameData - confirm against the producer SDK. */
  frame.frameData = static_cast<PBYTE>(
    const_cast<void *>(static_cast<const void *>(frame_msg->frame_data.data())));
  frame.duration = frame_msg->duration *
                   HUNDREDS_OF_NANOS_IN_A_MICROSECOND; /* Duration is specified in microseconds, but
                                                          Kinesis expects 100ns units. */
  frame.index = frame_msg->index;

  /* Fallback timestamp taken from the system clock, used whenever the message leaves a
     timestamp field at zero. */
  UINT64 generated_timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                 std::chrono::system_clock::now().time_since_epoch())
                                 .count() /
                               DEFAULT_TIME_UNIT_IN_NANOS;
  frame.decodingTs = frame_msg->decoding_ts ? frame_msg->decoding_ts : generated_timestamp;
  frame.presentationTs =
    frame_msg->presentation_ts ? frame_msg->presentation_ts : generated_timestamp;
  frame.flags = static_cast<FRAME_FLAGS>(frame_msg->flags);

  KinesisManagerStatus status = stream_manager.PutFrame(stream_name, frame);
  if (KINESIS_MANAGER_STATUS_FAILED(status)) {
    AWS_LOGSTREAM_WARN(__func__, stream_name.c_str() << " PutFrame failed. Error code: " << status);
  } else {
    ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS, "%s PutFrame succeeded. Frame index: %u",
                       stream_name.c_str(), static_cast<unsigned int>(frame.index));
  }

  if (!frame_msg->metadata.empty()) {
    /* Every entry is submitted even if an earlier one fails; the individual results are OR-ed
       together so that a single combined status is checked and logged afterwards. */
    status = KINESIS_MANAGER_STATUS_SUCCESS;
    for (const auto & entry : frame_msg->metadata) {
      status = static_cast<KinesisManagerStatus>(
        status | stream_manager.PutMetadata(stream_name, entry.key, entry.value));
    }
    if (KINESIS_MANAGER_STATUS_FAILED(status)) {
      AWS_LOGSTREAM_WARN(__func__, stream_name.c_str()
                                     << " PutMetadata failed. Error code: " << status);
    } else {
      ROS_DEBUG_THROTTLE(PUTFRAME_LOG_INTERVAL_IN_SECONDS,
                         "%s PutMetadata succeeded. Frame index: %u", stream_name.c_str(),
                         static_cast<unsigned int>(frame.index));
    }
  }
}