/*
 * gst_kvs_plugin_handle_plugin_event()
 * Extracted from gst/gst-kvs-plugin/src/GstPlugin.c [793:959]
 */

/**
 * GstCollectPads event handler for the KVS plugin.
 *
 * Handles three event classes:
 *   - GST_EVENT_EOS: stops the Kinesis Video stream (once) so the tail of the
 *     stream is flushed before the pipeline shuts down.
 *   - GST_EVENT_CAPS: extracts the codec private data (CPD) from the caps and
 *     forwards it to the KVS stream. PCM (A-law/mu-law) CPD is generated from
 *     rate/channels; compressed formats are hex-decoded from "codec_data".
 *     For the video track the NAL format (Annex-B / AvCC / HEVC) is detected
 *     once and the CPD is stored in Annex-B form for later WebRTC use.
 *   - GST_EVENT_CUSTOM_DOWNSTREAM: application-injected events for fragment
 *     metadata, streaming enable/disable, and WebRTC connect/disconnect.
 *
 * Events consumed here are unreffed and set to NULL; any event still non-NULL
 * at CleanUp is forwarded to the default collect-pads handler, which takes
 * ownership.
 *
 * @param pads       collect pads that delivered the event
 * @param track_data per-track collect data (actually PGstKvsPluginTrackData)
 * @param event      the event to handle; ownership is resolved at CleanUp
 * @param user_data  the GstKvsPlugin instance
 *
 * @return TRUE on success, FALSE if any KVS call failed
 */
gboolean gst_kvs_plugin_handle_plugin_event(GstCollectPads* pads, GstCollectData* track_data, GstEvent* event, gpointer user_data)
{
    STATUS retStatus = STATUS_SUCCESS;
    PGstKvsPlugin pGstKvsPlugin = GST_KVS_PLUGIN(user_data);
    PGstKvsPluginTrackData pTrackData = (PGstKvsPluginTrackData) track_data;
    GstCaps* gstcaps = NULL;
    GstStructure* gststructforcaps = NULL;
    UINT64 trackId = pTrackData->trackId;
    BYTE cpd[GST_PLUGIN_MAX_CPD_SIZE];
    UINT32 cpdSize;
    gchar* gstCpd = NULL;
    gboolean persistent, enableStreaming, connectWeRtc;
    const GstStructure* gstStruct;
    PCHAR pName, pVal;
    UINT32 nalFlags = NAL_ADAPTATION_FLAG_NONE;

    gint samplerate = 0, channels = 0;
    const gchar* mediaType;

    switch (GST_EVENT_TYPE(event)) {
        case GST_EVENT_EOS:
            // Stop the stream exactly once; subsequent EOS events are no-ops.
            if (!ATOMIC_LOAD_BOOL(&pGstKvsPlugin->streamStopped)) {
                if (STATUS_FAILED(retStatus = stopKinesisVideoStreamSync(pGstKvsPlugin->kvsContext.streamHandle))) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Failed to stop the stream with 0x%08x", retStatus);
                    CHK_STATUS(retStatus);
                }

                ATOMIC_STORE_BOOL(&pGstKvsPlugin->streamStopped, TRUE);
            }

            break;

        case GST_EVENT_CAPS:
            gst_event_parse_caps(event, &gstcaps);
            gststructforcaps = gst_caps_get_structure(gstcaps, 0);
            mediaType = gst_structure_get_name(gststructforcaps);

            if (0 == STRCMP(mediaType, GSTREAMER_MEDIA_TYPE_ALAW) || 0 == STRCMP(mediaType, GSTREAMER_MEDIA_TYPE_MULAW)) {
                // Raw PCM: there is no codec_data blob, so build the CPD from
                // the sample rate and channel count carried on the caps.
                KVS_PCM_FORMAT_CODE format = KVS_PCM_FORMAT_CODE_MULAW;

                gst_structure_get_int(gststructforcaps, "rate", &samplerate);
                gst_structure_get_int(gststructforcaps, "channels", &channels);

                if (samplerate == 0 || channels == 0) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Missing channels/sample rate on caps");
                    CHK(FALSE, STATUS_INVALID_OPERATION);
                }

                if (0 == STRCMP(mediaType, GSTREAMER_MEDIA_TYPE_ALAW)) {
                    format = KVS_PCM_FORMAT_CODE_ALAW;
                } else {
                    format = KVS_PCM_FORMAT_CODE_MULAW;
                }

                if (STATUS_FAILED(mkvgenGeneratePcmCpd(format, (UINT32) samplerate, (UINT16) channels, (PBYTE) cpd, KVS_PCM_CPD_SIZE_BYTE))) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Failed to generate pcm cpd");
                    CHK(FALSE, STATUS_INVALID_OPERATION);
                }

                // Send cpd to kinesis video stream
                CHK_STATUS(kinesisVideoStreamFormatChanged(pGstKvsPlugin->kvsContext.streamHandle, KVS_PCM_CPD_SIZE_BYTE, cpd, trackId));
            } else if (!pGstKvsPlugin->trackCpdReceived[trackId] && gst_structure_has_field(gststructforcaps, "codec_data")) {
                // NOTE(review): trackId indexes trackCpdReceived[] without a
                // bounds check — presumably the caller only ever hands us the
                // configured track ids; confirm against track setup.
                const GValue* gstStreamFormat = gst_structure_get_value(gststructforcaps, "codec_data");
                gstCpd = gst_value_serialize(gstStreamFormat);

                // Convert hex cpd to byte array: first query the decoded size,
                // validate it fits our buffer, then decode for real.
                CHK_STATUS(hexDecode(gstCpd, 0, NULL, &cpdSize));
                // cpd[] holds exactly GST_PLUGIN_MAX_CPD_SIZE bytes, so a CPD
                // of that exact size is still valid (was '<', an off-by-one).
                CHK(cpdSize <= GST_PLUGIN_MAX_CPD_SIZE, STATUS_INVALID_ARG_LEN);
                CHK_STATUS(hexDecode(gstCpd, 0, cpd, &cpdSize));

                // Need to detect the CPD format first time only for video
                if (trackId == DEFAULT_VIDEO_TRACK_ID && pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_UNKNOWN) {
                    CHK_STATUS(identifyCpdNalFormat(cpd, cpdSize, &pGstKvsPlugin->detectedCpdFormat));

                    // We should store the CPD as is if it's in Annex-B format and convert from AvCC/HEVC
                    // The stored CPD will be used for WebRTC RTP stream prefixing each I-frame if it's not
                    if (pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_AVCC) {
                        // Convert from AvCC to Annex-B format
                        // NOTE: This will also store the data
                        CHK_STATUS(convertCpdFromAvcToAnnexB(pGstKvsPlugin, cpd, cpdSize));
                    } else if (pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_HEVC) {
                        // Convert from HEVC to Annex-B format
                        // NOTE: This will also store the data
                        CHK_STATUS(convertCpdFromHevcToAnnexB(pGstKvsPlugin, cpd, cpdSize));
                    } else {
                        // Store it for use with WebRTC where we will pre-pend the Annex-B CPD to each I-frame
                        // if the Annex-B format I-frame doesn't have it already pre-pended
                        MEMCPY(pGstKvsPlugin->videoCpd, cpd, cpdSize);
                        pGstKvsPlugin->videoCpdSize = cpdSize;
                    }

                    // Prior to setting the CPD we need to set the flags
                    if (pGstKvsPlugin->gstParams.adaptCpdNals && pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_ANNEX_B) {
                        nalFlags |= NAL_ADAPTATION_ANNEXB_CPD_NALS;
                    }

                    if (pGstKvsPlugin->gstParams.adaptFrameNals && pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_ANNEX_B) {
                        nalFlags |= NAL_ADAPTATION_ANNEXB_NALS;
                    }

                    CHK_STATUS(kinesisVideoStreamSetNalAdaptationFlags(pGstKvsPlugin->kvsContext.streamHandle, nalFlags));
                }

                // Send cpd to kinesis video stream
                CHK_STATUS(kinesisVideoStreamFormatChanged(pGstKvsPlugin->kvsContext.streamHandle, cpdSize, cpd, trackId));

                // Mark as received
                pGstKvsPlugin->trackCpdReceived[trackId] = TRUE;
            }

            // Caps events are always consumed here, never forwarded.
            gst_event_unref(event);
            event = NULL;

            break;

        case GST_EVENT_CUSTOM_DOWNSTREAM:
            // Application-injected control events. Unmatched structures fall
            // through with event != NULL and get the default handling below.
            gstStruct = gst_event_get_structure(event);

            if (gst_structure_has_name(gstStruct, KVS_ADD_METADATA_G_STRUCT_NAME) &&
                NULL != (pName = (PCHAR) gst_structure_get_string(gstStruct, KVS_ADD_METADATA_NAME)) &&
                NULL != (pVal = (PCHAR) gst_structure_get_string(gstStruct, KVS_ADD_METADATA_VALUE)) &&
                gst_structure_get_boolean(gstStruct, KVS_ADD_METADATA_PERSISTENT, &persistent)) {
                DLOGD("received " KVS_ADD_METADATA_G_STRUCT_NAME " event");

                CHK_STATUS(putKinesisVideoFragmentMetadata(pGstKvsPlugin->kvsContext.streamHandle, pName, pVal, persistent));

                gst_event_unref(event);
                event = NULL;
            } else if (gst_structure_has_name(gstStruct, KVS_ENABLE_STREAMING_G_STRUCT_NAME) &&
                       gst_structure_get_boolean(gstStruct, KVS_ENABLE_STREAMING_FIELD, &enableStreaming)) {
                DLOGD("received " KVS_ENABLE_STREAMING_G_STRUCT_NAME " event");

                ATOMIC_STORE_BOOL(&pGstKvsPlugin->enableStreaming, enableStreaming);

                gst_event_unref(event);
                event = NULL;
            } else if (gst_structure_has_name(gstStruct, KVS_CONNECT_WEBRTC_G_STRUCT_NAME) &&
                       gst_structure_get_boolean(gstStruct, KVS_CONNECT_WEBRTC_FIELD, &connectWeRtc)) {
                DLOGD("received " KVS_CONNECT_WEBRTC_G_STRUCT_NAME " event");

                ATOMIC_STORE_BOOL(&pGstKvsPlugin->connectWebRtc, connectWeRtc);

                gst_event_unref(event);
                event = NULL;
            }

            break;

        default:
            break;
    }

CleanUp:

    // Anything not consumed above (including events abandoned by a CHK_STATUS
    // jump) is handed to the default handler, which takes ownership.
    if (event != NULL) {
        gst_collect_pads_event_default(pads, track_data, event, FALSE);
    }

    // g_free(NULL) is a documented no-op, so no guard is needed.
    g_free(gstCpd);

    if (STATUS_FAILED(retStatus)) {
        GST_ELEMENT_ERROR(pGstKvsPlugin, STREAM, FAILED, (NULL), ("Failed to handle event"));
    }

    return STATUS_SUCCEEDED(retStatus);
}