solution/compliance/3.4-sub-resource-changes/python_script/subscription.py (308 lines of code) (raw):

# -*- coding:utf-8 -*-
"""Set up cross-account delivery of resource-change events.

Running ``pipeline()`` performs, using the master account's access key:

1. In the log-archive account: create a RAM role trusted by the member
   account's EventBridge service and attach the put-events policy to it.
2. In the log-archive account: create the Function Compute function
   (service creation is currently disabled, see ``pipeline``).
3. In the member account: create the EventBridge service-linked role and an
   event rule whose target is the log-archive account's event bus.

All settings are read from ``config_default``.
"""
import json
import logging
import random
from typing import Dict, List, Optional

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from aliyunsdkcore.auth.credentials import StsTokenCredential, AccessKeyCredential
from aliyunsdksts.request.v20150401.AssumeRoleRequest import AssumeRoleRequest
from aliyunsdkresourcemanager.request.v20200331.ListAccountsRequest import ListAccountsRequest
from aliyunsdkresourcemanager.request.v20200331.CreateServiceLinkedRoleRequest import CreateServiceLinkedRoleRequest
from alibabacloud_fc_open20210406.client import Client as FC_Open20210406Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_fc_open20210406 import models as fc__open_20210406_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_eventbridge.client import Client as EventBridgeClient
from alibabacloud_eventbridge import models as event_bridge_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient
from aliyunsdkram.request.v20150501.AttachPolicyToRoleRequest import AttachPolicyToRoleRequest
from aliyunsdkram.request.v20150501.CreateRoleRequest import CreateRoleRequest
from Tea.model import TeaModel

from config_default import (
    master_account_secret_key,
    master_account_access_key,
    region_id,
    event_bridge_bus_name,
    event_bridge_rule_name,
    fc_endpoint,
)
from config_default import member_account_eventbridge_filter, log_account_put_events_policy
from config_default import log_archive_uid, member_uid, member_uid_role_name, sec_group_id, vswitch, vpc
from config_default import fc_role, srv_name, fc_name, code_oss_bucket_name, code_oss_object_name, mysql_endpoint
from config_default import mysql_port, mysql_user, mysql_password, mysql_dbname


def _log_error(error):
    """Log *error* to the console without assuming it has a ``message`` attribute.

    Plain Python 3 exceptions have no ``.message``; reading it directly inside
    an ``except`` handler would raise ``AttributeError`` and hide the real
    failure. Fall back to ``str(error)`` in that case.
    """
    ConsoleClient.log(getattr(error, "message", None) or str(error))


class RAM(object):
    """RAM operations executed with temporary (STS) credentials."""

    def __init__(self, sts_access_key, sts_access_secret, sts_token, region_id):
        """Build an ``AcsClient`` from the given STS credential triple.

        Args:
            sts_access_key: temporary AccessKey ID.
            sts_access_secret: temporary AccessKey secret.
            sts_token: STS security token.
            region_id: region passed to ``AcsClient``.
        """
        self.sts_access_key = sts_access_key
        self.sts_access_secret = sts_access_secret
        self.sts_token = sts_token
        self.region_id = region_id
        self.credentials = StsTokenCredential(self.sts_access_key, self.sts_access_secret, self.sts_token)
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

    def ListUsers(self):
        """Call the RAM ``ListUsers`` action and return the parsed JSON response."""
        request = CommonRequest()
        request.set_accept_format('json')
        request.set_domain('ram.aliyuncs.com')
        request.set_method('POST')
        request.set_protocol_type('https')  # https | http
        request.set_version('2015-05-01')
        request.set_action_name('ListUsers')

        # NOTE(review): this uses do_action() while every other method uses
        # do_action_with_exception() -- confirm that is intentional.
        response = self.clt.do_action(request)
        logging.debug(str(response, encoding='utf-8'))
        return json.loads(response)

    def CreateRole(self, roleName, desc, service):
        """Create a custom RAM role trusted by ``<service>@eventbridge.aliyuncs.com``.

        Failures are logged to the console and not re-raised (best effort).

        Args:
            roleName: name of the role to create.
            desc: role description.
            service: UID of the member account; used as the prefix of the
                trusted EventBridge service principal. The role itself is
                created in whichever account the credentials belong to.
        """
        request = CreateRoleRequest()
        request.set_RoleName(roleName)
        # Build the trust policy with json.dumps so quoting/escaping is always
        # valid JSON, instead of hand-escaping inside an f-string.
        policy = json.dumps({
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {"Service": [f"{service}@eventbridge.aliyuncs.com"]},
                }
            ],
            "Version": "1",
        })
        request.set_AssumeRolePolicyDocument(policy)
        request.set_Description(desc)
        request.set_accept_format('json')
        try:
            response = self.clt.do_action_with_exception(request)
            ConsoleClient.log(str(response, encoding='utf-8'))
        except Exception as error:
            _log_error(error)

    def AttachPolicyToRole(self, policyName, roleName, policyType):
        """Attach a policy to a role; failures are logged and not re-raised.

        Args:
            policyName: name of the policy to attach.
            roleName: name of the target role.
            policyType: policy type passed through to the API
                (the pipeline passes ``"System"``).
        """
        request = AttachPolicyToRoleRequest()
        request.set_accept_format('json')
        request.set_PolicyType(policyType)
        request.set_PolicyName(policyName)
        request.set_RoleName(roleName)
        try:
            response = self.clt.do_action_with_exception(request)
            ConsoleClient.log(str(response, encoding='utf-8'))
        except Exception as error:
            _log_error(error)

    def CreateServiceLinkedRole(self, serviceName):
        """Create a service-linked role; failures are logged and not re-raised.

        Services that support service-linked roles are listed at
        https://help.aliyun.com/document_detail/160674.htm

        Args:
            serviceName: service name passed to ``CreateServiceLinkedRole``.
        """
        request = CreateServiceLinkedRoleRequest()
        request.set_ServiceName(serviceName)
        request.set_accept_format('json')
        try:
            response = self.clt.do_action_with_exception(request)
            ConsoleClient.log(str(response, encoding='utf-8'))
        except Exception as error:
            _log_error(error)


class ResourceManage(object):
    """Resource Directory / STS operations executed with a long-lived access key."""

    def __init__(self, sts_access_key, sts_access_secret, region_id):
        """Build an ``AcsClient`` from an AccessKey pair.

        Args:
            sts_access_key: AccessKey ID (the pipeline passes the master account key).
            sts_access_secret: AccessKey secret.
            region_id: region passed to ``AcsClient``.
        """
        self.sts_access_key = sts_access_key
        self.sts_access_secret = sts_access_secret
        self.region_id = region_id
        self.credentials = AccessKeyCredential(self.sts_access_key, self.sts_access_secret)
        self.clt = AcsClient(region_id=self.region_id, credential=self.credentials)

    def ListAccounts(self):
        """Return the ``AccountId`` of each account in the ``ListAccounts`` response.

        Returns:
            list: account ids; empty when the response carries no accounts.

        Raises:
            Whatever ``do_action_with_exception`` raises on API failure.
        """
        request = ListAccountsRequest()
        request.set_accept_format('json')
        # NOTE(review): no PageNumber/PageSize is set, so this presumably only
        # returns the service's default first page -- confirm whether
        # pagination is needed for large resource directories.
        response = self.clt.do_action_with_exception(request)
        # Tolerate a missing/empty "Accounts" section instead of raising
        # AttributeError on None.
        accounts = (json.loads(response).get("Accounts") or {}).get("Account") or []
        return [account.get("AccountId") for account in accounts]

    def AssumeRole(self, memberUid):
        """Assume ``resourcedirectoryaccountaccessrole`` in the given account.

        Args:
            memberUid: UID of the account whose role is assumed.

        Returns:
            tuple: ``(AccessKeyId, AccessKeySecret, SecurityToken)``.

        Raises:
            Whatever ``do_action_with_exception`` raises on API failure.
        """
        request = AssumeRoleRequest()
        request.set_accept_format('json')
        request.set_RoleArn(f"acs:ram::{memberUid}:role/resourcedirectoryaccountaccessrole")
        request.set_RoleSessionName("rdMaster")
        response = self.clt.do_action_with_exception(request)
        credentials = json.loads(response).get('Credentials')
        return (
            credentials.get('AccessKeyId'),
            credentials.get('AccessKeySecret'),
            credentials.get('SecurityToken'),
        )


class EventBridge(object):
    """Static helpers for configuring EventBridge in a member account."""

    def __init__(self):
        pass

    @staticmethod
    def createClient(sts_access_key, sts_access_secret, sts_token, account):
        """Create an EventBridge client (initialises the common request parameters).

        Args:
            sts_access_key: AccessKey ID.
            sts_access_secret: AccessKey secret.
            sts_token: STS security token.
            account: UID of the current member account; used as the
                endpoint's host prefix.
        """
        config = event_bridge_models.Config()
        config.access_key_id = sts_access_key
        config.access_key_secret = sts_access_secret
        config.security_token = sts_token
        # f-string (rather than "+") so a UID configured as an int still works.
        config.endpoint = f"{account}.eventbridge.{region_id}.aliyuncs.com"
        return EventBridgeClient(config)

    @staticmethod
    def createEventRule(client, logArchiveUid, logArchiveRole, ruleName):
        """Create an event rule whose target is the log-archive account's event bus.

        Failures are logged to the console and not re-raised (best effort).

        Args:
            client: EventBridge client for the member account.
            logArchiveUid: UID of the log-archive account (the event target).
            logArchiveRole: RAM role name passed as the ``RAMRoleName`` target parameter.
            ruleName: name of the rule to create.
        """
        try:
            target_entry = event_bridge_models.TargetEntry()
            # NOTE(review): the id is a random integer; confirm whether the
            # API expects a string identifier here.
            target_entry.id = random.randint(0, 10000)
            target_entry.type = "acs.eventbridge"
            target_entry.endpoint = f"acs:eventbridge:{region_id}:{logArchiveUid}:eventbus/default"
            target_entry.push_retry_strategy = "BACKOFF_RETRY"
            # Parameters must be built with the SDK's model class.
            target_entry.param_list = [
                event_bridge_models.EBTargetParam("AccountType", "CONSTANT", "AnotherAccount", None),
                event_bridge_models.EBTargetParam("AccountId", "CONSTANT", logArchiveUid, None),
                event_bridge_models.EBTargetParam("EventBusName", "CONSTANT", event_bridge_bus_name, None),
                event_bridge_models.EBTargetParam("RAMRoleName", "CONSTANT", logArchiveRole, None),
                event_bridge_models.EBTargetParam("Body", "ORIGINAL", "", None),
            ]

            create_event_rule_request = event_bridge_models.CreateRuleRequest()
            create_event_rule_request.rule_name = ruleName
            create_event_rule_request.event_bus_name = event_bridge_bus_name
            # TODO: users should configure the filter pattern according to
            # their own internal standards.
            create_event_rule_request.filter_pattern = member_account_eventbridge_filter
            create_event_rule_request.status = "enable"
            create_event_rule_request.targets = [target_entry]

            response = client.create_rule(create_event_rule_request)
            ConsoleClient.log("--------------------create rule success--------------------")
            ConsoleClient.log(UtilClient.to_jsonstring(response.to_map()))
        except Exception as error:
            _log_error(error)


class FCConfig(TeaModel):
    """Bundle of settings needed to create the Function Compute service and function."""

    def __init__(
        self,
        access_key: Optional[str] = None,
        sk: Optional[str] = None,
        sts: Optional[str] = None,
        account: Optional[str] = None,
        sec_group_id: Optional[str] = None,
        vswitch: Optional[List[str]] = None,
        vpc: Optional[str] = None,
        fc_role: Optional[str] = None,
        srv_name: Optional[str] = None,
        code_oss_bucket_name: Optional[str] = None,
        code_oss_object_name: Optional[str] = None,
        fc_name: Optional[str] = None,
        mysql_endpoint: Optional[str] = None,
        mysql_port: Optional[str] = None,
        mysql_user: Optional[str] = None,
        mysql_password: Optional[str] = None,
        mysql_dbname: Optional[str] = None,
    ):
        super().__init__()
        # Credentials: AccessKey ID / secret / STS token.
        self.ak = access_key
        self.sk = sk
        self.sts = sts
        # UID of the log-archive account, used to address Function Compute.
        self.account = account
        # Network settings.
        self.sec_group_id = sec_group_id
        self.vswitch = vswitch
        self.vpc = vpc
        # Role and names.
        self.fc_role = fc_role
        self.srv_name = srv_name
        self.fc_name = fc_name
        # OSS location of the function code package.
        self.code_oss_bucket_name = code_oss_bucket_name
        self.code_oss_object_name = code_oss_object_name
        # MySQL connection settings (passed to the function as environment variables).
        self.mysql_endpoint = mysql_endpoint
        self.mysql_port = mysql_port
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.mysql_dbname = mysql_dbname


class Fc(object):
    """Static helpers for configuring Function Compute."""

    def __init__(self):
        pass

    @staticmethod
    def createClient(
        access_key_id: str,
        access_key_secret: str,
        security_token: str,
        account: str,
    ) -> FC_Open20210406Client:
        """Create a Function Compute client.

        Args:
            access_key_id: AccessKey ID.
            access_key_secret: AccessKey secret.
            security_token: STS security token.
            account: UID of the log-archive account; used as the endpoint's
                host prefix.
        """
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
            security_token=security_token,
        )
        # f-string (rather than "+") so a UID configured as an int still works.
        config.endpoint = f"{account}.{region_id}.fc.aliyuncs.com"
        return FC_Open20210406Client(config)

    @staticmethod
    def createService(args: FCConfig) -> None:
        """Create the Function Compute service described by *args*.

        Failures are logged to the console and not re-raised (best effort).
        """
        client = Fc.createClient(args.ak, args.sk, args.sts, args.account)
        create_service_headers = fc__open_20210406_models.CreateServiceHeaders()
        vpc_config = fc__open_20210406_models.VPCConfig(
            security_group_id=args.sec_group_id,
            v_switch_ids=args.vswitch,
            vpc_id=args.vpc,
        )
        create_service_request = fc__open_20210406_models.CreateServiceRequest(
            internet_access=True,
            role=args.fc_role,
            service_name=args.srv_name,
            vpc_config=vpc_config,
        )
        runtime = util_models.RuntimeOptions()
        try:
            # The API return value is not printed; add that here if needed.
            client.create_service_with_options(create_service_request, create_service_headers, runtime)
            ConsoleClient.log("创建服务{}成功".format(args.srv_name))
        except Exception as error:
            _log_error(error)

    @staticmethod
    def createFunction(args: FCConfig) -> None:
        """Create the function described by *args* inside service ``args.srv_name``.

        Failures are logged to the console and not re-raised (best effort).
        """
        client = Fc.createClient(args.ak, args.sk, args.sts, args.account)
        create_function_headers = fc__open_20210406_models.CreateFunctionHeaders()
        lifecycle_config = fc__open_20210406_models.InstanceLifecycleConfig(
            pre_stop=fc__open_20210406_models.LifecycleHook(
                handler="index.pre_stop",
            ),
        )
        code = fc__open_20210406_models.Code(
            oss_bucket_name=args.code_oss_bucket_name,
            oss_object_name=args.code_oss_object_name,
        )
        create_function_request = fc__open_20210406_models.CreateFunctionRequest(
            function_name=args.fc_name,
            code=code,
            # NOTE(review): "MYSQL_ENDPOING" looks like a typo of
            # "MYSQL_ENDPOINT", but the deployed function code presumably
            # reads this exact key -- change both sides together.
            environment_variables={
                "MYSQL_ENDPOING": args.mysql_endpoint,
                "MYSQL_USER": args.mysql_user,
                "MYSQL_PORT": args.mysql_port,
                "MYSQL_PASSWORD": args.mysql_password,
                "MYSQL_DBNAME": args.mysql_dbname,
            },
            handler="index.handler",
            initializer="index.initializer",
            runtime="python3",
            instance_lifecycle_config=lifecycle_config,
        )
        print(create_function_request)
        print(args.srv_name)
        print(args.fc_name)
        runtime = util_models.RuntimeOptions()
        try:
            # The API return value is not printed; add that here if needed.
            client.create_function_with_options(
                args.srv_name, create_function_request, create_function_headers, runtime
            )
            ConsoleClient.log("创建函数:{}成功".format(args.fc_name))
        except Exception as error:
            _log_error(error)


def pipeline():
    """Run the five provisioning steps using the values from ``config_default``."""
    rdMaster = ResourceManage(master_account_access_key, master_account_secret_key, region_id)

    # 1. In the log-archive account, create the role whose trust policy names
    #    the member account, then attach the put-events policy to it.
    (sts_ak, sts_sk, sts_token) = rdMaster.AssumeRole(log_archive_uid)
    ram = RAM(sts_ak, sts_sk, sts_token, region_id)
    ConsoleClient.log("【第一步】日志账号中配置事件总线跨账号路由所需要的角色")
    ram.CreateRole(member_uid_role_name, "账号投递的角色", member_uid)
    ram.AttachPolicyToRole(log_account_put_events_policy, member_uid_role_name, "System")

    # 2. In the log-archive account, configure Function Compute: the service.
    args = FCConfig(
        access_key=sts_ak,
        sk=sts_sk,
        sts=sts_token,
        account=log_archive_uid,
        sec_group_id=sec_group_id,
        vswitch=vswitch,
        vpc=vpc,
        fc_role=fc_role,
        srv_name=srv_name,
        fc_name=fc_name,
        code_oss_bucket_name=code_oss_bucket_name,
        code_oss_object_name=code_oss_object_name,
        mysql_endpoint=mysql_endpoint,
        mysql_port=mysql_port,
        mysql_user=mysql_user,
        mysql_password=mysql_password,
        mysql_dbname=mysql_dbname,
    )
    ConsoleClient.log("【第二步】日志账号中配置函数计算服务")
    # NOTE(review): service creation is disabled although the step is still
    # logged; step 3 presumably requires the service to exist already --
    # confirm and re-enable if needed.
    # Fc.createService(args)

    # 3. In the log-archive account, configure Function Compute: the function.
    ConsoleClient.log("【第三步】日志账号中配置函数计算服务,定义函数配置")
    Fc.createFunction(args)

    # 4. In the member account, create the service-linked role.
    (sts_ak, sts_sk, sts_token) = rdMaster.AssumeRole(member_uid)
    ram = RAM(sts_ak, sts_sk, sts_token, region_id)
    ConsoleClient.log("【第四步】成员账号中配置事件总线所需要用到的SLR")
    ram.CreateServiceLinkedRole("source-actiontrail.eventbridge.aliyuncs.com")

    # 5. In the member account, configure the event bus rule: filter pattern
    #    and delivery target.
    ConsoleClient.log("【第五步】成员账号中配置事件总线:配置规则过滤及事件投递目标")
    client = EventBridge.createClient(sts_ak, sts_sk, sts_token, member_uid)
    EventBridge.createEventRule(client, log_archive_uid, member_uid_role_name, event_bridge_rule_name)


if __name__ == '__main__':
    pipeline()