in source/src/molecule-unfolding/lambda/TaskParametersLambda/app.py [0:0]
def _apply_input_defaults(user_input):
    """Fill unset sections of *user_input* in place from the module-level defaults."""
    opt_params = user_input.get('optParams')
    if opt_params is None:
        user_input['optParams'] = default_opt_params
    else:
        # Each sub-section is checked on its own. A single if/elif chain would
        # stop after filling 'sa' and leave 'qa' unset when both are missing.
        for section in ('sa', 'qa'):
            if opt_params.get(section) is None:
                opt_params[section] = default_opt_params[section]

    section_defaults = (
        ('modelParams', default_model_params),
        ('hpcResources', default_hpc_resources),
        ('devicesArns', default_devices_arns),
    )
    for section, default_value in section_defaults:
        if user_input.get(section) is None:
            user_input[section] = default_value


def _task_label(param_item):
    """Return *param_item* as text with every '&' and '=' removed."""
    return str(param_item).replace("&", '').replace("=", '')


def _build_qc_task_params(devices_arns, model_param_items, common_args):
    """Group one task description per (device, model parameter) pair by device ARN."""
    # A defaultdict is kept on purpose: looking up a known device that produced
    # no tasks (empty model_param_items) must yield [] rather than raise KeyError.
    tasks_by_device = defaultdict(list)
    index = 0  # numbering runs across all devices, it is not restarted per device
    for device_arn in devices_arns:
        device_name = device_arn.split("/").pop()
        for param_item in model_param_items:
            index += 1
            tasks_by_device[device_arn].append({
                "params": [
                    "--model-param", str(param_item),
                    "--device-arn", str(device_arn),
                    "--index", str(index),
                    *common_args,
                ],
                "device_name": device_name,
                "task_name": f"{device_name}_{_task_label(param_item)}",
                "model_param": param_item,
                "index": index,
                "device_arn": device_arn,
            })
    return tasks_by_device


def _build_hpc_task_params(hpc_resources, model_param_items, common_args):
    """Return one task description per (resource, model parameter) pair."""
    tasks = []
    index = 0
    for resource in hpc_resources:
        vcpus, memory = resource[0], resource[1]
        # Fractional sizes would put a '.' into the name; it is swapped for '_'.
        resource_name = f"Vcpu{vcpus}_Mem{memory}G".replace('.', '_')
        for param_item in model_param_items:
            index += 1
            tasks.append({
                "params": [
                    "--model-param", str(param_item),
                    "--resource", resource_name,
                    "--index", str(index),
                    *common_args,
                ],
                "resource_name": resource_name,
                "task_name": f"{resource_name}_{_task_label(param_item)}",
                "model_param": param_item,
                "index": index,
                "vcpus": vcpus,
                # NOTE(review): the 'G' suffix in the name suggests the input is
                # in gigabytes and this scales it by 1024 - confirm the unit.
                "memory": memory * 1024,
            })
    return tasks


def handler(event, context):
    """Produce the parameters for one workflow step, chosen by ``event['param_type']``.

    Keys read from *event*:
        param_type, s3_bucket, s3_prefix: always required.
        user_input, execution_id: required for ``CHECK_INPUT``.
        execution_id: optional for every other type; when present the stored
            user input is loaded through ``read_user_input``.
        device_arn: required for ``PARAMS_FOR_QC_DEVICE``.

    *context* is not used.

    Returns:
        ``CHECK_INPUT``: dict with ``params``, ``execution_id``, ``runMode``
            and ``start_time``.
        ``QC_DEVICE_LIST``: dict with ``devices_arns`` and ``execution_id``.
        ``PARAMS_FOR_QC_DEVICE``: dict with ``qcTaskParams`` and ``execution_id``.
        ``PARAMS_FOR_HPC``: dict with ``hpcTaskParams`` and ``execution_id``.
        Any other ``param_type``: ``None``.

    Raises:
        KeyError: a required event key or ``AWS_REGION`` is missing.
        Exception: ``validate_input`` reported errors for ``CHECK_INPUT``.
        ValueError: ``device_arn`` is not one of the configured devices.
    """
    aws_region = os.environ['AWS_REGION']
    param_type = event['param_type']
    s3_bucket = event['s3_bucket']
    s3_prefix = event['s3_prefix']

    # NOTE(review): presumably refreshes the module-level default_* values from
    # S3 before they are read below - confirm against read_config.
    read_config(s3_bucket, s3_prefix)
    log.info(f"default_model_params: {default_model_params}")

    # Arguments are kept as a list instead of being joined with ',' and split
    # again, so a value that itself contains a comma stays a single argument.
    common_args = [
        "--aws_region", str(aws_region),
        "--s3-bucket", str(s3_bucket),
        "--s3_prefix", str(s3_prefix),
    ]

    if param_type == 'CHECK_INPUT':
        user_input = event['user_input']
        # Only the part after the last ':' of the incoming id is kept.
        execution_id = event['execution_id'].split(':')[-1]

        errs = validate_input(user_input)
        if errs:
            raise Exception("validate error: {}".format("\n".join(errs)))

        _apply_input_defaults(user_input)

        # One timestamp is shared by the stored record and the response so the
        # two never disagree.
        start_time = datetime.datetime.utcnow().isoformat()
        key = f"{s3_prefix}/executions/{execution_id}/user_input.json"
        string_to_s3(content=json.dumps({
            "user_input": user_input,
            "execution_id": execution_id,
            "aws_region": aws_region,
            "start_time": start_time,
        }), bucket=s3_bucket, key=key)
        return {
            "params": common_args + ["--execution-id", execution_id],
            "execution_id": execution_id,
            "runMode": user_input.get('runMode', "ALL"),
            "start_time": start_time,
        }

    execution_id = event.get('execution_id')
    if execution_id:
        common_args = common_args + ["--execution-id", str(execution_id)]
        stored = read_user_input(
            execution_id, bucket=s3_bucket, s3_prefix=s3_prefix)
        saved_input = stored['user_input']
        devices_arns = saved_input.get('devicesArns', default_devices_arns)
        model_params = saved_input.get('modelParams', default_model_params)
        hpc_resources = saved_input.get('hpcResources', default_hpc_resources)
    else:
        devices_arns = default_devices_arns
        model_params = default_model_params
        hpc_resources = default_hpc_resources

    log.info(f"devices_arns={devices_arns}")
    log.info(f"model_params={model_params}")
    log.info(f"hpc_resources={hpc_resources}")

    if param_type == 'QC_DEVICE_LIST':
        return {
            "devices_arns": devices_arns,
            "execution_id": execution_id,
        }

    model_param_items = get_model_param_items(model_params)

    if param_type == 'PARAMS_FOR_QC_DEVICE':
        device_arn = event['device_arn']
        log.info(f"device_arn={device_arn}")
        if device_arn not in devices_arns:
            raise ValueError(f"unknown device_arn: {device_arn}")
        qc_task_params = _build_qc_task_params(
            devices_arns, model_param_items, common_args)
        log.info(qc_task_params)
        return {
            "qcTaskParams": qc_task_params[device_arn],
            "execution_id": execution_id,
        }

    if param_type == 'PARAMS_FOR_HPC':
        return {
            "hpcTaskParams": _build_hpc_task_params(
                hpc_resources, model_param_items, common_args),
            "execution_id": execution_id,
        }

    # Unrecognised param_type values produce no result, as before.
    return None