in gst/gst-kvs-plugin/src/GstPlugin.c [793:959]
/**
 * Event callback registered with GstCollectPads for the KVS plugin.
 *
 * Handles the following event types:
 *  - GST_EVENT_EOS: stops the KVS stream synchronously unless the streamStopped
 *    flag is already set, then sets the flag. The event itself is not consumed.
 *  - GST_EVENT_CAPS: for A-law / mu-law caps a PCM CPD is generated from the
 *    "rate" and "channels" fields and sent to the stream. Otherwise, if no CPD
 *    has been recorded for the track yet and the caps carry "codec_data", the
 *    CPD is decoded, (for the video track, on first detection) analysed and
 *    stored, and then sent to the stream. The event is consumed on success.
 *  - GST_EVENT_CUSTOM_DOWNSTREAM: recognises the add-metadata, enable-streaming
 *    and connect-WebRTC structures and consumes the event when one matches.
 *
 * Any event that has not been consumed when CleanUp is reached (including on
 * failure paths) is handed to gst_collect_pads_event_default(); that call's
 * return value is not propagated.
 *
 * @param pads       Collect pads the event arrived on.
 * @param track_data Per-pad collect data; cast to PGstKvsPluginTrackData to read the track id.
 * @param event      The event to handle.
 * @param user_data  The plugin instance.
 *
 * @return TRUE when handling succeeded, FALSE otherwise (an element error is posted on failure).
 */
gboolean gst_kvs_plugin_handle_plugin_event(GstCollectPads* pads, GstCollectData* track_data, GstEvent* event, gpointer user_data)
{
    STATUS retStatus = STATUS_SUCCESS;
    PGstKvsPlugin pGstKvsPlugin = GST_KVS_PLUGIN(user_data);
    PGstKvsPluginTrackData pTrackData = (PGstKvsPluginTrackData) track_data;
    GstCaps* gstcaps = NULL;
    GstStructure* gststructforcaps = NULL;
    UINT64 trackId = pTrackData->trackId;
    BYTE cpd[GST_PLUGIN_MAX_CPD_SIZE];
    UINT32 cpdSize = 0;
    gchar* gstCpd = NULL;
    gboolean persistent, enableStreaming, connectWebRtc;
    const GstStructure* gstStruct;
    PCHAR pName, pVal;
    UINT32 nalFlags = NAL_ADAPTATION_FLAG_NONE;
    gint samplerate = 0, channels = 0;
    const gchar* mediaType;

    switch (GST_EVENT_TYPE(event)) {
        case GST_EVENT_EOS:
            // Stop the stream only once; the flag is set only after a successful stop
            if (!ATOMIC_LOAD_BOOL(&pGstKvsPlugin->streamStopped)) {
                if (STATUS_FAILED(retStatus = stopKinesisVideoStreamSync(pGstKvsPlugin->kvsContext.streamHandle))) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Failed to stop the stream with 0x%08x", retStatus);
                    CHK_STATUS(retStatus);
                }

                ATOMIC_STORE_BOOL(&pGstKvsPlugin->streamStopped, TRUE);
            }

            break;

        case GST_EVENT_CAPS:
            gst_event_parse_caps(event, &gstcaps);
            gststructforcaps = gst_caps_get_structure(gstcaps, 0);
            mediaType = gst_structure_get_name(gststructforcaps);

            if (0 == STRCMP(mediaType, GSTREAMER_MEDIA_TYPE_ALAW) || 0 == STRCMP(mediaType, GSTREAMER_MEDIA_TYPE_MULAW)) {
                KVS_PCM_FORMAT_CODE format =
                    (0 == STRCMP(mediaType, GSTREAMER_MEDIA_TYPE_ALAW)) ? KVS_PCM_FORMAT_CODE_ALAW : KVS_PCM_FORMAT_CODE_MULAW;

                gst_structure_get_int(gststructforcaps, "rate", &samplerate);
                gst_structure_get_int(gststructforcaps, "channels", &channels);

                // Both values are cast to unsigned types below, so reject missing (0) as well as negative values
                if (samplerate <= 0 || channels <= 0) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Missing channels/sample rate on caps");
                    CHK(FALSE, STATUS_INVALID_OPERATION);
                }

                if (STATUS_FAILED(mkvgenGeneratePcmCpd(format, (UINT32) samplerate, (UINT16) channels, (PBYTE) cpd, KVS_PCM_CPD_SIZE_BYTE))) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Failed to generate pcm cpd");
                    CHK(FALSE, STATUS_INVALID_OPERATION);
                }

                // Send cpd to kinesis video stream
                CHK_STATUS(kinesisVideoStreamFormatChanged(pGstKvsPlugin->kvsContext.streamHandle, KVS_PCM_CPD_SIZE_BYTE, cpd, trackId));
            } else if (!pGstKvsPlugin->trackCpdReceived[trackId] && gst_structure_has_field(gststructforcaps, "codec_data")) {
                // NOTE(review): trackId indexes trackCpdReceived without a range check here - confirm the array bound
                const GValue* gstStreamFormat = gst_structure_get_value(gststructforcaps, "codec_data");

                // gst_value_serialize may return NULL when the value can't be serialized
                gstCpd = gst_value_serialize(gstStreamFormat);
                if (gstCpd == NULL) {
                    GST_ERROR_OBJECT(pGstKvsPlugin, "Failed to serialize codec_data");
                    CHK(FALSE, STATUS_INVALID_OPERATION);
                }

                // Convert hex cpd to byte array: query the size, validate it against the buffer, then decode
                CHK_STATUS(hexDecode(gstCpd, 0, NULL, &cpdSize));
                CHK(cpdSize < GST_PLUGIN_MAX_CPD_SIZE, STATUS_INVALID_ARG_LEN);
                CHK_STATUS(hexDecode(gstCpd, 0, cpd, &cpdSize));

                // Need to detect the CPD format first time only for video
                if (trackId == DEFAULT_VIDEO_TRACK_ID && pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_UNKNOWN) {
                    CHK_STATUS(identifyCpdNalFormat(cpd, cpdSize, &pGstKvsPlugin->detectedCpdFormat));

                    // We should store the CPD as is if it's in Annex-B format and convert from AvCC/HEVC
                    // The stored CPD will be used for WebRTC RTP stream prefixing each I-frame if it's not
                    if (pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_AVCC) {
                        // Convert from AvCC to Annex-B format
                        // NOTE: This will also store the data
                        CHK_STATUS(convertCpdFromAvcToAnnexB(pGstKvsPlugin, cpd, cpdSize));
                    } else if (pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_HEVC) {
                        // Convert from HEVC to Annex-B format
                        // NOTE: This will also store the data
                        CHK_STATUS(convertCpdFromHevcToAnnexB(pGstKvsPlugin, cpd, cpdSize));
                    } else {
                        // Store it for use with WebRTC where we will pre-pend the Annex-B CPD to each I-frame
                        // if the Annex-B format I-frame doesn't have it already pre-pended
                        MEMCPY(pGstKvsPlugin->videoCpd, cpd, cpdSize);
                        pGstKvsPlugin->videoCpdSize = cpdSize;
                    }

                    // Prior to setting the CPD we need to set the flags
                    if (pGstKvsPlugin->gstParams.adaptCpdNals && pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_ANNEX_B) {
                        nalFlags |= NAL_ADAPTATION_ANNEXB_CPD_NALS;
                    }

                    if (pGstKvsPlugin->gstParams.adaptFrameNals && pGstKvsPlugin->detectedCpdFormat == ELEMENTARY_STREAM_NAL_FORMAT_ANNEX_B) {
                        nalFlags |= NAL_ADAPTATION_ANNEXB_NALS;
                    }

                    CHK_STATUS(kinesisVideoStreamSetNalAdaptationFlags(pGstKvsPlugin->kvsContext.streamHandle, nalFlags));
                }

                // Send cpd to kinesis video stream
                CHK_STATUS(kinesisVideoStreamFormatChanged(pGstKvsPlugin->kvsContext.streamHandle, cpdSize, cpd, trackId));

                // Mark as received
                pGstKvsPlugin->trackCpdReceived[trackId] = TRUE;
            }

            // Caps event has been fully handled - consume it so it is not forwarded in CleanUp
            gst_event_unref(event);
            event = NULL;
            break;

        case GST_EVENT_CUSTOM_DOWNSTREAM:
            gstStruct = gst_event_get_structure(event);

            // A custom event may carry no structure; leave it for the default handler
            if (gstStruct == NULL) {
                break;
            }

            if (gst_structure_has_name(gstStruct, KVS_ADD_METADATA_G_STRUCT_NAME) &&
                NULL != (pName = (PCHAR) gst_structure_get_string(gstStruct, KVS_ADD_METADATA_NAME)) &&
                NULL != (pVal = (PCHAR) gst_structure_get_string(gstStruct, KVS_ADD_METADATA_VALUE)) &&
                gst_structure_get_boolean(gstStruct, KVS_ADD_METADATA_PERSISTENT, &persistent)) {
                DLOGD("received " KVS_ADD_METADATA_G_STRUCT_NAME " event");

                CHK_STATUS(putKinesisVideoFragmentMetadata(pGstKvsPlugin->kvsContext.streamHandle, pName, pVal, persistent));

                gst_event_unref(event);
                event = NULL;
            } else if (gst_structure_has_name(gstStruct, KVS_ENABLE_STREAMING_G_STRUCT_NAME) &&
                       gst_structure_get_boolean(gstStruct, KVS_ENABLE_STREAMING_FIELD, &enableStreaming)) {
                DLOGD("received " KVS_ENABLE_STREAMING_G_STRUCT_NAME " event");

                ATOMIC_STORE_BOOL(&pGstKvsPlugin->enableStreaming, enableStreaming);

                gst_event_unref(event);
                event = NULL;
            } else if (gst_structure_has_name(gstStruct, KVS_CONNECT_WEBRTC_G_STRUCT_NAME) &&
                       gst_structure_get_boolean(gstStruct, KVS_CONNECT_WEBRTC_FIELD, &connectWebRtc)) {
                DLOGD("received " KVS_CONNECT_WEBRTC_G_STRUCT_NAME " event");

                ATOMIC_STORE_BOOL(&pGstKvsPlugin->connectWebRtc, connectWebRtc);

                gst_event_unref(event);
                event = NULL;
            }

            break;

        default:
            break;
    }

CleanUp:

    // Anything not consumed above (including on error paths) goes to the default handler
    if (event != NULL) {
        gst_collect_pads_event_default(pads, track_data, event, FALSE);
    }

    // g_free is a no-op on NULL
    g_free(gstCpd);

    if (STATUS_FAILED(retStatus)) {
        GST_ELEMENT_ERROR(pGstKvsPlugin, STREAM, FAILED, (NULL), ("Failed to handle event"));
    }

    return STATUS_SUCCEEDED(retStatus);
}