in gst/gst-kvs-plugin/src/GstPlugin.c [961:1109]
GstFlowReturn gst_kvs_plugin_handle_buffer(GstCollectPads* pads, GstCollectData* track_data, GstBuffer* buf, gpointer user_data)
{
PGstKvsPlugin pGstKvsPlugin = GST_KVS_PLUGIN(user_data);
GstFlowReturn ret = GST_FLOW_OK;
PGstKvsPluginTrackData pTrackData = (PGstKvsPluginTrackData) track_data;
BOOL isDroppable, delta;
STATUS streamStatus = pGstKvsPlugin->streamStatus;
GstMessage* message;
UINT64 trackId;
FRAME_FLAGS frameFlags = FRAME_FLAG_NONE;
GstMapInfo info;
STATUS status;
Frame frame;
info.data = NULL;
// eos reached
if (buf == NULL && pTrackData == NULL) {
if (!ATOMIC_LOAD_BOOL(&pGstKvsPlugin->streamStopped)) {
if (STATUS_FAILED(status = stopKinesisVideoStreamSync(pGstKvsPlugin->kvsContext.streamHandle))) {
DLOGW("Failed to stop the stream with 0x%08x", status);
}
}
ATOMIC_STORE_BOOL(&pGstKvsPlugin->streamStopped, TRUE);
DLOGD("Sending eos");
// send out eos message to gstreamer bus
message = gst_message_new_eos(GST_OBJECT_CAST(pGstKvsPlugin));
gst_element_post_message(GST_ELEMENT_CAST(pGstKvsPlugin), message);
ret = GST_FLOW_EOS;
goto CleanUp;
}
if (STATUS_FAILED(streamStatus)) {
// in offline case, we cant tell the pipeline to restream the file again in case of network outage.
// therefore error out and let higher level application do the retry.
if (IS_OFFLINE_STREAMING_MODE(pGstKvsPlugin->gstParams.streamingType) || !IS_RETRIABLE_ERROR(streamStatus)) {
// fatal cases
GST_ELEMENT_ERROR(pGstKvsPlugin, STREAM, FAILED, (NULL), ("Stream error occurred. Status: 0x%08x", streamStatus));
ret = GST_FLOW_ERROR;
goto CleanUp;
} else {
// resetStream, note that this will flush out producer buffer
if (STATUS_FAILED(status = kinesisVideoStreamResetStream(pGstKvsPlugin->kvsContext.streamHandle))) {
DLOGW("Failed to reset the stream with 0x%08x", status);
}
// reset state
pGstKvsPlugin->streamStatus = STATUS_SUCCESS;
}
}
isDroppable = GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_CORRUPTED) || GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DECODE_ONLY) ||
(GST_BUFFER_FLAGS(buf) == GST_BUFFER_FLAG_DISCONT) ||
(GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DISCONT) && GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) ||
// drop if buffer contains header and has invalid timestamp
(GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_HEADER) && (!GST_BUFFER_PTS_IS_VALID(buf) || !GST_BUFFER_DTS_IS_VALID(buf)));
if (isDroppable) {
DLOGD("Dropping frame with flag: %d", GST_BUFFER_FLAGS(buf));
goto CleanUp;
}
// In offline mode, if user specifies a file_start_time, the stream will be configured to use absolute
// timestamp. Therefore in here we add the file_start_time to frame pts to create absolute timestamp.
// If user did not specify file_start_time, file_start_time will be 0 and has no effect.
if (IS_OFFLINE_STREAMING_MODE(pGstKvsPlugin->gstParams.streamingType)) {
buf->dts = 0; // if offline mode, i.e. streaming a file, the dts from gstreamer is undefined.
buf->pts += pGstKvsPlugin->basePts;
} else if (!GST_BUFFER_DTS_IS_VALID(buf)) {
buf->dts = pGstKvsPlugin->lastDts + DEFAULT_FRAME_DURATION_MS * HUNDREDS_OF_NANOS_IN_A_MILLISECOND * DEFAULT_TIME_UNIT_IN_NANOS;
}
pGstKvsPlugin->lastDts = buf->dts;
trackId = pTrackData->trackId;
if (!gst_buffer_map(buf, &info, GST_MAP_READ)) {
goto CleanUp;
}
delta = GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT);
switch (pGstKvsPlugin->mediaType) {
case GST_PLUGIN_MEDIA_TYPE_AUDIO_ONLY:
case GST_PLUGIN_MEDIA_TYPE_VIDEO_ONLY:
if (!delta) {
frameFlags = FRAME_FLAG_KEY_FRAME;
}
break;
case GST_PLUGIN_MEDIA_TYPE_AUDIO_VIDEO:
if (!delta && pTrackData->trackType == MKV_TRACK_INFO_TYPE_VIDEO) {
frameFlags = FRAME_FLAG_KEY_FRAME;
}
break;
}
if (!IS_OFFLINE_STREAMING_MODE(pGstKvsPlugin->gstParams.streamingType)) {
if (pGstKvsPlugin->firstPts == GST_CLOCK_TIME_NONE) {
pGstKvsPlugin->firstPts = buf->pts;
}
if (pGstKvsPlugin->producerStartTime == GST_CLOCK_TIME_NONE) {
pGstKvsPlugin->producerStartTime = GETTIME() * DEFAULT_TIME_UNIT_IN_NANOS;
}
buf->pts += pGstKvsPlugin->producerStartTime - pGstKvsPlugin->firstPts;
}
frame.version = FRAME_CURRENT_VERSION;
frame.flags = frameFlags;
frame.index = pGstKvsPlugin->frameCount;
frame.decodingTs = buf->dts / DEFAULT_TIME_UNIT_IN_NANOS;
frame.presentationTs = buf->pts / DEFAULT_TIME_UNIT_IN_NANOS;
frame.trackId = trackId;
frame.size = info.size;
frame.frameData = info.data;
frame.duration = 0;
if (ATOMIC_LOAD_BOOL(&pGstKvsPlugin->enableStreaming)) {
if (STATUS_FAILED(status = putKinesisVideoFrame(pGstKvsPlugin->kvsContext.streamHandle, &frame))) {
DLOGW("Failed to put frame with 0x%08x", status);
}
}
// Need to produce the frame into peer connections
// Check whether the frame is in AvCC/HEVC and set the flag to adapt the
// bits to Annex-B format for RTP
if (STATUS_FAILED(status = putFrameToWebRtcPeers(pGstKvsPlugin, &frame, pGstKvsPlugin->detectedCpdFormat))) {
DLOGW("Failed to put frame to peer connections with 0x%08x", status);
}
pGstKvsPlugin->frameCount++;
CleanUp:
if (info.data != NULL) {
gst_buffer_unmap(buf, &info);
}
if (buf != NULL) {
gst_buffer_unref(buf);
}
return ret;
}