ebcli/lib/ec2.py (371 lines of code) (raw):
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import collections
import urllib.parse, urllib.request, urllib.error
import socket
from cement.utils.misc import minimal_logger
from ebcli.lib import aws
from ebcli.objects.exceptions import ServiceError, AlreadyExistsError, \
NotFoundError, NotAnEC2Instance
from ebcli.resources.strings import responses
from ebcli.core import fileoperations, io
LOG = minimal_logger(__name__)
def _make_api_call(operation_name, **operation_options):
return aws.make_api_call('ec2', operation_name, **operation_options)
def get_key_pairs():
result = _make_api_call('describe_key_pairs')
return result['KeyPairs']
def import_key_pair(keyname, key_material):
try:
result = _make_api_call(
'import_key_pair',
KeyName=keyname,
PublicKeyMaterial=key_material
)
except ServiceError as e:
if e.message.endswith('already exists.'):
raise AlreadyExistsError(e.message)
else:
raise
return result
def describe_instances(instance_ids):
result = _make_api_call('describe_instances',
InstanceIds=instance_ids)
instances = []
for r in result.get('Reservations', {}):
for i in r.get('Instances', {}):
instances.append(i)
return instances
def describe_instance(instance_id):
result = describe_instances([instance_id])
try:
return result[0]
except (IndexError, NotFoundError):
raise NotFoundError('Instance {0} not found.'.format(instance_id))
def has_default_vpc():
result = _make_api_call('describe_account_attributes',
AttributeNames=['default-vpc'])
default_vpc = None
for attribute in result['AccountAttributes']:
if attribute['AttributeName'] == 'default-vpc':
try:
default_vpc = attribute['AttributeValues'][0]['AttributeValue']
except (KeyError, IndexError):
default_vpc = None
if default_vpc and default_vpc.lower() != 'none':
return True
else:
return False
def revoke_ssh(security_group_id):
try:
_make_api_call(
'revoke_security_group_ingress',
GroupId=security_group_id,
IpProtocol='tcp',
ToPort=22,
FromPort=22,
CidrIp='0.0.0.0/0'
)
except ServiceError as e:
if e.message.startswith(responses['ec2.sshalreadyopen']):
pass
else:
raise
def authorize_ssh(security_group_id):
try:
_make_api_call(
'authorize_security_group_ingress',
GroupId=security_group_id,
IpProtocol='tcp',
ToPort=22,
FromPort=22,
CidrIp='0.0.0.0/0'
)
except ServiceError as e:
if e.code == 'InvalidPermission.Duplicate':
pass
else:
raise
def describe_security_group(security_group_id):
result = _make_api_call('describe_security_groups',
GroupIds=[security_group_id])
if result and len(result['SecurityGroups']) < 1:
raise NotFoundError('Security Group {} not found.'
.format(security_group_id))
return result['SecurityGroups'][0]
def terminate_instance(instance_id):
return _make_api_call('terminate_instances',
InstanceIds=[instance_id])
def reboot_instance(instance_id):
return _make_api_call('reboot_instances',
InstanceIds=[instance_id])
def ensure_vpc_exists(vpc_id):
return _make_api_call('describe_vpcs',
VpcIds=[vpc_id])
# Function to get metadata
def get_instance_metadata(path):
metadata_url = f"http://169.254.169.254/latest/meta-data/{path}"
token_url = "http://169.254.169.254/latest/api/token"
token_request = urllib.request.Request(token_url, method="PUT")
token_request.add_header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
try:
with urllib.request.urlopen(token_request) as token_response:
token = token_response.read().decode('utf-8')
metadata_request = urllib.request.Request(metadata_url)
metadata_request.add_header("X-aws-ec2-metadata-token", token)
with urllib.request.urlopen(metadata_request, timeout=5) as response:
return response.read().decode('utf-8')
except (urllib.error.URLError, socket.timeout, ConnectionError) as e:
if _is_timeout_exception(e):
LOG.debug("Communication with IMDSv2 timed out. This is likely not an EC2 instance.")
raise NotAnEC2Instance(e)
raise e
def _is_timeout_exception(exception: urllib.error.URLError) -> bool:
return (
isinstance(exception.__dict__.get('reason', False), TimeoutError)
or 'timed out' in str(exception)
)
def get_current_instance_details():
instance_id = get_instance_metadata('instance-id')
availability_zone = get_instance_metadata('placement/availability-zone')
region = availability_zone[:-1]
aws.set_region(region)
fileoperations.write_config_setting('global', 'default_region', region)
mac_address = get_instance_metadata('mac')
vpc_id = get_instance_metadata(f'network/interfaces/macs/{mac_address}/vpc-id')
subnet_id = get_instance_metadata(f'network/interfaces/macs/{mac_address}/subnet-id')
try:
ensure_vpc_exists(vpc_id)
instance = describe_instance(instance_id=instance_id)
except Exception as e:
if 'InvalidVpcID.NotFound' in str(e) or f"The vpc ID '{vpc_id}' does not exist" in str(e):
io.log_warning(f'Unable to retrieve details of VPC, {vpc_id}')
vpc_id, subnet_id, instance_id, security_group_ids, tags = None, None, None, [], []
elif 'InvalidInstanceID.NotFound' in str(e):
vpc_id, subnet_id, instance_id, security_group_ids, tags = None, None, None, [], []
io.log_warning(f'Unable to retrieve details of instance, {instance_id}')
instance = None
if instance:
security_group_ids = [sg['GroupId'] for sg in instance['SecurityGroups']]
else:
security_group_ids = []
try:
tags = instance_tags(instance_id)
except Exception:
# Probably client-error, ignore
tags = []
return {
'InstanceId': instance_id,
'VpcId': vpc_id,
'SubnetId': subnet_id,
'SecurityGroupIds': security_group_ids,
'Region': region,
'Tags': tags,
}
def list_subnets(vpc_id):
result = _make_api_call(
'describe_subnets',
Filters=[
{
'Name': 'vpc-id',
'Values': [vpc_id]
},
]
)
return [subnet['SubnetId'] for subnet in result['Subnets']]
def list_subnets_azs_interleaved(vpc_id):
result = _make_api_call(
'describe_subnets',
Filters=[
{
'Name': 'vpc-id',
'Values': [vpc_id]
},
]
)
subnet_map = collections.defaultdict(list)
subnet_ids = []
for subnet in result['Subnets']:
subnet_map[subnet['AvailabilityZone']].append(subnet['SubnetId'])
keys = subnet_map.keys()
while any([v for v in subnet_map.values()]):
for key in keys:
subnet_ids.append(subnet_map[key].pop())
return [subnet['SubnetId'] for subnet in result['Subnets']]
def get_instance_volumes(instance_id):
response = _make_api_call(
'describe_volumes',
Filters=[
{
'Name': 'attachment.instance-id',
'Values': [instance_id]
}
]
)
return response['Volumes']
def enable_ebs_volume_encryption():
_make_api_call('enable_ebs_encryption_by_default')
def instance_tags(instance_id):
response = _make_api_call(
'describe_tags',
Filters=[
{
'Name': 'resource-id',
'Values': [instance_id]
}
]
)
return [
{
"Key": tag['Key'],
"Value": tag['Value']
}
for tag in response['Tags']
]
def establish_security_group(ports, env_name, vpc_id):
ec2_security_group = get_security_group(f'{env_name}-EC2')
if ec2_security_group:
ec2_security_group_id = ec2_security_group['GroupId']
revoke_security_group_ingress(ec2_security_group_id, ec2_security_group['IpPermissions'])
else:
ec2_security_group_id = _create_security_group(f'{env_name}-EC2', vpc_id, f'EC2 Security group for {env_name}-EC2')
alb_security_group = get_security_group(f'{env_name}-ALB')
if alb_security_group:
alb_security_group_id = ec2_security_group['GroupId']
revoke_security_group_egress(alb_security_group_id, alb_security_group['IpPermissionsEgress'])
else:
alb_security_group_id = _create_security_group(f'{env_name}-ALB', vpc_id, f'ALB Security group for {env_name}-EC2')
ingress_permissions, egress_permissions = create_peer_security_group_permissions(
ports,
ec2_security_group_id,
alb_security_group_id,
vpc_id
)
authorize_security_group_egress(alb_security_group_id, egress_permissions)
authorize_security_group_ingress(ec2_security_group_id, ingress_permissions)
return [
{
'Namespace': 'aws:elbv2:loadbalancer',
'OptionName': 'SecurityGroups',
'Value': alb_security_group_id
},
{
'Namespace': 'aws:autoscaling:launchconfiguration',
'OptionName': 'SecurityGroups',
'Value': ec2_security_group_id
},
]
def create_peer_security_group_permissions(
ports,
from_security_group,
to_security_group,
vpc_id
):
egress_permissions, ingress_permissions = [], []
for port in ports:
egress_permission = define_group_pair_permission(
port,
from_security_group,
f"Rule to allow {from_security_group} to access {to_security_group} at port {port} over tcp"
)
ingress_permission = define_group_pair_permission(
port,
to_security_group,
f"Rule to allow {to_security_group} to receive traffic from {from_security_group} at {port} over tcp"
)
if vpc_id:
ingress_permission["UserIdGroupPairs"][0]["VpcId"] = vpc_id
egress_permission["UserIdGroupPairs"][0]["VpcId"] = vpc_id
egress_permissions.append(egress_permission)
ingress_permissions.append(ingress_permission)
return ingress_permissions, egress_permissions
def define_group_pair_permission(port, security_group_id, description):
return {
'IpProtocol': 'tcp',
'FromPort': port,
'ToPort': port,
"UserIdGroupPairs": [
{
"Description": description,
"GroupId": security_group_id,
}
],
}
def revoke_security_group_ingress(security_group_ip, ip_permissions):
kwargs = {
'GroupId': security_group_ip,
'IpPermissions': ip_permissions,
}
try:
_make_api_call(
'revoke_security_group_ingress',
**kwargs
)
except Exception as e:
if 'MissingParameter' in str(e) or "Either 'ipPermissions' or 'securityGroupRuleIds' should be provided." in str(e):
return
raise e
def revoke_security_group_egress(security_group_id, ip_permissions_egress):
kwargs = {
'GroupId': security_group_id,
'IpPermissions': ip_permissions_egress,
}
try:
_make_api_call(
'revoke_security_group_egress',
**kwargs
)
except Exception as e:
if 'MissingParameter' in str(e) or "Either 'ipPermissions' or 'securityGroupRuleIds' should be provided." in str(e):
return
raise e
def authorize_security_group_ingress(security_group_id, ingress_permissions):
kwargs = {
'GroupId': security_group_id,
'IpPermissions': ingress_permissions,
}
try:
_make_api_call(
'authorize_security_group_ingress',
**kwargs
)
except Exception as e:
if 'MissingParameter' in str(e):
return
raise e
def authorize_security_group_egress(security_group_id, egress_permissions):
kwargs = {
'GroupId': security_group_id,
'IpPermissions': egress_permissions,
}
try:
_make_api_call(
'authorize_security_group_egress',
**kwargs
)
except Exception as e:
if 'MissingParameter' in str(e):
return
if 'already exists' in str(e):
LOG.debug(f"Received non-fatal exception {str(e)} during invocation of ec2::authorize_security_group_egress.")
return
raise e
def get_security_group(group_name):
try:
response = _make_api_call(
'describe_security_groups',
GroupNames=[group_name]
)
return response['SecurityGroups'][0]
except Exception as e:
if 'InvalidGroup.NotFound' in str(e) or 'does not exist' in str(e):
return None
raise e
def _create_security_group(group_name, vpc_id, description):
kwargs = {
'GroupName': group_name,
'Description': description,
}
if vpc_id:
kwargs['VpcId'] = vpc_id
response = _make_api_call(
'create_security_group',
**kwargs
)
return response['GroupId']