rules/customer-fc/vpc_flow_logs_enabled.py (151 lines of code) (raw):
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import json
import logging
from os.path import abspath
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from aliyunsdkcore.http import protocol_type
"""
该函数同时支持config配置变更触发和周期触发。
收到配置变更或周期触发事件时,查询当前事件中的VPC是否开启了流日志,如果开启视为合规,未开启视为不合规。
判断当前触发类型如果是周期触发,则开启删除模式,config会自动清理非本周期内评估的记录
"""
logger = logging.getLogger()
# 合规类型
COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
# 资源配置推送类型
CONFIGURATION_TYPE_COMMON = 'COMMON'
CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE'
CONFIGURATION_TYPE_NONE = 'NONE'
# Config api endpoint, International sites use ap-southeast-1 and config.ap-southeast-1.aliyuncs.com
CONFIG_SERVICE_REGION = 'cn-shanghai'
CONFIG_SERVICE_ENDPOINT = 'config.cn-shanghai.aliyuncs.com'
AK = '******'
SK = '******'
# main function
# event schema https://help.aliyun.com/document_detail/127405.html
def handler(event, context):
evt = validate_event(event)
if not evt:
return None
rule_parameters = evt.get('ruleParameters')
result_token = evt.get('resultToken')
ordering_timestamp = evt.get('orderingTimestamp')
invoking_event = evt.get('invokingEvent')
configuration_item = invoking_event.get('configurationItem')
account_id = configuration_item.get('accountId')
resource_id = configuration_item.get('resourceId')
resource_type = configuration_item.get('resourceType')
region_id = configuration_item.get('regionId')
configuration_type = invoking_event.get('configurationType')
if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE:
resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
resource_json = json.loads(resource_result)
configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"]
compliance_type, annotation = evaluate_configuration_item(context, rule_parameters, configuration_item)
evaluations = [
{
'accountId': account_id,
'complianceResourceId': resource_id,
'complianceResourceType': resource_type,
'complianceRegionId': region_id,
'orderingTimestamp': ordering_timestamp,
'complianceType': compliance_type,
'annotation': annotation
}
]
message_type = invoking_event.get('messageType')
put_evaluations(context, result_token, evaluations, message_type)
return evaluations
# 实现资源评估逻辑
def evaluate_configuration_item(context, rule_parameters, configuration_item):
vpc_region_id = configuration_item["regionId"]
vpc_id = json.loads(configuration_item['configuration'])["VpcId"]
vpc_logs_count = query_vpc_logs(context, vpc_region_id, vpc_id)
compliance_type = COMPLIANCE_TYPE_COMPLIANT
annotation = None
if not vpc_logs_count or vpc_logs_count <= 0:
compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT
annotation = json.dumps({'configuration': 'Not enable flowLogs', 'desiredValue': 'Enable flowLogs'})
return compliance_type, annotation
def query_vpc_endpoint(context, region_id):
client = AcsClient(
AK,
SK,
'region_id',
)
vpc_center_endpoint = "vpc.aliyuncs.com"
request = CommonRequest()
request.set_domain('vpc.aliyuncs.com')
request.set_version('2016-04-28')
request.set_action_name('DescribeRegions')
request.set_method('GET')
response = client.do_action_with_exception(request)
res = str(response, encoding='utf-8')
json_res = json.loads(res)
for region_info in json_res["Regions"]["Region"]:
if region_id == region_info["RegionId"]:
return region_info["RegionEndpoint"]
return vpc_center_endpoint
def query_vpc_logs(context, region_id, vpc_id):
client = AcsClient(AK, SK, region_id)
vpc_endpoint = query_vpc_endpoint(context, region_id)
request = CommonRequest()
request.set_protocol_type(protocol_type.HTTPS)
request.set_domain(vpc_endpoint)
request.set_version('2016-04-28')
request.set_action_name('DescribeFlowLogs')
request.set_method('GET')
request.add_query_param('RegionId', region_id)
request.add_query_param('ResourceType', 'VPC')
request.add_query_param('ResourceId', vpc_id)
request.add_query_param('Status', 'Active')
response = client.do_action_with_exception(request)
res = str(response, encoding='utf-8')
json_res = json.loads(res)
if "TotalCount" in json_res and json_res["TotalCount"] > 0:
return json_res["TotalCount"] > 0
return None
def validate_event(event):
if not event:
logger.error('Event is empty.')
evt = parse_json(event)
logger.info('Loading event: %s .' % evt)
if 'resultToken' not in evt:
logger.error('ResultToken is empty.')
return None
if 'ruleParameters' not in evt:
logger.error('RuleParameters is empty.')
return None
if 'invokingEvent' not in evt:
logger.error('InvokingEvent is empty.')
return None
return evt
def parse_json(content):
try:
return json.loads(content)
except Exception as e:
logger.error('Parse content:{} to json error:{}.'.format(content, e))
return None
def put_evaluations(context, result_token, evaluations, message_type):
client = AcsClient(AK, SK, CONFIG_SERVICE_REGION)
request = CommonRequest()
request.set_domain(CONFIG_SERVICE_ENDPOINT)
request.set_version('2019-01-08')
request.set_action_name('PutEvaluations')
request.add_body_params('ResultToken', result_token)
request.add_body_params('Evaluations', evaluations)
if message_type and message_type == 'ScheduledNotification':
request.add_body_params('DeleteMode', True)
request.set_method('POST')
try:
response = client.do_action_with_exception(request)
logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
except Exception as e:
logger.error('PutEvaluations error: %s' % e)
def get_discovered_resource(context, resource_id, resource_type, region_id):
client = AcsClient(AK, SK, CONFIG_SERVICE_REGION)
request = CommonRequest()
request.set_domain(CONFIG_SERVICE_ENDPOINT)
request.set_version('2020-09-07')
request.set_action_name('GetDiscoveredResource')
request.add_query_param('ResourceId', resource_id)
request.add_query_param('ResourceType', resource_type)
request.add_query_param('Region', region_id)
request.set_method('GET')
try:
response = client.do_action_with_exception(request)
resource_result = str(response, encoding='utf-8')
return resource_result
except Exception as e:
logger.error('GetDiscoveredResource error: %s' % e)