remediation/config_rule_and_remediation_sample_v2.py (150 lines of code) (raw):

# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
"""Sample: create a `required-tags` Config rule and an OOS tagging remediation.

The script talks to the Alibaba Cloud Config service (API version 2020-09-07)
through the official SDK.  When run as a script it creates the config rule;
the remediation steps are kept as commented-out code at the bottom so they
can be enabled one by one.
"""
import json
import os
import time
from typing import Optional

from alibabacloud_config20200907.client import Client as Config20200907Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_config20200907 import models as config_20200907_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

# Credentials are taken from the environment when available so that real keys
# never need to be written into this file; 'xxx' stays as the placeholder
# fallback used by the original sample.
AccessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', 'xxx')
AccessSecret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'xxx')


def _report_error(action: str, error: Exception) -> None:
    """Print a readable description of a failed API call.

    SDK exceptions are expected to expose a string ``message`` attribute
    (NOTE(review): confirm against the Tea SDK); ordinary Python exceptions
    do not, so fall back to ``str(error)`` instead of raising
    ``AttributeError`` from inside the ``except`` handler.
    """
    message = getattr(error, 'message', None)
    if not isinstance(message, str):
        message = str(error)
    UtilClient.assert_as_string(message)
    print(action + " failed: " + message)


class Sample:
    """Static helpers wrapping the Config API calls used by this sample."""

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
        endpoint: str = 'config.cn-hangzhou.aliyuncs.com',
    ) -> Config20200907Client:
        """Build a Config client for the given credentials.

        Args:
            access_key_id: AccessKey ID used to sign requests.
            access_key_secret: AccessKey secret used to sign requests.
            endpoint: Service domain to call; defaults to the cn-hangzhou
                endpoint the sample has always used.

        Returns:
            A ready-to-use ``Config20200907Client``.
        """
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret
        )
        # Domain of the service endpoint to access.
        config.endpoint = endpoint
        return Config20200907Client(config)

    @staticmethod
    def create_config_rule(
        input_parameters
    ) -> Optional[str]:
        """Create the managed ``required-tags`` rule.

        Args:
            input_parameters: Rule input parameters (the ``tagNKey`` /
                ``tagNValue`` pairs), passed through to the request as-is.

        Returns:
            The id of the created rule, or ``None`` if the call failed (the
            error is printed).
        """
        client = Sample.create_client(AccessKey, AccessSecret)
        create_config_rule_request = config_20200907_models.CreateConfigRuleRequest(
            config_rule_name='delta-required-tags-by-sdk-v2',
            description='最多可定义6组标签,资源需同时具有指定的所有标签,视为“合规”。标签输入大小写敏感,每组最多只能输入一个值。',
            resource_types_scope=[
                # 'ACS::ECS::Disk',
                'ACS::ECS::Instance',
                'ACS::OSS::Bucket'
            ],
            input_parameters=input_parameters,
            config_rule_trigger_types='ConfigurationItemChangeNotification',
            risk_level=1,
            source_owner='ALIYUN',
            source_identifier='required-tags'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Print the API response yourself if you copy and run this code.
            response = client.create_config_rule_with_options(create_config_rule_request, runtime)
            print("finish create_config_rule rule_id:" + response.body.config_rule_id)
            return response.body.config_rule_id
        except Exception as error:
            # Best-effort sample: report the failure and signal it with None.
            _report_error("create_config_rule", error)
            return None

    @staticmethod
    def create_remediation(
        config_rule_id,
        remediation_tags
    ) -> None:
        """Attach a manually executed OOS tagging remediation to a rule.

        Args:
            config_rule_id: Id of the rule the remediation belongs to.
            remediation_tags: JSON-serialisable mapping of tag key to tag
                value; it is serialised into the template's ``tags``
                parameter.
        """
        # Leaking project code may leak the AccessKey and endanger every
        # resource under the account. This sample is for reference only;
        # the more secure STS approach is recommended. For more
        # authentication options see:
        # https://help.aliyun.com/document_detail/378659.html
        client = Sample.create_client(AccessKey, AccessSecret)
        params = {
            "properties": [
                {
                    "name": "regionId",
                    "type": "STRING",
                    "value": "{regionId}",
                    "allowedValues": [],
                    "description": "[Required]地域ID。"
                },
                {
                    "name": "tags",
                    "type": "OBJECT",
                    "value": json.dumps(remediation_tags),
                    "allowedValues": [],
                    "description": "[Required]资源标签(例:{\"k1\":\"v1\",\"k2\":\"v2\"})。"
                },
                {
                    "name": "resourceType",
                    "type": "STRING",
                    "value": "{resourceType}",
                    "allowedValues": [],
                    "description": "[Required]资源类型。"
                },
                {
                    "name": "resourceIds",
                    "type": "ARRAY",
                    "value": "[\"{resourceId}\"]",
                    "allowedValues": [],
                    "description": "[Required]资源ID。"
                }
            ]
        }
        create_remediation_request = config_20200907_models.CreateRemediationRequest(
            config_rule_id=config_rule_id,
            remediation_type='OOS',
            remediation_template_id='ACS-TAG-TagResources',
            invoke_type='MANUAL_EXECUTION',
            source_type='ALIYUN',
            params=json.dumps(params)
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Print the API response yourself if you copy and run this code.
            response = client.create_remediation_with_options(create_remediation_request, runtime)
            print("finish create_remediation remediation_id:" + response.body.remediation_id)
        except Exception as error:
            _report_error("create_remediation", error)

    @staticmethod
    def start_remediation(
        config_rule_id
    ) -> None:
        """Start the remediation configured for ``config_rule_id``."""
        client = Sample.create_client(AccessKey, AccessSecret)
        start_remediation_request = config_20200907_models.StartRemediationRequest(
            config_rule_id=config_rule_id
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Print the API response yourself if you copy and run this code.
            client.start_remediation_with_options(start_remediation_request, runtime)
            print("finish start_remediation config_rule_id:" + config_rule_id)
        except Exception as error:
            _report_error("start_remediation", error)

    @staticmethod
    def get_resource_compliance_by_config_rule(
        config_rule_id
    ) -> Optional[int]:
        """Return the number of NON_COMPLIANT resources for a rule.

        Returns:
            The ``count`` of the first NON_COMPLIANT entry in the response,
            ``0`` when there is no such entry, or ``None`` if the call failed
            (the error is printed).
        """
        client = Sample.create_client(AccessKey, AccessSecret)
        get_resource_compliance_by_config_rule_request = config_20200907_models.GetResourceComplianceByConfigRuleRequest(
            compliance_type='NON_COMPLIANT',
            config_rule_id=config_rule_id
        )
        runtime = util_models.RuntimeOptions()
        try:
            # Print the API response yourself if you copy and run this code.
            response = client.get_resource_compliance_by_config_rule_with_options(
                get_resource_compliance_by_config_rule_request, runtime)
            for res in response.body.compliance_result.compliances:
                if res.compliance_type == 'NON_COMPLIANT':
                    return res.count
            return 0
        except Exception as error:
            _report_error("get_resource_compliance_by_config_rule", error)
            return None


if __name__ == '__main__':
    input_parameters = {
        "tag1Key": "a_sys_app_id",
        "tag1Value": "200345",
        "tag2Key": "a_sys_app_name",
        "tag2Value": "name",
        "tag3Key": "a_sys_env",
        "tag3Value": "dev",
        "tag4Key": "a_sys_owner_div",
        "tag4Value": "054",
        "tag5Key": "a_sys_owner_dept",
        "tag5Value": "0645",
        "tag6Key": "",
        "tag6Value": ""
    }
    config_rule_id = Sample.create_config_rule(input_parameters)
    if config_rule_id is None:
        # The failure was already printed; stop here instead of crashing
        # later on string concatenation with None.
        raise SystemExit("create_config_rule failed; aborting.")

    # Optional follow-up steps, left disabled in this sample:
    # remediation_tags = {
    #     "a_sys_app_id":"200345",
    #     "a_sys_app_name": "name",
    #     "a_sys_env": "dev",
    #     "a_sys_owner_div": "054",
    #     "a_sys_owner_dept": "0645",
    # }
    # Sample.create_remediation(config_rule_id, remediation_tags)
    #
    # ## wait for 60s, after finish evaluate then start remediation execution and monitor
    # time.sleep(60)
    # print("start remediation configRuleId:" + config_rule_id)
    # Sample.start_remediation(config_rule_id)
    # time.sleep(60)
    # ## wait for the asynchronous evaluation to finish before querying the non-compliant record count
    # non_compliance_cnt = Sample.get_resource_compliance_by_config_rule(config_rule_id)
    print("finish remediation configRuleId:" + config_rule_id)