in broadcast-monitoring/src/appsync_notify/app/appsync_push_notification.py [0:0]
def push_appsync(stream_id, segment_start_date_time_str, segment_duration_sec, segment_s3_key, thumbnail_s3_key,
media_check_status):
"""
Notify AppSync subscribers that a new segment summary is available.
:param stream_id:
:param segment_start_date_time_str:
:param segment_duration_sec:
:param segment_s3_key:
:param thumbnail_s3_key:
:param media_check_status:
:return: none
"""
new_segment_ready_gql = '''
mutation NewSegmentSummaryReady($input: newSegmentSummaryReadyInput!) {
newSegmentSummaryReady(input: $input) {
Stream_ID
Start_DateTime
Duration_Sec
S3_Key
Station_Status
Audio_Status
Sports_Status
Team_Status
Thumbnail_Key
}
}
'''
new_segment_summary_ready_input = {
"Stream_ID": stream_id,
"Start_DateTime": segment_start_date_time_str,
"Duration_Sec": segment_duration_sec,
"S3_Key": segment_s3_key,
}
if thumbnail_s3_key is not None:
new_segment_summary_ready_input['Thumbnail_Key'] = thumbnail_s3_key
for checks in media_check_status:
new_segment_summary_ready_input[checks] = media_check_status[checks]
execute_gql(new_segment_ready_gql, params={"input": new_segment_summary_ready_input})
logger.info('success pushing to AppSync.')