in source/lambda/configuration/scheduler_config_builder.py [0:0]
def configuration_from_dict(d):
    """
    Builds a SchedulerConfig instance from its dictionary representation, as passed in the event of a
    lambda function.

    :param d: dictionary holding the configuration. Plain settings are read by attribute name (missing ones
        become None), the regions, cross account roles and scheduled services entries are converted to sets,
        and the periods and schedules entries are expected to map names to dictionaries with their data
    :return: SchedulerConfig instance built from the data in the dictionary
    :raises KeyError: if a schedule refers to a period name that is not present in the periods of the dictionary
    """
    config_args = {}

    # plain settings are copied as-is, settings that are not in the dictionary are passed as None
    for attr in [ATTR_TAGNAME,
                 ATTR_DEFAULT_TIMEZONE,
                 ATTR_TRACE,
                 ATTR_ENABLE_SSM_MAINTENANCE_WINDOWS,
                 ATTR_SCHEDULE_CLUSTERS,
                 ATTR_CREATE_RDS_SNAPSHOT,
                 ATTR_USE_METRICS,
                 ATTR_SCHEDULE_LAMBDA_ACCOUNT,
                 ATTR_STARTED_TAGS,
                 ATTR_STOPPED_TAGS]:
        config_args[attr] = d.get(attr, None)

    # collection settings are passed as sets, an empty set if they are not in the dictionary
    for attr in [ATTR_REGIONS,
                 ATTR_CROSS_ACCOUNT_ROLES,
                 ATTR_SCHEDULED_SERVICES]:
        config_args[attr] = set(d.get(attr, []))

    # periods are built first because the schedules below look them up by name
    periods = {}
    for period_name, period_data in d.get(ATTR_PERIODS, {}).items():
        period_args = {ATTR_NAME: period_name}

        # begin and end times are stored as strings and converted by the builder
        for attr in [ATTR_BEGINTIME, ATTR_ENDTIME]:
            if attr in period_data:
                period_args[attr] = SchedulerConfigBuilder.get_time_from_string(period_data[attr])

        for attr in [ATTR_WEEKDAYS, ATTR_MONTHDAYS, ATTR_MONTHS]:
            if attr in period_data:
                period_args[attr] = set(period_data[attr])

        periods[period_name] = RunningPeriod(**period_args)

    config_args[ATTR_SCHEDULES] = {}
    for schedule_name, schedule_data in d.get(ATTR_SCHEDULES, {}).items():
        schedule_args = {}
        for attr in [ATTR_NAME,
                     ATTR_TIMEZONE,
                     ATTR_OVERRIDE_STATUS,
                     ATTR_STOP_NEW_INSTANCES,
                     ATTR_USE_METRICS,
                     ATTR_ENFORCED,
                     ATTR_HIBERNATE,
                     ATTR_RETAIN_RUNNING,
                     ATTR_SSM_MAINTENANCE_WINDOW,
                     ATTR_USE_MAINTENANCE_WINDOW]:
            schedule_args[attr] = schedule_data.get(attr, None)

        # the schedule datetime is stored as a string, only parse and pass it when present
        if ATTR_SCHEDULE_DT in schedule_data:
            schedule_args[ATTR_SCHEDULE_DT] = dateutil.parser.parse(schedule_data[ATTR_SCHEDULE_DT])

        # periods are only resolved for schedules that have no override status
        if schedule_args[ATTR_OVERRIDE_STATUS] is None:
            schedule_args[ATTR_PERIODS] = []

            # a schedule without (or with an empty/null) periods entry gets an empty list of periods
            # instead of failing on iterating None
            for period in schedule_data.get(ATTR_PERIODS) or []:
                # a period reference is either "<name>" or "<name><separator><instance type>"
                temp = period.split(configuration.INSTANCE_TYPE_SEP)
                if len(temp) > 1:
                    name = temp[0]
                    instance_type = temp[1]
                else:
                    name = period
                    instance_type = None
                schedule_args[ATTR_PERIODS].append({ATTR_PERIOD: periods[name], ATTR_INSTANCE_TYPE: instance_type})

        config_args[ATTR_SCHEDULES][schedule_name] = InstanceSchedule(**schedule_args)

    return SchedulerConfig(**config_args)