security-policies/bundle/compliance/policy/aws_eks/ensure_private_access.rego (43 lines of code) (raw):

package compliance.policy.aws_eks.ensure_private_access import data.compliance.lib.common import data.compliance.policy.aws_eks.data_adapter import future.keywords.if # Allow only private access to cluster. is_only_private(cluster, cidr_allowed) if { cluster.ResourcesVpcConfig.EndpointPrivateAccess public_access_is_restricted(cluster, cidr_allowed) } else := false public_access_is_restricted(cluster, _) if { not cluster.ResourcesVpcConfig.EndpointPublicAccess } public_access_is_restricted(cluster, cidr_allowed) if { cidr_allowed == true cluster.ResourcesVpcConfig.EndpointPublicAccess public_access_cidrs := cluster.ResourcesVpcConfig.PublicAccessCidrs # Ensure that publicAccessCidr has a valid filter allow_all_filter := "0.0.0.0/0" invalid_filters := [index | public_access_cidrs[index] == allow_all_filter] count(invalid_filters) == 0 } # Ensure there Kuberenetes endpoint private access is enabled finding(cidr_allowed) := result if { # filter data_adapter.is_aws_eks cluster := data_adapter.cluster rule_evaluation := is_only_private(cluster, cidr_allowed) # set result result := common.generate_result_without_expected( common.calculate_result(rule_evaluation), object.union_n([ { "endpoint_public_access": cluster.ResourcesVpcConfig.EndpointPublicAccess, "endpoint_private_access": cluster.ResourcesVpcConfig.EndpointPrivateAccess, }, cidr_evidence(cluster.ResourcesVpcConfig, cidr_allowed), ]), ) } cidr_evidence(config, cidr_allowed) := result if { cidr_allowed == true result := {"public_access_cidrs": config.PublicAccessCidrs} } else := {}