GstFlowReturn gst_kvs_plugin_handle_buffer()

in gst/gst-kvs-plugin/src/GstPlugin.c [961:1109]


/**
 * Collect-pads buffer handler: turns one incoming GstBuffer into a Frame and submits it to
 * the Kinesis Video stream (only while enableStreaming is set) and to the WebRTC peers.
 *
 * A call with both buf and track_data set to NULL is treated as end-of-stream: the stream is
 * stopped if it has not been stopped yet, an EOS message is posted on the element and
 * GST_FLOW_EOS is returned.
 *
 * The buffer itself is never modified; adjusted timestamps are computed in locals because the
 * buffer is not checked for writability and may be shared with other holders.
 *
 * @param pads       - collect pads instance (not used by this handler).
 * @param track_data - per-track data, accessed as PGstKvsPluginTrackData; NULL on EOS.
 * @param buf        - buffer to process; NULL on EOS. A non-NULL buffer is always unreferenced
 *                     before returning.
 * @param user_data  - the plugin instance (converted with GST_KVS_PLUGIN).
 *
 * @return GST_FLOW_EOS on end-of-stream; GST_FLOW_ERROR on a fatal stream error or when only
 *         one of buf/track_data is NULL; GST_FLOW_OK otherwise, including when the buffer is
 *         dropped or a put call fails (put failures are only logged).
 */
GstFlowReturn gst_kvs_plugin_handle_buffer(GstCollectPads* pads, GstCollectData* track_data, GstBuffer* buf, gpointer user_data)
{
    PGstKvsPlugin pGstKvsPlugin = GST_KVS_PLUGIN(user_data);
    GstFlowReturn ret = GST_FLOW_OK;
    PGstKvsPluginTrackData pTrackData = (PGstKvsPluginTrackData) track_data;

    BOOL isDroppable, delta;
    STATUS streamStatus = pGstKvsPlugin->streamStatus;
    GstMessage* message;
    UINT64 trackId;
    // Working copies of the buffer timestamps; all adjustments are applied to these.
    UINT64 framePts, frameDts;
    FRAME_FLAGS frameFlags = FRAME_FLAG_NONE;
    GstMapInfo info;
    STATUS status;
    Frame frame;

    // CleanUp uses a non-NULL info.data as the "buffer is mapped" marker
    info.data = NULL;

    // eos reached
    if (buf == NULL && pTrackData == NULL) {
        if (!ATOMIC_LOAD_BOOL(&pGstKvsPlugin->streamStopped)) {
            if (STATUS_FAILED(status = stopKinesisVideoStreamSync(pGstKvsPlugin->kvsContext.streamHandle))) {
                DLOGW("Failed to stop the stream with 0x%08x", status);
            }
        }

        ATOMIC_STORE_BOOL(&pGstKvsPlugin->streamStopped, TRUE);

        DLOGD("Sending eos");

        // send out eos message to gstreamer bus
        message = gst_message_new_eos(GST_OBJECT_CAST(pGstKvsPlugin));
        gst_element_post_message(GST_ELEMENT_CAST(pGstKvsPlugin), message);

        ret = GST_FLOW_EOS;
        goto CleanUp;
    }

    // Past the EOS case both pointers are dereferenced below, so a call that supplies only one
    // of them cannot be processed. CleanUp still releases the buffer if it is the one present.
    if (buf == NULL || pTrackData == NULL) {
        DLOGW("Received a buffer callback with a NULL buffer or NULL track data");
        ret = GST_FLOW_ERROR;
        goto CleanUp;
    }

    if (STATUS_FAILED(streamStatus)) {
        // in offline case, we cant tell the pipeline to restream the file again in case of network outage.
        // therefore error out and let higher level application do the retry.
        if (IS_OFFLINE_STREAMING_MODE(pGstKvsPlugin->gstParams.streamingType) || !IS_RETRIABLE_ERROR(streamStatus)) {
            // fatal cases
            GST_ELEMENT_ERROR(pGstKvsPlugin, STREAM, FAILED, (NULL), ("Stream error occurred. Status: 0x%08x", streamStatus));
            ret = GST_FLOW_ERROR;
            goto CleanUp;
        } else {
            // resetStream, note that this will flush out producer buffer
            if (STATUS_FAILED(status = kinesisVideoStreamResetStream(pGstKvsPlugin->kvsContext.streamHandle))) {
                DLOGW("Failed to reset the stream with 0x%08x", status);
            }

            // reset state
            pGstKvsPlugin->streamStatus = STATUS_SUCCESS;
        }
    }

    // A buffer is dropped when it is corrupted, decode-only, carries DISCONT as its only flag,
    // is a DISCONT delta unit, or is a header buffer lacking a valid pts or dts.
    isDroppable = GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_CORRUPTED) || GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DECODE_ONLY) ||
        (GST_BUFFER_FLAGS(buf) == GST_BUFFER_FLAG_DISCONT) ||
        (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DISCONT) && GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) ||
        // drop if buffer contains header and has invalid timestamp
        (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_HEADER) && (!GST_BUFFER_PTS_IS_VALID(buf) || !GST_BUFFER_DTS_IS_VALID(buf)));

    if (isDroppable) {
        DLOGD("Dropping frame with flag: %d", GST_BUFFER_FLAGS(buf));
        goto CleanUp;
    }

    framePts = buf->pts;
    frameDts = buf->dts;

    // In offline mode, if user specifies a file_start_time, the stream will be configured to use absolute
    // timestamp. Therefore in here we add the file_start_time to frame pts to create absolute timestamp.
    // If user did not specify file_start_time, file_start_time will be 0 and has no effect.
    if (IS_OFFLINE_STREAMING_MODE(pGstKvsPlugin->gstParams.streamingType)) {
        frameDts = 0; // if offline mode, i.e. streaming a file, the dts from gstreamer is undefined.
        framePts += pGstKvsPlugin->basePts;
    } else if (!GST_BUFFER_DTS_IS_VALID(buf)) {
        // No usable dts on the buffer: synthesize one by advancing the last dts by the default frame duration
        frameDts = pGstKvsPlugin->lastDts + DEFAULT_FRAME_DURATION_MS * HUNDREDS_OF_NANOS_IN_A_MILLISECOND * DEFAULT_TIME_UNIT_IN_NANOS;
    }

    pGstKvsPlugin->lastDts = frameDts;
    trackId = pTrackData->trackId;

    if (!gst_buffer_map(buf, &info, GST_MAP_READ)) {
        // Without readable data there is nothing to submit; the frame is skipped and the flow continues
        DLOGW("Failed to map the buffer for reading");
        goto CleanUp;
    }

    delta = GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT);

    // Non-delta buffers become key frames; when both audio and video are present only
    // video-track buffers may be marked as key frames.
    switch (pGstKvsPlugin->mediaType) {
        case GST_PLUGIN_MEDIA_TYPE_AUDIO_ONLY:
        case GST_PLUGIN_MEDIA_TYPE_VIDEO_ONLY:
            if (!delta) {
                frameFlags = FRAME_FLAG_KEY_FRAME;
            }
            break;
        case GST_PLUGIN_MEDIA_TYPE_AUDIO_VIDEO:
            if (!delta && pTrackData->trackType == MKV_TRACK_INFO_TYPE_VIDEO) {
                frameFlags = FRAME_FLAG_KEY_FRAME;
            }
            break;
        default:
            // Any other media type leaves the frame without the key frame flag
            break;
    }

    if (!IS_OFFLINE_STREAMING_MODE(pGstKvsPlugin->gstParams.streamingType)) {
        // Outside offline mode the pts is rebased: the first pts seen and the time of the first
        // frame are latched once, and every pts is then shifted by their difference.
        if (pGstKvsPlugin->firstPts == GST_CLOCK_TIME_NONE) {
            pGstKvsPlugin->firstPts = framePts;
        }

        if (pGstKvsPlugin->producerStartTime == GST_CLOCK_TIME_NONE) {
            pGstKvsPlugin->producerStartTime = GETTIME() * DEFAULT_TIME_UNIT_IN_NANOS;
        }

        framePts += pGstKvsPlugin->producerStartTime - pGstKvsPlugin->firstPts;
    }

    frame.version = FRAME_CURRENT_VERSION;
    frame.flags = frameFlags;
    frame.index = pGstKvsPlugin->frameCount;
    // NOTE(review): the division presumably converts GStreamer nanoseconds into the producer's time unit - confirm
    frame.decodingTs = frameDts / DEFAULT_TIME_UNIT_IN_NANOS;
    frame.presentationTs = framePts / DEFAULT_TIME_UNIT_IN_NANOS;
    frame.trackId = trackId;
    frame.size = info.size;
    frame.frameData = info.data;
    frame.duration = 0;

    if (ATOMIC_LOAD_BOOL(&pGstKvsPlugin->enableStreaming)) {
        // Best effort: a failed put is logged and the pipeline keeps flowing
        if (STATUS_FAILED(status = putKinesisVideoFrame(pGstKvsPlugin->kvsContext.streamHandle, &frame))) {
            DLOGW("Failed to put frame with 0x%08x", status);
        }
    }

    // Need to produce the frame into peer connections
    // Check whether the frame is in AvCC/HEVC and set the flag to adapt the
    // bits to Annex-B format for RTP
    if (STATUS_FAILED(status = putFrameToWebRtcPeers(pGstKvsPlugin, &frame, pGstKvsPlugin->detectedCpdFormat))) {
        DLOGW("Failed to put frame to peer connections with 0x%08x", status);
    }

    pGstKvsPlugin->frameCount++;

CleanUp:

    // Unmap only if the mapping succeeded
    if (info.data != NULL) {
        gst_buffer_unmap(buf, &info);
    }

    // The handler releases the buffer on every exit path
    if (buf != NULL) {
        gst_buffer_unref(buf);
    }

    return ret;
}