def __populate_params()

in rdk/rdk.py [0:0]


    def __populate_params(self):
        """Assemble the rule parameters from the CLI arguments and write them out.

        Reads ``self.args``, parses the JSON-valued options
        (``input_parameters``, ``optional_parameters`` and ``tags``), generates
        the remediation configuration when ``remediation_action`` is set, and
        passes the resulting dictionary together with the JSON-encoded tags to
        ``__write_params_file``.

        Raises:
            Exception: whatever ``json.loads`` or
                ``__generate_remediation_params`` raised. A hint is printed
                first and the original exception is then re-raised, so a
                params file is never written from input that was discarded.
        """
        def load_json_arg(raw_value, default, error_hint):
            # Parse one JSON-valued CLI option; an unset/empty option keeps its default.
            if not raw_value:
                return default
            try:
                # strict=False lets the decoder accept control characters inside strings.
                return json.loads(raw_value, strict=False)
            except Exception:
                # Tell the user how to quote the value, then abort instead of
                # silently continuing with the default.
                print(error_hint)
                raise

        #create custom session based on whatever credentials are available to us
        my_session = self.__get_boto_session()

        my_input_params = load_json_arg(
            self.args.input_parameters,
            {},
            "Error parsing input parameter JSON.  Make sure your JSON keys and values are enclosed in properly-escaped double quotes and your input-parameters string is enclosed in single quotes.",
        )

        my_optional_params = load_json_arg(
            self.args.optional_parameters,
            {},
            "Error parsing optional input parameter JSON.  Make sure your JSON keys and values are enclosed in properly escaped double quotes and your optional-parameters string is enclosed in single quotes.",
        )

        my_tags = load_json_arg(
            self.args.tags,
            [],
            "Error parsing optional tags JSON.  Make sure your JSON keys and values are enclosed in properly escaped double quotes and tags string is enclosed in single quotes.",
        )

        #remediation flags only take effect together with --remediation-action
        remediation_flags = (
            "auto_remediation_retry_attempts",
            "auto_remediation_retry_time",
            "remediation_action_version",
            "remediation_concurrent_execution_percent",
            "remediation_error_rate_percent",
            "remediation_parameters",
        )
        any_remediation_flag = any(
            getattr(self.args, flag) is not None for flag in remediation_flags
        )
        if any_remediation_flag and not self.args.remediation_action:
            #warning only: the rule is still written, just without remediation
            print("Remediation Flags detected but no remediation action (--remediation-action) set")

        my_remediation = {}
        if self.args.remediation_action:
            try:
                my_remediation = self.__generate_remediation_params()
            except Exception:
                print("Error parsing remediation configuration.")
                raise

        #create config file and place in rule directory
        parameters = {
            'RuleName': self.args.rulename,
            'Description': self.args.rulename,
            'SourceRuntime': self.args.runtime,
            'CodeKey': self.args.rulename+my_session.region_name+'.zip',
            'InputParameters': json.dumps(my_input_params),
            'OptionalParameters': json.dumps(my_optional_params)
        }

        if self.args.custom_lambda_name:
            parameters['CustomLambdaName'] = self.args.custom_lambda_name

        tags = json.dumps(my_tags)

        if self.args.resource_types:
            parameters['SourceEvents'] = self.args.resource_types

        if self.args.maximum_frequency:
            parameters['SourcePeriodic'] = self.args.maximum_frequency

        if self.args.rulesets:
            parameters['RuleSets'] = self.args.rulesets

        if self.args.source_identifier:
            #a source identifier replaces the code key and runtime set above
            parameters['SourceIdentifier'] = self.args.source_identifier
            parameters['CodeKey'] = None
            parameters['SourceRuntime'] = None

        if my_remediation:
            parameters['Remediation'] = my_remediation

        self.__write_params_file(self.args.rulename, parameters, tags)