in cfn_policy_validator/validation/validator.py [0:0]
def validate_resources(self, resources):
    """
    Validate the resource policy attached to each resource.

    Every resource policy is sent to ValidatePolicy and the returned findings
    are recorded on ``self.findings``. When ``self.preview_builders`` has an
    entry for the resource type, an access preview is also created; all
    previews are awaited together once the loop finishes and their findings
    are recorded as external principal findings.

    :param resources: iterable of resources; each is read for ``ResourceName``,
        ``ResourceType``, ``Policy.Policy`` and ``Policy.Name``.
    :raises ApplicationError: if the CreateAccessPreview call fails.
    """
    pending_previews = []

    for resource in resources:
        # ValidatePolicy runs for every resource policy, including those whose
        # resource type has no access preview support.
        LOGGER.info(f'Validating resource policy for resource {resource.ResourceName} of type {resource.ResourceType}')

        service_resource_type = self.service_specific_policy_validation.get(resource.ResourceType)

        # validatePolicyResourceType is only sent when a service specific
        # validation is registered for this resource type.
        optional_args = {}
        if service_resource_type is not None:
            LOGGER.info(f'Running service specific policy validation for {service_resource_type}')
            optional_args['validatePolicyResourceType'] = service_resource_type

        validation_response = self.client.validate_policy(
            policyType='RESOURCE_POLICY',
            policyDocument=json.dumps(resource.Policy.Policy),
            **optional_args
        )
        LOGGER.info(f'ValidatePolicy response {validation_response}')

        validation_findings = validation_response['findings']
        self.findings.add_validation_finding(validation_findings, resource.ResourceName, resource.Policy.Name)

        # Resource types without a configuration builder are not previewed.
        builder = self.preview_builders.get(resource.ResourceType)
        if builder is None:
            continue

        try:
            preview_configuration = builder.build_configuration(resource)
        except InvalidPolicyException as e:
            # NOTE(review): presumably this helper always raises - confirm,
            # otherwise preview_configuration is unbound below.
            self._raise_invalid_configuration_error_for(resource.ResourceName, validation_findings, e.to_string())

        LOGGER.info(f'Creating access preview for resource {resource.ResourceName} of type {resource.ResourceType}')
        LOGGER.info(f'Using access preview configuration: {preview_configuration}')

        try:
            preview_response = self.client.create_access_preview(
                analyzerArn=self.analyzer_arn,
                configurations=preview_configuration
            )
        except Exception as e:
            raise ApplicationError(f'Failed to create access preview for {resource.ResourceName}.', e)

        LOGGER.info(f'CreateAccessPreview response: {preview_response}')

        pending_previews.append(
            PreviewAwaitingResponse(preview_response['id'], resource, resource.ResourceName, validation_findings)
        )

    # Wait on every created access preview in a single batch, then record
    # the findings of each one.
    for completed_preview in self._wait_for_findings(pending_previews):
        previewed_resource = completed_preview.resource
        self.findings.add_external_principal_finding(
            completed_preview.findings,
            previewed_resource.ResourceName,
            previewed_resource.Policy.Name
        )