solution/finance/idle_resource.py (566 lines of code) (raw):

# -*- coding: utf-8 -*- from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient from alibabacloud_sts20150401.client import Client as Sts20150401Client from alibabacloud_sts20150401 import models as sts_20150401_models from alibabacloud_vpc20160428.client import Client as Vpc20160428Client from alibabacloud_vpc20160428 import models as vpc_20160428_models from alibabacloud_alb20200616.client import Client as Alb20200616Client from alibabacloud_alb20200616 import models as alb_20200616_models from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client from alibabacloud_bssopenapi20171214 import models as bss_open_api_20171214_models from alibabacloud_ecs20140526.client import Client as Ecs20140526Client from alibabacloud_ecs20140526 import models as ecs_20140526_models from datetime import timedelta, datetime rd_management_account_access_key_id = 'yourAccessKeyId' rd_management_account_access_key_secret = 'yourAccessKeySecret' rd_management_account_readonly_role_name = 'readOnly' rd_management_account_id = 'yourRdManagementAccountId' rd_member_account_id_list = ['yourRdMemberAccountId', 'yourRdMemberAccountId'] region_id_list = ['cn-shanghai', 'cn-hangzhou'] class OpenAPI: def __init__(self): pass @staticmethod def create_sts_client( access_key_id: str, access_key_secret: str, ) -> Sts20150401Client: """ 使用AK&SK初始化账号Client @param access_key_id: @param access_key_secret: @return: Client @throws Exception """ config = open_api_models.Config( # 您的 AccessKey ID, access_key_id=access_key_id, # 您的 AccessKey Secret, access_key_secret=access_key_secret ) # 访问的域名 config.endpoint = f'sts.cn-shanghai.aliyuncs.com' return Sts20150401Client(config) @staticmethod def create_credentials_by_assume_role(account_id, access_key_id, access_key_secret, role_name): client = OpenAPI.create_sts_client(access_key_id, access_key_secret) assume_role_request = sts_20150401_models.AssumeRoleRequest( role_arn='acs:ram::' + account_id + ':role/' + role_name, role_session_name='management-account-programmaticUser' ) runtime = util_models.RuntimeOptions() try: resp = client.assume_role_with_options(assume_role_request, runtime) body_dict = resp.body.to_map() return body_dict['Credentials'] except Exception as error: print(UtilClient.assert_as_string(error)) @staticmethod def create_api_models_config_by_assume_role(account_id, access_key_id, access_key_secret, role_name): rd_role_credentials = OpenAPI.create_credentials_by_assume_role(account_id, access_key_id, access_key_secret, role_name) config = open_api_models.Config() config.access_key_id = rd_role_credentials['AccessKeyId'] config.access_key_secret = rd_role_credentials['AccessKeySecret'] config.security_token = rd_role_credentials['SecurityToken'] return config class IdleResourceSample: def __init__(self): pass @staticmethod def create_ecs_client( account_id: str, access_key_id: str, access_key_secret: str, role_name: str, region_id: str, ) -> Ecs20140526Client: config = OpenAPI.create_api_models_config_by_assume_role(account_id, access_key_id, access_key_secret, role_name) config.endpoint = f'ecs.{region_id}.aliyuncs.com' return Ecs20140526Client(config) @staticmethod def query_ecs_disk( region_id: str, status: str, client: Ecs20140526Client ): describe_disks_request = ecs_20140526_models.DescribeDisksRequest( region_id=region_id, status=status ) runtime = util_models.RuntimeOptions() try: resp = client.describe_disks_with_options(describe_disks_request, runtime) body_dict = resp.body.to_map() # print(body_dict) return body_dict['Disks']['Disk'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_idle_ecs_disk(account_id, access_key_id, access_key_secret, role_name): idle_resource_list = [] print('[Idle ECS Disk] AccountId: ' + account_id) for region_id in region_id_list: output_str = 'Region:' + region_id + ' ' client = IdleResourceSample.create_ecs_client(account_id, access_key_id, access_key_secret, role_name, region_id) result_list = IdleResourceSample.query_ecs_disk(region_id, 'Available', client) for disk in result_list: # 状态为待挂载,闲置 output_str += f'<ID:{disk["DiskId"]} Name:{disk["DiskName"]} Reason:状态为待挂载> ' idle_resource_list.append(disk) print(output_str) print() return idle_resource_list @staticmethod def create_bss_client( account_id: str, access_key_id: str, access_key_secret: str, role_name: str, ) -> BssOpenApi20171214Client: config = OpenAPI.create_api_models_config_by_assume_role(account_id, access_key_id, access_key_secret, role_name) # 访问的域名 config.endpoint = f'business.aliyuncs.com' return BssOpenApi20171214Client(config) @staticmethod def query_bss_resource_usage_detail( start_period: str, end_period: str, period_type: str, resource_type: str, client: BssOpenApi20171214Client, ): describe_resource_usage_detail_request = bss_open_api_20171214_models.DescribeResourceUsageDetailRequest( start_period=start_period, end_period=end_period, period_type=period_type, resource_type=resource_type, max_results=300 ) runtime = util_models.RuntimeOptions() try: resp = client.describe_resource_usage_detail_with_options(describe_resource_usage_detail_request, runtime) body_dict = resp.body.to_map() return body_dict['Data']['Items'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_idle_ri(rd_account_id, access_key_id, access_key_secret, role_name): client = IdleResourceSample.create_bss_client(rd_account_id, access_key_id, access_key_secret, role_name) # 2022-12-27 22:00:00 today_fixed_time = datetime.today().replace(hour=22, minute=0, second=0, microsecond=0) # 2022-12-24 20:00:00 - 2022-12-24 22:00:00 end_period = today_fixed_time + timedelta(days=-3) start_period = end_period + timedelta(hours=-2) ri_list = IdleResourceSample.query_bss_resource_usage_detail(start_period.strftime("%Y-%m-%d %H:%M:%S"), end_period.strftime("%Y-%m-%d %H:%M:%S"), 'HOUR', 'RI', client) # print(ri_list) id_set = set() idle_ri_list = [] usage_percentage_threshold = 0.1 output_str = '[Idle Reserved Instance Voucher] Region:All\n' for ri in ri_list: if ri['Status'] != 'Valid': continue ri_id = ri['ResourceInstanceId'] if ri_id not in id_set and ri['UsagePercentage'] < usage_percentage_threshold: # 查询时段内预留实例券利用率小于阈值,闲置 output_str += f'<AccountId:{ri["UserId"]} ID:{ri_id} Reason:{ri["StartTime"]} - {ri["EndTime"]}' \ f' {"使用率低于" + str(usage_percentage_threshold * 100) + "%"}> ' idle_ri_list.append(ri) id_set.add(ri_id) print(output_str + '\n') return idle_ri_list @staticmethod def create_alb_client( account_id: str, access_key_id: str, access_key_secret: str, role_name: str, region_id: str, ) -> Alb20200616Client: config = OpenAPI.create_api_models_config_by_assume_role(account_id, access_key_id, access_key_secret, role_name) config.endpoint = f'alb.{region_id}.aliyuncs.com' return Alb20200616Client(config) @staticmethod def query_alb_instance(client: Alb20200616Client): list_load_balancers_request = alb_20200616_models.ListLoadBalancersRequest( max_results=100 ) runtime = util_models.RuntimeOptions() try: resp = client.list_load_balancers_with_options(list_load_balancers_request, runtime) body_dict = resp.body.to_map() return body_dict['LoadBalancers'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_alb_listener( client: Alb20200616Client, load_balancer_ids: list, ): list_listeners_request = alb_20200616_models.ListListenersRequest( load_balancer_ids=load_balancer_ids, max_results=100 ) runtime = util_models.RuntimeOptions() try: resp = client.list_listeners_with_options(list_listeners_request, runtime) body_dict = resp.body.to_map() return body_dict['Listeners'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_alb_forward_rule( client: Alb20200616Client, load_balancer_ids: list, ): list_rules_request = alb_20200616_models.ListRulesRequest( load_balancer_ids=load_balancer_ids, max_results=100, ) runtime = util_models.RuntimeOptions() try: resp = client.list_rules_with_options(list_rules_request, runtime) body_dict = resp.body.to_map() return body_dict['Rules'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_alb_server_group( client: Alb20200616Client, server_group_ids: list, ): list_server_groups_request = alb_20200616_models.ListServerGroupsRequest( server_group_ids=server_group_ids, max_results=100, ) runtime = util_models.RuntimeOptions() try: resp = client.list_server_groups_with_options(list_server_groups_request, runtime) body_dict = resp.body.to_map() return body_dict['ServerGroups'] except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error) @staticmethod def query_idle_alb(account_id, access_key_id, access_key_secret, role_name): idle_resource_list = [] print('[Idle ALB] AccountId: ' + account_id) for region_id in region_id_list: output_str = 'Region:' + region_id + ' ' client = IdleResourceSample.create_alb_client(account_id, access_key_id, access_key_secret, role_name, region_id) result_list = IdleResourceSample.query_alb_instance(client) for alb in result_list: alb_id = alb['LoadBalancerId'] # print(alb_id) # 查询监听器 listeners = IdleResourceSample.query_alb_listener(client, [alb_id]) # print(listeners) if len(listeners) == 0: # 无监听器,闲置 output_str += f'<ID:{alb_id} Name:{alb["LoadBalancerName"]} Reason:无监听器> ' idle_resource_list.append(alb) continue all_listener_stopped = True # 转发服务器组ID列表 forward_server_group_ids = [] for listener in listeners: # 默认转发规则的服务器组ID forward_server_group_ids.append( listener['DefaultActions'][0]['ForwardGroupConfig']['ServerGroupTuples'][0]['ServerGroupId']) if listener['ListenerStatus'] != 'Stopped': all_listener_stopped = False break if all_listener_stopped: # 所有监听器停止,闲置 output_str += f'<ID:{alb_id} Name:{alb["LoadBalancerName"]} Reason:所有监听器停止> ' idle_resource_list.append(alb) continue # 查询转发规则 rules = IdleResourceSample.query_alb_forward_rule(client, [alb_id]) for rule in rules: # 添加转发规则中的服务器组ID forward_server_group_ids.append( rule['RuleActions'][0]['ForwardGroupConfig']['ServerGroupTuples'][0]['ServerGroupId']) # 查询服务器组信息 server_groups = IdleResourceSample.query_alb_server_group(client, list(set(forward_server_group_ids))) all_server_group_is_empty = True for server_group in server_groups: if server_group['ServerCount'] > 0: all_server_group_is_empty = False break if all_server_group_is_empty: # 所有服务器组都无后端服务器,闲置 output_str += f'<ID:{alb_id} Name:{alb["LoadBalancerName"]} Reason:所有服务器组都无后端服务器> ' idle_resource_list.append(alb) print(output_str) print() return idle_resource_list @staticmethod def create_vpc_client( account_id, access_key_id, access_key_secret, role_name ) -> Vpc20160428Client: config = OpenAPI.create_api_models_config_by_assume_role(account_id, access_key_id, access_key_secret, role_name) config.endpoint = f'vpc.aliyuncs.com' return Vpc20160428Client(config) @staticmethod def query_common_bandwidth_package( client: Vpc20160428Client, region_id: str ): describe_common_bandwidth_packages_request = vpc_20160428_models.DescribeCommonBandwidthPackagesRequest( region_id=region_id, page_size=50, page_number=1 ) runtime = util_models.RuntimeOptions() try: resp = client.describe_common_bandwidth_packages_with_options(describe_common_bandwidth_packages_request, runtime) body_dict = resp.body.to_map() return body_dict['CommonBandwidthPackages']['CommonBandwidthPackage'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_idle_common_bandwidth_package(account_id, access_key_id, access_key_secret, role_name): client = IdleResourceSample.create_vpc_client(account_id, access_key_id, access_key_secret, role_name) idle_resource_list = [] print('[Idle Common Bandwidth Package] AccountId: ' + account_id) for region_id in region_id_list: output_str = 'Region:' + region_id + ' ' result_list = IdleResourceSample.query_common_bandwidth_package(client, region_id) # print(result_list) for cbp in result_list: if cbp['BusinessStatus'] != 'Normal': continue if cbp['PublicIpAddresses'] is None or len(cbp['PublicIpAddresses']['PublicIpAddresse']) == 0: output_str += f'<ID:{cbp["BandwidthPackageId"]} Name:{cbp["Name"]} Reason:未添加EIP> ' idle_resource_list.append(cbp) print(output_str) print() return idle_resource_list @staticmethod def query_nat_gateway(client: Vpc20160428Client, region_id: str): describe_nat_gateways_request = vpc_20160428_models.DescribeNatGatewaysRequest( region_id=region_id, page_size=50, ) runtime = util_models.RuntimeOptions() try: resp = client.describe_nat_gateways_with_options(describe_nat_gateways_request, runtime) body_dict = resp.body.to_map() # print(body_dict) return body_dict['NatGateways']['NatGateway'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_nat_gateway_snat_entry(client: Vpc20160428Client, region_id: str, nat_gateway_id: str): describe_snat_table_entries_request = vpc_20160428_models.DescribeSnatTableEntriesRequest( nat_gateway_id=nat_gateway_id, region_id=region_id ) runtime = util_models.RuntimeOptions() try: resp = client.describe_snat_table_entries_with_options(describe_snat_table_entries_request, runtime) body_dict = resp.body.to_map() # print(body_dict) return body_dict['SnatTableEntries']['SnatTableEntry'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_nat_gateway_dnat_entry(client: Vpc20160428Client, region_id: str, nat_gateway_id: str): describe_forward_table_entries_request = vpc_20160428_models.DescribeForwardTableEntriesRequest( region_id=region_id, nat_gateway_id=nat_gateway_id ) runtime = util_models.RuntimeOptions() try: resp = client.describe_forward_table_entries_with_options(describe_forward_table_entries_request, runtime) body_dict = resp.body.to_map() # print(body_dict) return body_dict['ForwardTableEntries']['ForwardTableEntry'] except Exception as error: UtilClient.assert_as_string(error) @staticmethod def query_idle_nat_gateway(account_id, access_key_id, access_key_secret, role_name): client = IdleResourceSample.create_vpc_client(account_id, access_key_id, access_key_secret, role_name) idle_resource_list = [] print('[Idle NAT Gateway] AccountId: ' + account_id) for region_id in region_id_list: output_str = 'Region:' + region_id + ' ' result_list = IdleResourceSample.query_nat_gateway(client, region_id) # print(eip_list) for nat_gateway in result_list: if nat_gateway['NetworkType'] == 'internet' and len(nat_gateway['IpLists']['IpList']) == 0: # 公网NAT网关未绑定EIP,闲置 output_str += f'<ID:{nat_gateway["NatGatewayId"]} Name:{nat_gateway["Name"]}' \ f' Reason:公网NAT网关未绑定EIP> ' idle_resource_list.append(nat_gateway) continue snat_entries = IdleResourceSample.query_nat_gateway_snat_entry(client, region_id, nat_gateway['NatGatewayId']) if len(snat_entries) != 0: continue else: dnat_entries = IdleResourceSample.query_nat_gateway_dnat_entry(client, region_id, nat_gateway['NatGatewayId']) if len(dnat_entries) == 0: # 无SNAT及DNAT条目,闲置 output_str += f'<ID:{nat_gateway["NatGatewayId"]} Name:{nat_gateway["Name"]}' \ f' Reason:无SNAT及DNAT条目> ' idle_resource_list.append(nat_gateway) print(output_str) print() return idle_resource_list @staticmethod def query_eip(client: Vpc20160428Client, region_id: str): describe_eip_addresses_request = vpc_20160428_models.DescribeEipAddressesRequest( region_id=region_id, page_size=100, page_number=1 ) runtime = util_models.RuntimeOptions() try: resp = client.describe_eip_addresses_with_options(describe_eip_addresses_request, runtime) body_dict = resp.body.to_map() # print(body_dict) return body_dict['EipAddresses']['EipAddress'] except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error) @staticmethod def query_idle_eip(account_id, access_key_id, access_key_secret, role_name): client = IdleResourceSample.create_vpc_client(account_id, access_key_id, access_key_secret, role_name) idle_eip_list = [] print('[Idle EIP] AccountId: ' + account_id) for region_id in region_id_list: output_str = 'Region:' + region_id + ' ' eip_list = IdleResourceSample.query_eip(client, region_id) # print(eip_list) for eip in eip_list: bind_instance_id = str(eip['InstanceId']) if len(bind_instance_id) == 0 or len(bind_instance_id.strip()) == 0: output_str += f'<ID:{eip["AllocationId"]} Name:{eip["Name"]} Reason:未绑定资源> ' idle_eip_list.append(eip) print(output_str) print() return idle_eip_list @staticmethod def query_idle_resource(): rd_member_account_rd_account_access_role = 'ResourceDirectoryAccountAccessRole' # EIP try: IdleResourceSample.query_idle_eip(rd_management_account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_management_account_readonly_role_name) except Exception as error: print(f'AccountId {rd_management_account_id} query exception:{repr(error)}') for account_id in rd_member_account_id_list: try: IdleResourceSample.query_idle_eip(account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_member_account_rd_account_access_role) except Exception as error: # traceback.print_exc() print(f'AccountId {account_id} query exception:{repr(error)}') # 共享带宽包 try: IdleResourceSample.query_idle_common_bandwidth_package(rd_management_account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_management_account_readonly_role_name) except Exception as error: print(f'AccountId {rd_management_account_id} query exception:{repr(error)}') for account_id in rd_member_account_id_list: try: IdleResourceSample.query_idle_common_bandwidth_package(account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_member_account_rd_account_access_role) except Exception as error: # traceback.print_exc() print(f'AccountId {account_id} query exception:{repr(error)}') # ALB try: IdleResourceSample.query_idle_alb(rd_management_account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_management_account_readonly_role_name) except Exception as error: print(f'AccountId {rd_management_account_id} query exception:{repr(error)}') for account_id in rd_member_account_id_list: try: IdleResourceSample.query_idle_alb(account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_member_account_rd_account_access_role) except Exception as error: print(f'AccountId {account_id} query exception:{repr(error)}') # NAT网关 try: IdleResourceSample.query_idle_nat_gateway(rd_management_account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_management_account_readonly_role_name) except Exception as error: print(f'AccountId {rd_management_account_id} query exception:{repr(error)}') for account_id in rd_member_account_id_list: try: IdleResourceSample.query_idle_nat_gateway(account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_member_account_rd_account_access_role) except Exception as error: print(f'AccountId {account_id} query exception:{repr(error)}') # ECS云盘 try: IdleResourceSample.query_idle_ecs_disk(rd_management_account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_management_account_readonly_role_name) except Exception as error: print(f'AccountId {rd_management_account_id} query exception:{repr(error)}') for account_id in rd_member_account_id_list: try: IdleResourceSample.query_idle_ecs_disk(account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_member_account_rd_account_access_role) except Exception as error: print(f'AccountId {account_id} query exception:{repr(error)}') # 预留实例券RI try: IdleResourceSample.query_idle_ri(rd_management_account_id, rd_management_account_access_key_id, rd_management_account_access_key_secret, rd_management_account_readonly_role_name) except Exception as error: print(f'AccountId {rd_management_account_id} query exception:{repr(error)}') if __name__ == '__main__': IdleResourceSample.query_idle_resource()