in source/controlplaneapi/infrastructure/lambda/EventScheduler/mre_event_scheduler.py [0:0]
def __process_future_events(self, future_events):
for future_event in future_events:
event_start_time = future_event['Start']
# Fall back on the Default BOOTSTRAP time defined in Env Variable if the
# Event does not have BOOTSTRAP time defined
event_bootstrap_time_in_mins = int(EVENT_SCHEDULER_BOOTSTRAP_TIME_IN_MINS) if 'BootstrapTimeInMinutes' not in future_event['BootstrapTimeInMinutes'] else int(future_event['BootstrapTimeInMinutes'])
cur_utc_time = datetime.utcnow()
future_time = cur_utc_time + timedelta(minutes=event_bootstrap_time_in_mins + int(EVENT_SCHEDULER_BUFFER_TIME_IN_MINS))
# Check if the current event is few minutes away from streaming
# We consider the Bootstrap Time (In case of EC2 or any such resource to Bootstrap and initialize all required services)
# along with a buffer to calculate the TIME_AWAY value
# Also make sure that the same event is not re-processed. This is done by checking if the Event exists in the
# CurrentEvents table.
if datetime.strptime(event_start_time, '%Y-%m-%dT%H:%M:%SZ') >= cur_utc_time and datetime.strptime(event_start_time, '%Y-%m-%dT%H:%M:%SZ') <= future_time:
# Send a Message to EventBridge for Provisioning Stream Processing Resource Architecture
# Check the Concurrent Events value , before provisioning Stream Processing Resources
events_scheduled = self.get_events_scheduled()
# Make sure we are honoring concurrency limits and that the event processing is Idempotent
if len(events_scheduled) <= int(EVENT_SCHEDULER_CONCURRENT_EVENTS) and not self.is_event_scheduled(future_event['Id']):
# Send Msg to EventBridge for Provisioning AWS Resources to Process Input Video Source
result = self.send_event_to_eventbridge(future_event, "FUTURE_EVENT_TO_BE_HARVESTED")
# Push Event into CURRENT_EVENTS table so it gets accounted for concurrent tables pending processing
if result:
self.schedule_event(future_event)
print('Future Event has been scheduled ...')
else:
print('Max Concurrent events have scheduled ...')