in rdk/rdk.py [0:0]
def __create_function_cloudformation_template(self):
print ("Generating CloudFormation template for Lambda Functions!")
#First add the common elements - description, parameters, and resource section header
template = {}
template["AWSTemplateFormatVersion"] = "2010-09-09"
template["Description"] = "AWS CloudFormation template to create Lambda functions for backing custom AWS Config rules. You will be billed for the AWS resources used if you create a stack from this template."
parameters = {}
parameters["SourceBucket"] = {}
parameters["SourceBucket"]["Description"] = "Name of the S3 bucket that you have stored the rule zip files in."
parameters["SourceBucket"]["Type"] = "String"
parameters["SourceBucket"]["MinLength"] = "1"
parameters["SourceBucket"]["MaxLength"] = "255"
template["Parameters"] = parameters
resources = {}
my_session = self.__get_boto_session()
identity_details = self.__get_caller_identity_details(my_session)
account_id = identity_details['account_id']
partition = identity_details['partition']
lambdaRoleArn = ""
if self.args.lambda_role_arn:
print (f"[{my_session.region_name}]: Existing IAM Role provided: " + self.args.lambda_role_arn)
lambdaRoleArn = self.args.lambda_role_arn
elif self.args.lambda_role_name:
print (f"[{my_session.region_name}]: Building IAM Role ARN from Name: " + self.args.lambda_role_name)
arn = f"arn:{partition}:iam::{account_id}:role/{self.args.lambda_role_name}"
lambdaRoleArn = arn
else:
print ("No IAM role provided, creating a new IAM role for lambda function")
lambda_role = {}
lambda_role["Type"] = "AWS::IAM::Role"
lambda_role["Properties"] = {}
lambda_role["Properties"]["Path"] = "/rdk/"
lambda_role["Properties"]["AssumeRolePolicyDocument"] = {
"Version": "2012-10-17",
"Statement": [ {
"Sid": "AllowLambdaAssumeRole",
"Effect": "Allow",
"Principal": { "Service": "lambda.amazonaws.com" },
"Action": "sts:AssumeRole"
} ]
}
lambda_policy_statements =[
{
"Sid": "1",
"Action": [
"s3:GetObject"
],
"Effect": "Allow",
"Resource": { "Fn::Sub": "arn:${AWS::Partition}:s3:::${SourceBucket}/*" }
},
{
"Sid": "2",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogStreams"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Sid": "3",
"Action": [
"config:PutEvaluations"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Sid": "4",
"Action": [
"iam:List*",
"iam:Describe*",
"iam:Get*"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Sid": "5",
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Resource": "*"
}
]
if self.args.lambda_subnets and self.args.lambda_security_groups:
vpc_policy={
"Sid": "LambdaVPCAccessExecution",
"Action": [
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface",
"ec2:CreateNetworkInterface"
],
"Effect": "Allow",
"Resource": "*"
}
lambda_policy_statements.append(vpc_policy)
lambda_role["Properties"]["Policies"] = [{
"PolicyName": "ConfigRulePolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": lambda_policy_statements
}
} ]
lambda_role["Properties"]["ManagedPolicyArns"] = [{"Fn::Sub": "arn:${AWS::Partition}:iam::aws:policy/ReadOnlyAccess"}]
resources["rdkLambdaRole"] = lambda_role
rule_names = self.__get_rule_list_for_command()
for rule_name in rule_names:
alphanum_rule_name = self.__get_alphanumeric_rule_name(rule_name)
params, tags = self.__get_rule_parameters(rule_name)
if 'SourceIdentifier' in params:
print("Skipping Managed Rule.")
continue
lambda_function = {}
lambda_function["Type"] = "AWS::Lambda::Function"
properties = {}
properties["FunctionName"] = self.__get_lambda_name(rule_name, params)
properties["Code"] = {"S3Bucket": { "Ref": "SourceBucket"}, "S3Key": rule_name+"/"+rule_name+".zip"}
properties["Description"] = "Function for AWS Config Rule " + rule_name
properties["Handler"] = self.__get_handler(rule_name, params)
properties["MemorySize"] = "256"
if self.args.lambda_role_arn or self.args.lambda_role_name:
properties["Role"] = lambdaRoleArn
else:
lambda_function["DependsOn"] = "rdkLambdaRole"
properties["Role"] = {"Fn::GetAtt": [ "rdkLambdaRole", "Arn" ]}
properties["Runtime"] = self.__get_runtime_string(params)
properties["Timeout"] = str(self.args.lambda_timeout)
properties["Tags"] = tags
if self.args.lambda_subnets and self.args.lambda_security_groups:
properties["VpcConfig"] = {
"SecurityGroupIds" : self.args.lambda_security_groups.split(","),
"SubnetIds" : self.args.lambda_subnets.split(",")
}
layers = []
if self.args.rdklib_layer_arn:
layers.append(self.args.rdklib_layer_arn)
if self.args.lambda_layers:
for layer in self.args.lambda_layers.split(','):
layers.append(layer)
if layers:
properties["Layers"] = layers
lambda_function["Properties"] = properties
resources[alphanum_rule_name+"LambdaFunction"] = lambda_function
lambda_permissions = {}
lambda_permissions["Type"] = "AWS::Lambda::Permission"
lambda_permissions["DependsOn"] = alphanum_rule_name+"LambdaFunction"
lambda_permissions["Properties"] = {
"FunctionName": {"Fn::GetAtt": [ alphanum_rule_name+"LambdaFunction", "Arn" ] },
"Action": "lambda:InvokeFunction",
"Principal": "config.amazonaws.com"
}
resources[alphanum_rule_name+"LambdaPermissions"]=lambda_permissions
template['Resources'] = resources
return json.dumps(template, indent=2)