in gg_group_setup/cmd.py [0:0]
def _create_function_definition(gg_client, group_type, config, region=None):
# Add latest version of Lambda functions to a Function definition
aws = boto3.client('lambda', region_name=region)
latest_funcs = dict()
func_definition = []
# first determine the latest versions of configured Lambda functions
for key in config['lambda_functions']:
lambda_name = key
try:
a = aws.list_aliases(FunctionName=lambda_name)
q = config['lambda_functions'][lambda_name]['arn_qualifier']
# Find the alias index in alias list which alias name equal to arn_qualifier
alias_index = next((index for (index, d) in enumerate(a['Aliases']) if d["Name"] == q))
alias_arn = a['Aliases'][alias_index]['AliasArn']
logging.info("function {0}, found aliases: {1}".format(
lambda_name, a)
)
# add environment variables to lambda function definition
environment_variables = config['lambda_functions'][lambda_name]['environment_variables']
logging.info('function {0}, adding environment variables: {1}'.format(
lambda_name, json.dumps(environment_variables)
))
# get the function pointed to by the alias
f = aws.get_function(FunctionName=lambda_name, Qualifier=q)
logging.info(
"retrieved func config: {0}".format(f['Configuration']))
latest_funcs[lambda_name] = {
"arn": alias_arn,
"arn_qualifier": q,
"environment_variables": environment_variables
}
func_definition.append({
"Id": "{0}".format(lambda_name.lower()),
"FunctionArn": alias_arn,
"FunctionConfiguration": {
"Environment": {
"Variables": environment_variables,
'Execution': {
'IsolationMode': 'NoContainer',
'RunAs': {
'Gid': 0,
'Uid': 0
}
},
},
"Executable": f['Configuration']['Handler'],
"Timeout": int(f['Configuration']['Timeout']),
"Pinned": True
}
}) # function definition
except Exception as e:
logging.error(e)
# if we found one or more configured functions, create a func definition
if len(func_definition) > 0:
ll = gg_client.create_function_definition(
Name="{0}_func_def".format(group_type.type_name)
)
lmbv = gg_client.create_function_definition_version(
FunctionDefinitionId=ll['Id'],
Functions=func_definition,
DefaultConfig={
'Execution': {
'IsolationMode': 'NoContainer'
}
}
)
# update config with latest function versions
config['lambda_functions'] = latest_funcs
ll_arn = lmbv['Arn']
logging.info("Created Function definition ARN:{0}".format(ll_arn))
config['func_def'] = {'id': ll['Id'], 'version_arn': ll_arn, 'environment_variables': environment_variables}
return ll_arn
else:
return