solution/tools/aliyun_subscribe_actiontrail/index.py (149 lines of code) (raw):

# -*- coding:utf-8 -*- from aliyunsdkcore.client import AcsClient from aliyunsdkcore.auth.credentials import StsTokenCredential, AccessKeyCredential from aliyunsdkecs.request.v20140526.DescribeSnapshotsRequest import DescribeSnapshotsRequest from aliyunsdkresourcemanager.request.v20200331.MoveResourcesRequest import MoveResourcesRequest import pymysql from aliyunsdksts.request.v20150401.AssumeRoleRequest import AssumeRoleRequest import json, logging import os connection = None logger = logging.getLogger() class ECS(object): def __init__(self, sts_access_key, sts_access_secret, sts_token, region_id): self.sts_access_key = sts_access_key self.sts_access_secret = sts_access_secret self.sts_token = sts_token self.region_id = region_id if sts_token is None: self.credentials = AccessKeyCredential(self.sts_access_key, self.sts_access_secret) else: self.credentials = StsTokenCredential(self.sts_access_key, self.sts_access_secret, self.sts_token) self.clt = AcsClient(region_id=self.region_id, credential=self.credentials) def ListDiskSnapshot(self, diskid): request = DescribeSnapshotsRequest() request.set_accept_format('json') request.set_DiskId(diskid) request.set_PageSize(100) response = self.clt.do_action_with_exception(request) return response def ListEcsDiskSnapshot(self, instanceid): request = DescribeSnapshotsRequest() request.set_accept_format('json') request.set_InstanceId(instanceid) request.set_PageSize(100) response = self.clt.do_action_with_exception(request) return response def MoveResources(self, target_rg, resources): request = MoveResourcesRequest() request.set_accept_format('json') request.set_ResourceGroupId(target_rg) request.set_Resources(resources) response = self.clt.do_action_with_exception(request) logger.info("批量转组成功,转组对象:" + json.dumps(resources)) class ResourceManage(object): def __init__(self, sts_access_key, sts_access_secret, region_id): self.sts_access_key = sts_access_key self.sts_access_secret = sts_access_secret self.region_id = region_id self.credentials = AccessKeyCredential(self.sts_access_key, self.sts_access_secret) self.clt = AcsClient(region_id=self.region_id, credential=self.credentials) """ 当需要通过资源管理账号操作多个成员账号时适用 """ def AssumeRole(self, memberUid): request = AssumeRoleRequest() request.set_accept_format('json') request.set_RoleArn("acs:ram::%s:role/resourcedirectoryaccountaccessrole" % (memberUid)) request.set_RoleSessionName("rdMaster") response = self.clt.do_action_with_exception(request) response = json.loads(response).get('Credentials') return response.get('AccessKeyId'), response.get('AccessKeySecret'), response.get('SecurityToken') def initialize(context): global connection try: connection = pymysql.connect( host=os.environ['MYSQL_ENDPOING'], # 替换为您的HOST名称。 port=int(os.environ['MYSQL_PORT']), # 替换为您的端口号。 user=os.environ['MYSQL_USER'], # 替换为您的用户名。 passwd=os.environ['MYSQL_PASSWORD'], # 替换为您的用户名对应的密码。 db=os.environ['MYSQL_DBNAME'], # 替换为您的数据库名称。 connect_timeout=5) logger.info('eb job connect mysql success!!!') except Exception as e: logger.error( "ERROR: Unexpected error: Could not connect to MySql instance.") def pre_stop(context): if connection != None: connection.close() """ 通过主键异常来发现是否当前这条EB事件有处理过 """ def save_transactional(sql, params): if connection is None: return True try: cursor = connection.cursor() cursor.execute(sql, params) connection.commit() return True except Exception as e: logger.error(e) return False def check_pk(sql): if connection is None: # 表示不做幂等验证,直接处理 return None try: cursor = connection.cursor() cursor.execute(sql) data = cursor.fetchone() return data except Exception as e: logger.error(e) def scene(context): """ Params: context 格式 { "accountId":"", "eventId:":"", "rgId":"", "resId":"", "resType":"disk" } 场景一:有两个账号: 1、日志账号,开通事件总线 + 函数计算 + RDS,用于响应事件。 2、业务账号,开通事件总线,将事件总线消息推到日志账号。 场景二:有三个账号: 1、日志账号,开通事件总线 + 函数计算 + RDS,用于响应事件。 2、资源管理账号,开通资源目录,在资源目录里面统一管理成员账号变更资源组。 3、业务账号,开通事件总线,将事件总线消息推到日志账号。 """ if os.environ.get("AK") is None or os.environ.get("SK") is None or os.environ.get( "REGION") is None or os.environ.get("SCENE") is None: logger.error("没有配置程序用的AK/SK/Region,请检查环境变量") return else: ak = os.environ.get("AK") sk = os.environ.get("SK") region_id = os.environ.get("REGION") # 1. 先判断幂等 pk_sql = "insert into pk_eventbridge(eb_id) values(%s)" params = (context.get("eventId")) if not save_transactional(pk_sql, params): logger.error("当前事件%s已处理过,忽略" % (context.get("eventId"))) return # 2. 依据磁盘ID或实例ID查询快照列表 resId = context.get("resId") # 需要判断一下是场景一还是场景二 scene_type = os.environ.get("SCENE") if scene_type is not None and scene_type == "ma": # 需要先在MA账号里面AssumeRole到成员账号拿到STS_TOKEN resM = ResourceManage(ak, sk, region_id) sts_ak, sts_sk, sts_token = resM.AssumeRole(context.get("accountId")) ecs = ECS(sts_ak, sts_sk, sts_token, region_id) else: ecs = ECS(ak, sk, None, region_id) # 做多一步判断,如果是disk直接就查快照,如果是ECS实例则需要先查询出磁盘出来 if context.get("resType") == "instance": resp = ecs.ListEcsDiskSnapshot(resId) if context.get("resType") == "disk": resp = ecs.ListDiskSnapshot(resId) if resp is None: return snap_object = json.loads(resp) if len(snap_object["Snapshots"]["Snapshot"]) == 0: logger.error("当前账号:%s,磁盘没有快照%s不需要处理,忽略" % (context.get("accountId"), context.get("resId"))) return resources = [] for item in snap_object["Snapshots"]["Snapshot"]: resources.append( {"ResourceId": item["SnapshotId"], "RegionId": region_id, "Service": "ecs", "ResourceType": "snapshot"}) # 3. 批量转组 ecs.MoveResources(context.get("rgId"), resources) def transfer(event): eb = json.loads(event).get("data") context = { "accountId": eb.get("recipientAccountId"), "eventId": eb.get("eventId"), "rgId": eb.get("requestParameters").get("ResourceGroupId"), "resId": eb.get("requestParameters").get("ResourceId"), "resType": eb.get("requestParameters").get("ResourceType") } if context.get("resType") == "disk" or context.get("resType") == "instance": return context return None def handler(event, context): transfer_context = transfer(event) if transfer_context is None: return scene(transfer_context)