in source/controlplaneapi/infrastructure/lambda/EventClipGenerator/mre-event-clip-generator.py [0:0]
def GenerateClips(event, context):
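    """
    Generates MP4 clips (optimized and, when requested, original) and HLS renditions
    for the segments in the incoming event, persists the clip results via the MRE
    DataPlane, and publishes clip generation status to EventBridge.
    """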
from MediaReplayEnginePluginHelper import DataPlane
dataplane = DataPlane(event)
# Contains Job IDs for all the HLS Jobs. We will need to
# check if all Jobs have completed before creating the Aggregated
# m3u8 file
all_hls_clip_job_ids = []
# If Segments key not present (In case of Batch Processing), call API to get Segments.
# For now, we will process every Segment as they are created in Near Real time
#input_segments = event['Segments']
optimized_segments = []
nonoptimized_segments = []
hls_input_settings_for_segments = []
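    # Default to audio track 1 when the event does not specify a TrackNumber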
    audioTrack = event.get('TrackNumber', 1)
input_segments = event
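    # Classify incoming segments into optimized and non-optimized lists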
for segment in input_segments['Segments']:
# Track Segments which have Optimized Start and End Times
#if "OptoEnd" in segment and "OptoStart" in segment:
# if len(segment['OptoEnd']) > 0 and len(segment['OptoStart']) > 0 :
#Ignore Starts that have -1 in it
if "OptoEnd" in segment and "OptoStart" in segment:
if get_OptoStart(segment, event) != -1:
optimized_segments.append(segment)
# Original Clips should be generated when asked for.
if event['GenerateOriginal']:
if "End" in segment and "Start" in segment:
                # Ignore segments whose Start is -1
if segment['Start'] != -1:
nonoptimized_segments.append(segment)
    # Create an Optimized Clip for each Optimized Segment
    for optsegment in optimized_segments:
        print("--- OPTO SEGMENT PROCESSED-------------")
        print(optsegment['Start'])
        chunks = dataplane.get_chunks_for_segment(get_OptoStart(optsegment, event), get_OptoEnd(optsegment, event))
        print('Got chunks from the API for the Optimized Segment')
print(f" optimized_segments chunks: {chunks}")
keyprefix, hls_input_settings = create_optimized_MP4_clips(optsegment, event, chunks)
        # Consider HLS inputs for Opto segments only when an Optimizer is configured in the profile
if 'Optimizer' in event['Profile']:
hls_input_settings_for_segments.extend(hls_input_settings)
# A NonOpt Clip needs to be created for every segment
nonoptimized_segments_with_tracks = []
for segment in nonoptimized_segments:
print("--- NON OPTO SEGMENT PROCESSED-------------")
print(segment['Start'])
chunks = dataplane.get_chunks_for_segment(segment['Start'], segment['End'])
        print('Got chunks from the API for the Non-Optimized Segment')
print(f" nonoptimized_segments chunks: {chunks}")
segs, hls_inputs = create_non_optimized_MP4_clip_per_audio_track(segment, event, chunks)
nonoptimized_segments_with_tracks.extend(segs)
        # When an Optimizer is configured, HLS inputs from Original segments are not needed.
        # Only when no Optimizer is configured do we use HLS inputs from Original segments to generate HLS for them.
if 'Optimizer' not in event['Profile']:
hls_input_settings_for_segments.extend(hls_inputs)
# SAVE ALL NON OPT SEGMENTS PER AUDIO TRACK
if len(nonoptimized_segments_with_tracks) > 0:
print(f"Processed Non Opt Segments before saving - {nonoptimized_segments_with_tracks}")
dataplane.save_clip_results(nonoptimized_segments_with_tracks)
detail = {
"State": "CLIP_GEN_DONE",
"Event": {
"EventInfo": event,
"EventType": "EVENT_CLIP_GEN"
}
}
eb_client.put_events(
Entries=[
{
"Source": "awsmre",
"DetailType": "Clip Gen Status",
"Detail": json.dumps(detail),
"EventBusName": EB_EVENT_BUS_NAME
}
]
)
# For HLS Clips, divide the Entire Segment List to Smaller Chunks to meet the MediaConvert Quota Limits of 150 Inputs per Job
# We create a MediaConvert Job per segment group (which can have a Max of 150 Inputs configured - build_input_settings)
groups_of_input_settings = [hls_input_settings_for_segments[x:x+MAX_INPUTS_PER_JOB] for x in range(0, len(hls_input_settings_for_segments), MAX_INPUTS_PER_JOB)]
index = 1
#batch_id = f"{str(event['Event']['Name']).lower()}-{str(event['Event']['Program']).lower()}"
batch_id = str(uuid.uuid4())
#----------- HLS Clips Gen ------------------------------------
print("---------------- ALL groups_of_input_settings")
print(groups_of_input_settings)
    # For Opto segments, the AudioTrack is passed to the Step Function
if 'Optimizer' in event['Profile']:
print("---- CReating Opto HLS -----------")
# Launch a Media Convert Job with a Max of 150 Inputs
for inputsettings in groups_of_input_settings:
# Each Input setting will have the relevant AudioTrack embedded.
job = create_HLS_clips(inputsettings, index, batch_id, audioTrack)
            if job is not None:
all_hls_clip_job_ids.append(job['Job']['Id'])
index += 1
else:
# Launch a Media Convert Job with a Max of 150 Inputs
for inputsettings in groups_of_input_settings:
            # inputsettings for Original segments can contain multiple audio tracks.
            # Create a Job per audio track so the HLS manifests are saved
            # in S3 at the audio-track level.
for track in event['Event']['AudioTracks']:
final_input_settings = []
for inputsetting in inputsettings:
if int(inputsetting['AudioSelectors']['Audio Selector 1']['Tracks'][0]) == int(track):
final_input_settings.append(inputsetting)
# We have Track specific Input Setting, create the Job
if len(final_input_settings) > 0:
# Each Input setting will have the relevant AudioTrack embedded.
job = create_HLS_clips(final_input_settings, index, batch_id, track)
                    if job is not None:
all_hls_clip_job_ids.append(job['Job']['Id'])
index += 1
#----------- HLS Clips Gen Ends ------------------------------------
# Persist Output Artifacts generated from MediaConvert Jobs into DDB
results = []
for segment in input_segments['Segments']:
# We are dealing with an Optimized Segment
if "OptimizedS3KeyPrefix" in segment:
if "OriginalS3KeyPrefix" in segment:
results.append({
"Start": segment['Start'],
"End": segment['End'],
"OriginalClipStatus": "Success",
"OriginalClipLocation": segment["OriginalS3KeyPrefix"],
"OptoStart": get_OptoStart(segment, event),
"OptoEnd": get_OptoEnd(segment, event),
"OptimizedClipStatus": "Success",
"OptimizedClipLocation": segment["OptimizedS3KeyPrefix"],
"OptimizedThumbnailLocation": segment["OptimizedThumbnailS3KeyPrefix"],
"AudioTrack": audioTrack
})
else:
results.append({
"Start": segment['Start'],
"End": segment['End'],
"OptoStart": get_OptoStart(segment, event),
"OptoEnd": get_OptoEnd(segment, event),
"OptimizedClipStatus": "Success",
"OptimizedClipLocation": segment["OptimizedS3KeyPrefix"],
"OptimizedThumbnailLocation": segment["OptimizedThumbnailS3KeyPrefix"],
"AudioTrack": audioTrack
})
#elif "OriginalS3KeyPrefix" in segment: # We are dealing with an Original Segment
# results.append({
# "Start": segment['Start'],
# "End": segment['End'],
# "OriginalClipStatus": "Success" if "OriginalS3KeyPrefix" in segment else "Failure",
# "OriginalClipLocation": segment["OriginalS3KeyPrefix"],
# "OriginalThumbnailLocation": segment["OriginalThumbnailS3KeyPrefix"]
# })
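    # Save the collected clip results and publish the clip generation status to EventBridge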
if len(results) > 0:
print(f"Processed Segments before saving - {results}")
dataplane.save_clip_results(results)
detail = {
"State": "CLIP_GEN_DONE",
"Event": {
"EventInfo": event,
"EventType": "EVENT_CLIP_GEN"
}
}
eb_client.put_events(
Entries=[
{
"Source": "awsmre",
"DetailType": "Clip Gen Status",
"Detail": json.dumps(detail),
"EventBusName": EB_EVENT_BUS_NAME
}
]
)
    # For Opto segments, the AudioTrack is passed to the Step Function
if 'Optimizer' in event['Profile']:
return {
"MediaConvertJobs" : all_hls_clip_job_ids,
"HLSOutputKeyPrefix": f"HLS/{batch_id}/{audioTrack}/",
"OutputBucket": OUTPUT_BUCKET,
"Result": results,
"Event": event['Event'],
"Input": event['Input']
}
else:
return {
"MediaConvertJobs" : all_hls_clip_job_ids,
"OutputBucket": OUTPUT_BUCKET,
"HLSOutputKeyPrefix": f"HLS/{batch_id}/",
"Result": results,
"Event": event['Event'],
"Input": event['Input']
}