in installer/resources/src/cdk_construct.py [0:0]
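def create_vpc_endpoints(self):
    """
    Create VPC Endpoints for accessing AWS services.

    Populates ``self.vpc_gateway_endpoints`` and ``self.vpc_interface_endpoints``
    (dicts keyed by the short service name, e.g. ``s3`` or ``ec2``) in two passes:

    1. When deploying into an existing VPC (``user_specified_variables.vpc_id`` is
       set), every endpoint already present in that VPC is imported so it is
       reused rather than duplicated. Interface endpoints also import the
       security groups attached to them.
    2. Every service listed under ``install_props.Config.network`` that was not
       imported in pass 1 is created in ``self.soca_resources["vpc"]`` and tagged
       through the ``tag_ec2_resource_lambda`` custom resource.

    Finally, every interface endpoint (imported or created) is allowed HTTPS
    (tcp/443) ingress from the compute-node and scheduler security groups.

    Security notes:
    - Security groups are imported with ``from_security_group_id`` without
      ``mutable=False``, so the ``allow_from`` calls at the end add ingress rules
      to the pre-existing security groups of imported endpoints as well, not
      only to ``vpc_endpoint_sg``.
    - No endpoint policy is passed for the endpoints created here, so the AWS
      default (full access to the service) applies. NOTE(review): consider
      scoping it if the cluster needs only a subset of the service's actions.

    Side effects: calls the EC2 API (``describe_vpc_endpoints``) at synth time
    when an existing VPC is used, and prints progress to stdout.
    """
    self.vpc_gateway_endpoints = {}
    self.vpc_interface_endpoints = {}

    def _import_resource_name(short_service_name, endpoint_type):
        # Construct ids must be unique within this scope. A service can expose
        # both a Gateway and an Interface endpoint (e.g. s3); fall back to a
        # type-qualified id only when the plain one is already taken so that
        # construct paths do not move in the common case.
        resource_name = f"{short_service_name}VpcEndpoint"
        if self.node.try_find_child(resource_name) is not None:
            resource_name = f"{short_service_name}{endpoint_type}VpcEndpoint"
        return resource_name

    def _tag_endpoint(short_service_name, endpoint_id):
        # Tagging is delegated to the tag_ec2_resource_lambda custom resource.
        core.CustomResource(
            self, f"{short_service_name}VPCEndpointTags",
            service_token=self.tag_ec2_resource_lambda.function_arn,
            properties={
                "ResourceId": endpoint_id,
                "Tags": [
                    {"Key": "Name", "Value": f"{user_specified_variables.cluster_id}-{short_service_name}-VpcEndpoint"},
                    {"Key": "soca:ClusterId", "Value": user_specified_variables.cluster_id},
                ],
            },
        )

    # If using an existing VPC first import any existing vpc endpoints
    if user_specified_variables.vpc_id:
        ec2_client = boto3.client("ec2", region_name=user_specified_variables.region)
        filters = [{"Name": "vpc-id", "Values": [user_specified_variables.vpc_id]}]
        # Cache imported security groups by id: a group shared by several
        # endpoints must be imported once, a second import would reuse the
        # same construct id.
        existing_security_groups = {}
        paginator = ec2_client.get_paginator("describe_vpc_endpoints")
        for page in paginator.paginate(Filters=filters):
            for vpc_endpoint in page["VpcEndpoints"]:
                endpoint_type = vpc_endpoint["VpcEndpointType"]
                if endpoint_type == "Gateway":
                    target = self.vpc_gateway_endpoints
                elif endpoint_type == "Interface":
                    target = self.vpc_interface_endpoints
                else:
                    # Other types (e.g. GatewayLoadBalancer) are not used here.
                    continue

                service_name = vpc_endpoint["ServiceName"]
                short_service_name = service_name.split(".")[-1]
                if short_service_name in target:
                    # A second endpoint of the same type for the same service
                    # would collide on the construct id; keep the first one.
                    print(f"Skipping duplicate {endpoint_type} endpoint {vpc_endpoint['VpcEndpointId']} for {service_name}")
                    continue

                resource_name = _import_resource_name(short_service_name, endpoint_type)
                print(f"Importing resource {resource_name} for {service_name} {short_service_name}")

                if endpoint_type == "Gateway":
                    target[short_service_name] = ec2.GatewayVpcEndpoint.from_gateway_vpc_endpoint_id(
                        self, resource_name,
                        gateway_vpc_endpoint_id=vpc_endpoint["VpcEndpointId"],
                    )
                    continue

                security_groups = []
                for group in vpc_endpoint.get("Groups", []):
                    group_id = group["GroupId"]
                    security_group = existing_security_groups.get(group_id)
                    if security_group is None:
                        # The group name is the construct id; the ingress rules
                        # added later hang off this construct, so renaming it
                        # would move their logical ids.
                        security_group = ec2.SecurityGroup.from_security_group_id(self, group["GroupName"], group_id)
                        existing_security_groups[group_id] = security_group
                    security_groups.append(security_group)

                target[short_service_name] = ec2.InterfaceVpcEndpoint.from_interface_vpc_endpoint_attributes(
                    self, resource_name,
                    vpc_endpoint_id=vpc_endpoint["VpcEndpointId"],
                    security_groups=security_groups,
                    port=443,
                )

    # Create the gateway endpoints that were not imported above.
    for short_service_name in install_props.Config.network.vpc_gateway_endpoints:
        if short_service_name in self.vpc_gateway_endpoints:
            continue
        endpoint_service = ec2.GatewayVpcEndpointAwsService(short_service_name)
        resource_name = f"{short_service_name}VpcEndpoint"
        print(f"Creating resource {resource_name} for {short_service_name}")
        self.vpc_gateway_endpoints[short_service_name] = self.soca_resources["vpc"].add_gateway_endpoint(
            resource_name,
            service=endpoint_service,
        )
        _tag_endpoint(short_service_name, self.vpc_gateway_endpoints[short_service_name].vpc_endpoint_id)

    # Create the interface endpoints that were not imported above. Private DNS
    # makes the regional service hostname resolve to the endpoint inside the
    # VPC, so SDK calls stay on the private path without configuration.
    for short_service_name in install_props.Config.network.vpc_interface_endpoints:
        if short_service_name in self.vpc_interface_endpoints:
            continue
        endpoint_service = ec2.InterfaceVpcEndpointAwsService(short_service_name)
        resource_name = f"{short_service_name}VpcEndpoint"
        print(f"Creating resource {resource_name} for {short_service_name}")
        self.vpc_interface_endpoints[short_service_name] = ec2.InterfaceVpcEndpoint(
            self, resource_name,
            vpc=self.soca_resources["vpc"],
            service=endpoint_service,
            private_dns_enabled=True,
            security_groups=[self.soca_resources["vpc_endpoint_sg"]],
        )
        _tag_endpoint(short_service_name, self.vpc_interface_endpoints[short_service_name].vpc_endpoint_id)

    # Ingress: only HTTPS from the cluster's own security groups reaches the
    # interface endpoints (imported ones included, see the docstring).
    for short_service_name, vpc_endpoint in self.vpc_interface_endpoints.items():
        vpc_endpoint.connections.allow_from(self.soca_resources["compute_node_sg"], ec2.Port.tcp(443), "ComputeNodeSG to VpcEndpointSG - allow https traffic to vpc endpoints")
        vpc_endpoint.connections.allow_from(self.soca_resources["scheduler_sg"], ec2.Port.tcp(443), "SchedulerSG to VpcEndpointSG - allow https traffic to vpc endpoints")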