# Source: solution/finance/storage_risk_item.py

# -*- coding: utf-8 -*-
"""Report storage risk items (OSS lifecycle rules, SLS retention and indexes).

The script assumes a RAM role in the management account and in every member
account listed below, inspects OSS buckets and SLS LogStores, and prints one
report per bucket (OSS) or per region (SLS).
"""
import traceback

from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sls20201230.client import Client as Sls20201230Client
from alibabacloud_sls20201230 import models as sls_20201230_models
from alibabacloud_oss20190517.client import Client as Oss20190517Client
from alibabacloud_oss20190517 import models as oss_20190517_models
from Tea.exceptions import TeaException

# NOTE(review): these are placeholders; avoid committing real credentials here.
rd_management_account_access_key_id = 'yourAccessKeyId'
rd_management_account_access_key_secret = 'yourAccessKeySecret'
rd_management_account_readonly_role_name = 'readOnly'
rd_management_account_id = 'yourRdManagementAccountId'
rd_member_account_id_list = ['yourRdMemberAccountId', 'yourRdMemberAccountId']
region_id_list = ['cn-shanghai', 'cn-hangzhou']
# SLS retention threshold in days: a LogStore whose retention is greater than
# this value is reported as a risk item.
sls_ttl_threshold_days = 90
# SLS hot-storage threshold in days: a LogStore whose retention is greater than
# this value and that has no hot/cold tiering configured is reported as a risk item.
sls_hot_ttl_threshold_days = 60


class OpenAPI:
    """Helpers that turn AccessKey credentials into assumed-role SDK configs."""

    def __init__(self):
        pass

    @staticmethod
    def create_sts_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Sts20150401Client:
        """Build an STS client from an AccessKey ID and AccessKey secret.

        @param access_key_id: AccessKey ID used to sign STS requests.
        @param access_key_secret: AccessKey secret paired with the ID.
        @return: STS client bound to the cn-shanghai STS endpoint.
        """
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret
        )
        # Endpoint of the STS service.
        config.endpoint = 'sts.cn-shanghai.aliyuncs.com'
        return Sts20150401Client(config)

    @staticmethod
    def create_credentials_by_assume_role(account_id, access_key_id, access_key_secret, role_name):
        """Assume ``role_name`` in ``account_id`` and return the temporary credentials.

        @param account_id: ID of the account that owns the role.
        @param access_key_id: AccessKey ID of the calling identity.
        @param access_key_secret: AccessKey secret of the calling identity.
        @param role_name: Name of the RAM role to assume.
        @return: The ``Credentials`` mapping of the AssumeRole response.
        @throws Exception: Any SDK error is propagated so that the caller
            reports the real failure instead of failing later on ``None``.
        """
        client = OpenAPI.create_sts_client(access_key_id, access_key_secret)
        assume_role_request = sts_20150401_models.AssumeRoleRequest(
            role_arn=f'acs:ram::{account_id}:role/{role_name}',
            role_session_name='management-account-programmaticUser'
        )
        runtime = util_models.RuntimeOptions()
        resp = client.assume_role_with_options(assume_role_request, runtime)
        return resp.body.to_map()['Credentials']

    @staticmethod
    def create_api_models_config_by_assume_role(account_id, access_key_id, access_key_secret, role_name):
        """Return an SDK ``Config`` carrying the temporary credentials of the assumed role.

        The endpoint is left unset; callers assign it per service and region.
        """
        rd_role_credentials = OpenAPI.create_credentials_by_assume_role(
            account_id, access_key_id, access_key_secret, role_name)
        config = open_api_models.Config()
        config.access_key_id = rd_role_credentials['AccessKeyId']
        config.access_key_secret = rd_role_credentials['AccessKeySecret']
        config.security_token = rd_role_credentials['SecurityToken']
        return config


class StorageResourceSample:
    """Queries OSS and SLS resources and prints the risk items found."""

    def __init__(self):
        pass

    @staticmethod
    def create_oss_client(
        account_id: str,
        access_key_id: str,
        access_key_secret: str,
        role_name: str,
        region_id: str,
    ) -> Oss20190517Client:
        """Build an OSS client for ``region_id`` using assumed-role credentials."""
        config = OpenAPI.create_api_models_config_by_assume_role(
            account_id, access_key_id, access_key_secret, role_name)
        config.endpoint = f'oss-{region_id}.aliyuncs.com'
        return Oss20190517Client(config)

    @staticmethod
    def query_oss_bucket_list(client: Oss20190517Client):
        """Return the bucket entries of a ListBuckets call, or ``[]`` when there are none.

        SDK errors are propagated to the caller.
        """
        resp = client.list_buckets(oss_20190517_models.ListBucketsRequest())
        bucket_section = resp.body.to_map().get('Buckets')
        if not bucket_section:
            return []
        return bucket_section['Bucket']

    @staticmethod
    def query_oss_risk_item(account_id, access_key_id, access_key_secret, role_name):
        """Print, for every bucket of the account, whether it lacks an enabled lifecycle rule.

        Buckets are listed through the cn-shanghai endpoint; each bucket's
        lifecycle is then read through a client for the bucket's own region.
        Errors raised while reading one bucket's lifecycle are printed and do
        not stop the scan; errors while listing buckets or creating clients
        are propagated.
        """
        client = StorageResourceSample.create_oss_client(
            account_id, access_key_id, access_key_secret, role_name, 'cn-shanghai')
        buckets = StorageResourceSample.query_oss_bucket_list(client)
        for bucket in buckets:
            bucket_region = bucket['Region']
            bucket_name = bucket['Name']
            client = StorageResourceSample.create_oss_client(
                account_id, access_key_id, access_key_secret, role_name, bucket_region)
            output_str = f'AccountId: {account_id} Region:{bucket_region}\n'
            try:
                resp = client.get_bucket_lifecycle(bucket_name)
                rules = resp.to_map()['body'].get('Rule') or []
                # A single rule may arrive as a bare mapping; normalise to a list.
                if not isinstance(rules, list):
                    rules = [rules]
                # OSS reports rule status as 'Enabled' / 'Disabled'.
                if not any(rule.get('Status') == 'Enabled' for rule in rules):
                    output_str += f'<Bucket:{bucket_name} Info:无启用的生命周期规则>\n'
            except TeaException as error:
                if error.code == 'NoSuchLifecycle':
                    output_str += f'<Bucket:{bucket_name} Info:无生命周期规则>\n'
                else:
                    # Do not hide unexpected service errors (e.g. access denied).
                    print(repr(error))
            except Exception as error:
                print(repr(error))
            print(output_str)

    @staticmethod
    def create_sls_client(
        account_id: str,
        access_key_id: str,
        access_key_secret: str,
        role_name: str,
        region_id: str,
    ) -> Sls20201230Client:
        """Build an SLS client for ``region_id`` using assumed-role credentials."""
        config = OpenAPI.create_api_models_config_by_assume_role(
            account_id, access_key_id, access_key_secret, role_name)
        config.endpoint = f'{region_id}.log.aliyuncs.com'
        return Sls20201230Client(config)

    @staticmethod
    def query_sls_project(client: Sls20201230Client):
        """Return the projects of one ListProject call (``size=500``), or ``[]``.

        NOTE(review): only a single page is requested; accounts with more
        projects than one page holds would be truncated.
        SDK errors are propagated to the caller.
        """
        list_project_request = sls_20201230_models.ListProjectRequest(size=500)
        runtime = util_models.RuntimeOptions()
        headers = {}
        resp = client.list_project_with_options(list_project_request, headers, runtime)
        return resp.body.to_map().get('projects') or []

    @staticmethod
    def query_sls_store(client: Sls20201230Client, project_name: str):
        """Return the GetLogStore detail mapping of every LogStore listed for the project.

        NOTE(review): ListLogStores is called once without paging parameters.
        SDK errors are propagated to the caller.
        """
        list_log_stores_request = sls_20201230_models.ListLogStoresRequest()
        runtime = util_models.RuntimeOptions()
        headers = {}
        resp = client.list_log_stores_with_options(project_name, list_log_stores_request, headers, runtime)
        log_store_names = resp.body.to_map().get('logstores') or []
        return [
            client.get_log_store_with_options(project_name, name, headers, runtime).body.to_map()
            for name in log_store_names
        ]

    @staticmethod
    def query_sls_index(client: Sls20201230Client, project_name: str, log_store_name: str, ):
        """Return the index configuration of a LogStore as a mapping.

        A LogStore without any index configuration yields ``{}`` so that the
        scan of the remaining LogStores can continue; every other SDK error
        is propagated to the caller.
        """
        runtime = util_models.RuntimeOptions()
        headers = {}
        try:
            resp = client.get_index_with_options(project_name, log_store_name, headers, runtime)
        except TeaException as error:
            if error.code == 'IndexConfigNotExist':
                return {}
            raise
        return resp.body.to_map()

    @staticmethod
    def query_sls_risk_item(account_id, access_key_id, access_key_secret, role_name):
        """Print, per region in ``region_id_list``, the SLS risk items of the account.

        For every LogStore of every project whose status is ``Normal``:
        - ``ttl`` at or above 3650 days is reported as permanent storage, and
          no further checks run for that LogStore;
        - ``ttl`` above ``sls_ttl_threshold_days`` is reported;
        - ``ttl`` above ``sls_hot_ttl_threshold_days`` with no ``hot_ttl`` key
          in the LogStore detail is reported;
        - a ``line`` entry in the index configuration is reported as
          full-text indexing, a truthy ``log_reduce`` as log clustering.
        """
        # presumably the value SLS uses for permanent retention - confirm
        permanent_storage_days = 3650
        for region_id in region_id_list:
            output_str = f'AccountId: {account_id} Region:{region_id}\n'
            client = StorageResourceSample.create_sls_client(
                account_id, access_key_id, access_key_secret, role_name, region_id)
            projects = StorageResourceSample.query_sls_project(client)
            for project in projects:
                project_name = project['projectName']
                if project['status'] != 'Normal':
                    continue
                log_store_list = StorageResourceSample.query_sls_store(client, project_name)
                for log_store in log_store_list:
                    log_store_name = log_store["logstoreName"]
                    ttl = log_store['ttl']
                    prefix = f'<Project:{project_name} LogStore:{log_store_name} '
                    if ttl >= permanent_storage_days:
                        output_str += prefix + 'Risk:开启永久存储>\n'
                        continue
                    # Strictly greater, matching the threshold's definition and the message text.
                    if ttl > sls_ttl_threshold_days:
                        output_str += prefix + f'Warn:存储天数大于设定的{sls_ttl_threshold_days}天阀值>\n'
                    if ttl > sls_hot_ttl_threshold_days and 'hot_ttl' not in log_store:
                        output_str += prefix + f'Warn:热存储超过{sls_hot_ttl_threshold_days}天,未开启冷存储>\n'
                    index = StorageResourceSample.query_sls_index(client, project_name, log_store_name)
                    if 'line' in index:
                        output_str += prefix + 'Warn:开启全文索引>\n'
                    if index.get('log_reduce'):
                        output_str += prefix + 'Info:开启日志聚类>\n'
            print(output_str)

    @staticmethod
    def query_risk_item():
        """Run the OSS scan and then the SLS scan for the management and member accounts.

        The management account is accessed through
        ``rd_management_account_readonly_role_name``; member accounts through
        ``ResourceDirectoryAccountAccessRole``. A failure for one account is
        printed and does not stop the remaining accounts.
        """
        rd_member_account_rd_account_access_role = 'ResourceDirectoryAccountAccessRole'
        targets = [(rd_management_account_id, rd_management_account_readonly_role_name)]
        targets.extend(
            (account_id, rd_member_account_rd_account_access_role)
            for account_id in rd_member_account_id_list
        )
        scans = (
            ('[OSS Risk Item]', StorageResourceSample.query_oss_risk_item),
            ('[SLS Risk Item]', StorageResourceSample.query_sls_risk_item),
        )
        for title, scan in scans:
            print(title)
            for account_id, role_name in targets:
                try:
                    scan(account_id,
                         rd_management_account_access_key_id,
                         rd_management_account_access_key_secret,
                         role_name)
                except Exception as error:
                    print(f'AccountId {account_id} query exception:{repr(error)}')


if __name__ == '__main__':
    StorageResourceSample.query_risk_item()