in rdk/rdk.py [0:0]
def export(self):
self.__parse_export_args()
# get the rule names
rule_names = self.__get_rule_list_for_command("export")
# run the export code
print("Running export")
for rule_name in rule_names:
rule_params, cfn_tags = self.__get_rule_parameters(rule_name)
if 'SourceIdentifier' in rule_params:
print("Found Managed Rule, Ignored.")
print("Export support only Custom Rules.")
continue
source_events = []
if 'SourceEvents' in rule_params:
source_events = [rule_params['SourceEvents']]
source_periodic = "NONE"
if 'SourcePeriodic' in rule_params:
source_periodic = rule_params['SourcePeriodic']
combined_input_parameters = {}
if 'InputParameters' in rule_params:
combined_input_parameters.update(json.loads(rule_params['InputParameters']))
if 'OptionalParameters' in rule_params:
# Remove empty parameters
keys_to_delete = []
optional_parameters_json = json.loads(rule_params['OptionalParameters'])
for key, value in optional_parameters_json.items():
if not value:
keys_to_delete.append(key)
for key in keys_to_delete:
del optional_parameters_json[key]
combined_input_parameters.update(optional_parameters_json)
print("Found Custom Rule.")
s3_src = ""
s3_dst = self.__package_function_code(rule_name, rule_params)
layers = []
rdk_lib_version = "0"
my_session = self.__get_boto_session()
layers = self.__get_lambda_layers(my_session, self.args, rule_params)
if self.args.lambda_layers:
additional_layers = self.args.lambda_layers.split(',')
layers.extend(additional_layers)
subnet_ids = []
security_group_ids = []
if self.args.lambda_security_groups:
security_group_ids = self.args.lambda_security_groups.split(",")
if self.args.lambda_subnets:
subnet_ids = self.args.lambda_subnets.split(",")
lambda_role_arn = "NONE"
if self.args.lambda_role_arn:
print("Existing IAM Role provided: " + self.args.lambda_role_arn)
lambda_role_arn = self.args.lambda_role_arn
my_params = {
"rule_name": rule_name,
"rule_lambda_name": self.__get_lambda_name(rule_name, rule_params),
"source_runtime": self.__get_runtime_string(rule_params),
"source_events": source_events,
"source_periodic": source_periodic,
"source_input_parameters": json.dumps(combined_input_parameters),
"source_handler": self.__get_handler(rule_name, rule_params),
"subnet_ids": subnet_ids,
"security_group_ids": security_group_ids,
"lambda_layers": layers,
"lambda_role_arn": lambda_role_arn,
"lambda_timeout": str(self.args.lambda_timeout)
}
params_file_path = os.path.join(os.getcwd(), rules_dir, rule_name, rule_name.lower() + ".tfvars.json")
parameters_file = open(params_file_path, 'w')
json.dump(my_params, parameters_file, indent=4)
parameters_file.close()
# create json of CFN template
print(self.args.format + " version: "+ self.args.version)
tf_file_body = os.path.join(path.dirname(__file__), 'template', self.args.format, self.args.version,
"config_rule.tf")
tf_file_path = os.path.join(os.getcwd(), rules_dir, rule_name, rule_name.lower() + "_rule.tf")
shutil.copy(tf_file_body, tf_file_path)
variables_file_body = os.path.join(path.dirname(__file__), 'template', self.args.format, self.args.version,
"variables.tf")
variables_file_path = os.path.join(os.getcwd(), rules_dir, rule_name, rule_name.lower() + "_variables.tf")
shutil.copy(variables_file_body, variables_file_path)
print("Export completed.This will generate three .tf files.")