in rdk/rdk.py [0:0]
def __populate_params(self):
    """Build the rule's parameter set from the CLI arguments and write it out.

    Reads the parsed command-line arguments on ``self.args``, decodes the
    JSON-valued flags (input parameters, optional parameters, tags), collects
    the remediation settings when a remediation action is given, and passes
    the resulting parameters dict and JSON-encoded tags to
    ``self.__write_params_file``.

    Raises:
        Exception: whatever ``json.loads`` raises for a malformed JSON flag
            (``json.JSONDecodeError`` for invalid syntax), or whatever
            ``self.__generate_remediation_params`` raises. A short hint is
            printed first and the original exception is re-raised unchanged,
            so no params file is written from partially-parsed input.
    """

    def _load_json(raw, default, error_message):
        # An unset or empty flag keeps its default value.
        if not raw:
            return default
        try:
            # strict=False lets control characters appear inside JSON strings.
            return json.loads(raw, strict=False)
        except Exception:
            print(error_message)
            # Never fall back to the default silently: that would write a
            # params file that ignores what the user asked for.
            raise

    # Create a custom session based on whatever credentials are available to us.
    my_session = self.__get_boto_session()

    my_input_params = _load_json(
        self.args.input_parameters,
        {},
        "Error parsing input parameter JSON. Make sure your JSON keys and values are enclosed in properly-escaped double quotes and your input-parameters string is enclosed in single quotes.",
    )
    my_optional_params = _load_json(
        self.args.optional_parameters,
        {},
        "Error parsing optional input parameter JSON. Make sure your JSON keys and values are enclosed in properly escaped double quotes and your optional-parameters string is enclosed in single quotes.",
    )
    my_tags = _load_json(
        self.args.tags,
        [],
        "Error parsing optional tags JSON. Make sure your JSON keys and values are enclosed in properly escaped double quotes and tags string is enclosed in single quotes.",
    )

    # Remediation tuning flags have no effect without a remediation action;
    # warn about it but carry on.
    remediation_flags = (
        "auto_remediation_retry_attempts",
        "auto_remediation_retry_time",
        "remediation_action_version",
        "remediation_concurrent_execution_percent",
        "remediation_error_rate_percent",
        "remediation_parameters",
    )
    if (
        any(getattr(self.args, flag) is not None for flag in remediation_flags)
        and not self.args.remediation_action
    ):
        print("Remediation Flags detected but no remediation action (--remediation-action) set")

    my_remediation = {}
    if self.args.remediation_action:
        try:
            my_remediation = self.__generate_remediation_params()
        except Exception:
            print("Error parsing remediation configuration.")
            raise

    # Create config file contents; the file is placed in the rule directory.
    parameters = {
        'RuleName': self.args.rulename,
        'Description': self.args.rulename,
        'SourceRuntime': self.args.runtime,
        'CodeKey': self.args.rulename + my_session.region_name + '.zip',
        'InputParameters': json.dumps(my_input_params),
        'OptionalParameters': json.dumps(my_optional_params),
    }

    if self.args.custom_lambda_name:
        parameters['CustomLambdaName'] = self.args.custom_lambda_name

    tags = json.dumps(my_tags)

    if self.args.resource_types:
        parameters['SourceEvents'] = self.args.resource_types

    if self.args.maximum_frequency:
        parameters['SourcePeriodic'] = self.args.maximum_frequency

    if self.args.rulesets:
        parameters['RuleSets'] = self.args.rulesets

    if self.args.source_identifier:
        # When a source identifier is given, the code key and runtime are
        # cleared.
        parameters['SourceIdentifier'] = self.args.source_identifier
        parameters['CodeKey'] = None
        parameters['SourceRuntime'] = None

    if my_remediation:
        parameters['Remediation'] = my_remediation

    self.__write_params_file(self.args.rulename, parameters, tags)