in source/Main_Orchestrator.py [0:0]
def process_ct_list(ctlistjson,appaccountid,appacctrole):
# Check if the step function ARN values are valid values
IsValidRfcSm = validate_config_input(rfc_state_machine_arn,"^[A-Za-z0-9-:_]{1,1000}$",1,1000)
IsValidSrSm = validate_config_input(sr_state_machine_arn,"^[A-Za-z0-9-:_]{1,1000}$",1,1000)
IsValidCMBackUpSm = validate_config_input(customer_managed_backup_state_machine_arn,"^[A-Za-z0-9-:_]{1,1000}$",1,1000)
IsValidCMPatchSm = validate_config_input(customer_managed_patch_state_machine_arn,"^[A-Za-z0-9-:_]{1,1000}$",1,1000)
# Only proceed if the values are valid
if IsValidRfcSm==False or IsValidSrSm==False or IsValidCMBackUpSm==False or IsValidCMPatchSm==False :
sys.exit("State machine ARNs or role values are not valid inputs")
else:
# Go through each requested change, AMS or customer managed and then make a call to the step
# function required to make the lambda calls.
# For RFC next call is Create_RFC.py, for ServiceRequests Create_SR.py and so on.
try:
for ct in ctlistjson:
ChangeType = (ct['Key'])
pause_time = (ct['Wait_Time'])
exec_params = (ct['Exec_Params'])
# Validate input
IsValild_keyChangeType = validate_config_input(ChangeType,"^[A-Za-z0-9-:_]{1,1000}$",1,1000)
IsValidPauseTime = validate_config_input(str(pause_time),"^[0-9]{1,1000}$",1,1000)
if IsValild_keyChangeType==False or IsValidPauseTime==False:
sys.exit("Change type, Pause time or exec param values are not valid inputs")
if ChangeType =="ct-2hyozbpa0sx0m":
process_step_function(rfc_state_machine_arn,"ct-2hyozbpa0sx0m",pause_time,appaccountid,exec_params,appacctrole)
elif ChangeType == "ct-0el2j07llrxs7":
process_step_function(rfc_state_machine_arn,"ct-0el2j07llrxs7",pause_time,appaccountid,exec_params,appacctrole)
elif ChangeType == "Service_Request":
process_step_function(sr_state_machine_arn,"Service_Request",pause_time,appaccountid,exec_params,appacctrole)
elif ChangeType == "backup":
process_step_function(customer_managed_backup_state_machine_arn,"Base_AWS_BackUp",pause_time,appaccountid,exec_params,appacctrole)
elif ChangeType == "patch":
process_step_function(customer_managed_patch_state_machine_arn,"Base_AWS_Patch",pause_time,appaccountid,exec_params,appacctrole)
else:
print("Unknown_Change_Type")
except Exception as ct_list_step_function_error:
print("Error while trying to call step functions" + " " + str(ct_list_step_function_error))
else:
print('Launched operational baseline successfully' )