# perfkitbenchmarker/providers/aws/aws_container_service.py
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains classes/functions related to AWS container clusters."""
import json
import os
import uuid
from absl import flags
from perfkitbenchmarker import container_service
from perfkitbenchmarker import context
from perfkitbenchmarker import errors
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import resource
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.providers.aws import aws_load_balancer
from perfkitbenchmarker.providers.aws import aws_logs
from perfkitbenchmarker.providers.aws import aws_network
from perfkitbenchmarker.providers.aws import s3
from perfkitbenchmarker.providers.aws import util
import requests
import yaml
FLAGS = flags.FLAGS
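# ECS task lifecycle states that precede RUNNING; tasks still in one of these
# states are treated as not yet ready.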
_ECS_NOT_READY = frozenset(['PROVISIONING', 'PENDING'])
class EcrRepository(resource.BaseResource):
"""Class representing an Elastic Container Registry image repository."""
def __init__(self, name, region):
super().__init__()
self.name = name
self.region = region
def _Create(self):
"""Creates the image repository."""
if self._Exists():
self.user_managed = True
return
create_cmd = util.AWS_PREFIX + [
'ecr',
'create-repository',
'--region',
self.region,
'--repository-name',
self.name,
]
_, stderr, retcode = vm_util.IssueCommand(
create_cmd, raise_on_failure=False
)
if retcode:
if 'InsufficientInstanceCapacity' in stderr:
raise errors.Benchmarks.InsufficientCapacityCloudFailure(stderr)
if 'InstanceLimitExceeded' in stderr or 'VpcLimitExceeded' in stderr:
raise errors.Benchmarks.QuotaFailure(stderr)
      raise errors.Resource.CreationError(
          'Failed to create ECR repository: {} return code: {}'.format(
              stderr, retcode
          )
      )
def _Exists(self):
"""Returns True if the repository exists."""
describe_cmd = util.AWS_PREFIX + [
'ecr',
'describe-repositories',
'--region',
self.region,
'--repository-names',
self.name,
]
stdout, _, _ = vm_util.IssueCommand(describe_cmd, raise_on_failure=False)
if not stdout or not json.loads(stdout)['repositories']:
return False
return True
def _Delete(self):
"""Deletes the repository."""
delete_cmd = util.AWS_PREFIX + [
'ecr',
'delete-repository',
'--region',
self.region,
'--repository-name',
self.name,
'--force',
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
class ElasticContainerRegistry(container_service.BaseContainerRegistry):
"""Class for building and storing container images on AWS."""
CLOUD = provider_info.AWS
def __init__(self, registry_spec):
super().__init__(registry_spec)
self.account = self.project or util.GetAccount()
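    # The zone may be a comma-separated list; the registry is placed in the
    # region of the first zone listed.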
self.region = util.GetRegionFromZone(self.zone.split(',')[0])
self.repositories = []
def _Delete(self):
"""Deletes the repositories."""
for repository in self.repositories:
repository.Delete()
def PrePush(self, image):
"""Prepares registry to push a given image."""
repository_name = '{namespace}/{name}'.format(
namespace=self.name, name=image.name
)
repository = EcrRepository(repository_name, self.region)
self.repositories.append(repository)
repository.Create()
def GetRegistryServer(self) -> str:
"""Returns the registry server url."""
return f'{self.account}.dkr.ecr.{self.region}.amazonaws.com'
def GetFullRegistryTag(self, image) -> str:
"""Returns the full tag of the image."""
return f'{self.GetRegistryServer()}/{self.name}/{image}'
def Login(self):
"""Logs in to the registry."""
get_login_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecr',
'get-login-password',
]
stdout, _, _ = vm_util.IssueCommand(get_login_cmd)
docker_login_cmd = [
'docker',
'login',
'--username',
'AWS',
'--password',
stdout,
self.GetRegistryServer(),
]
vm_util.IssueCommand(docker_login_cmd)
def RemoteBuild(self, image):
"""Build the image remotely."""
# TODO(ehankland) use AWS codebuild to build the image.
raise NotImplementedError()
class TaskDefinition(resource.BaseResource):
"""Class representing an AWS task definition."""
def __init__(self, name, container_spec, cluster):
super().__init__()
self.name = name
self.cpus = container_spec.cpus
self.memory = container_spec.memory
self.image = container_spec.image
self.container_port = container_spec.container_port
self.region = cluster.region
self.arn = None
self.log_group = aws_logs.LogGroup(self.region, 'pkb')
def _CreateDependencies(self):
"""Create the log group if it doesn't exist."""
if not self.log_group.Exists():
self.log_group.Create()
def _Create(self):
"""Create the task definition."""
register_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'register-task-definition',
'--family',
self.name,
'--execution-role-arn',
'ecsTaskExecutionRole',
'--network-mode',
'awsvpc',
'--requires-compatibilities=FARGATE',
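        # Fargate sizes tasks in CPU units (1024 units == 1 vCPU) and MiB of
        # memory, and only accepts certain cpu/memory combinations.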
'--cpu',
str(int(1024 * self.cpus)),
'--memory',
str(self.memory),
'--container-definitions',
self._GetContainerDefinitions(),
]
stdout, _, _ = vm_util.IssueCommand(register_cmd)
response = json.loads(stdout)
self.arn = response['taskDefinition']['taskDefinitionArn']
def _Delete(self):
"""Deregister the task definition."""
if self.arn is None:
return
deregister_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'deregister-task-definition',
'--task-definition',
self.arn,
]
vm_util.IssueCommand(deregister_cmd)
def _GetContainerDefinitions(self):
"""Returns a JSON representation of the container definitions."""
definitions = [{
'name': self.name,
'image': self.image,
'essential': True,
'portMappings': [{
'containerPort': self.container_port,
'protocol': 'TCP',
}],
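        # With a stream prefix, the awslogs driver writes to a log stream named
        # '<prefix>/<container name>/<task id>', which EcsTask.GetLogs()
        # reconstructs when fetching container output.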
'logConfiguration': {
'logDriver': 'awslogs',
'options': {
'awslogs-group': 'pkb',
'awslogs-region': self.region,
'awslogs-stream-prefix': 'pkb',
},
},
}]
return json.dumps(definitions)
class EcsTask(container_service.BaseContainer):
"""Class representing an ECS/Fargate task."""
def __init__(self, name, container_spec, cluster):
super().__init__(container_spec)
self.name = name
self.task_def = cluster.task_defs[name]
self.arn = None
self.region = cluster.region
self.cluster_name = cluster.name
self.subnet_id = cluster.network.subnet.id
self.ip_address = None
self.security_group_id = (
cluster.network.regional_network.vpc.default_security_group_id
)
def _GetNetworkConfig(self):
network_config = {
'awsvpcConfiguration': {
'subnets': [self.subnet_id],
'securityGroups': [self.security_group_id],
'assignPublicIp': 'ENABLED',
}
}
return json.dumps(network_config)
def _GetOverrides(self):
"""Returns a JSON representaion of task overrides.
While the container level resources can be overridden, they have no
effect on task level resources for Fargate tasks. This means
that modifying a container spec will only affect the command of any
new containers launched from it and not cpu/memory.
"""
overrides = {
'containerOverrides': [{
'name': self.name,
}]
}
if self.command:
overrides['containerOverrides'][0]['command'] = self.command
return json.dumps(overrides)
def _Create(self):
"""Creates the task."""
run_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'run-task',
'--cluster',
self.cluster_name,
'--task-definition',
self.task_def.arn,
'--launch-type',
'FARGATE',
'--network-configuration',
self._GetNetworkConfig(),
'--overrides',
self._GetOverrides(),
]
stdout, _, _ = vm_util.IssueCommand(run_cmd)
response = json.loads(stdout)
self.arn = response['tasks'][0]['taskArn']
def _PostCreate(self):
"""Gets the tasks IP address."""
container = self._GetTask()['containers'][0]
self.ip_address = container['networkInterfaces'][0]['privateIpv4Address']
def _DeleteDependencies(self):
"""Delete the task def."""
self.task_def.Delete()
def _Delete(self):
"""Deletes the task."""
if self.arn is None:
return
stop_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'stop-task',
'--cluster',
self.cluster_name,
'--task',
self.arn,
]
vm_util.IssueCommand(stop_cmd)
def _GetTask(self):
"""Returns a dictionary representation of the task."""
describe_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'describe-tasks',
'--cluster',
self.cluster_name,
'--tasks',
self.arn,
]
stdout, _, _ = vm_util.IssueCommand(describe_cmd)
response = json.loads(stdout)
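    # Abridged, hypothetical shape of the describe-tasks response consumed here
    # and by _PostCreate/_IsReady:
    #   {"tasks": [{"taskArn": "arn:aws:ecs:...", "lastStatus": "RUNNING",
    #               "containers": [{"networkInterfaces":
    #                                   [{"privateIpv4Address": "10.0.0.12"}]}]}]}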
return response['tasks'][0]
def _IsReady(self):
"""Returns true if the task has stopped pending."""
return self._GetTask()['lastStatus'] not in _ECS_NOT_READY
def WaitForExit(self, timeout=None):
"""Waits until the task has finished running."""
@vm_util.Retry(
timeout=timeout,
retryable_exceptions=(container_service.RetriableContainerException,),
)
def _WaitForExit():
task = self._GetTask()
if task['lastStatus'] != 'STOPPED':
raise container_service.RetriableContainerException(
'Task is not STOPPED.'
)
return task
return _WaitForExit()
def GetLogs(self):
"""Returns the logs from the container."""
assert self.arn is not None
task_id = self.arn.split('/')[-1]
log_stream = 'pkb/{name}/{task_id}'.format(name=self.name, task_id=task_id)
return str(
aws_logs.GetLogStreamAsString(self.region, log_stream, 'pkb')
)
class EcsService(container_service.BaseContainerService):
"""Class representing an ECS/Fargate service."""
def __init__(self, name, container_spec, cluster):
super().__init__(container_spec)
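    # ECS client tokens are limited to 32 ASCII characters, hence the truncated
    # UUID.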
self.client_token = str(uuid.uuid4())[:32]
self.name = name
self.task_def = cluster.task_defs[name]
self.arn = None
self.region = cluster.region
self.cluster_name = cluster.name
self.subnet_id = cluster.network.subnet.id
self.security_group_id = (
cluster.network.regional_network.vpc.default_security_group_id
)
self.load_balancer = aws_load_balancer.LoadBalancer(
[cluster.network.subnet]
)
self.target_group = aws_load_balancer.TargetGroup(
cluster.network.regional_network.vpc, self.container_port
)
self.port = 80
def _CreateDependencies(self):
"""Creates the load balancer for the service."""
self.load_balancer.Create()
self.target_group.Create()
listener = aws_load_balancer.Listener(
self.load_balancer, self.target_group, self.port
)
listener.Create()
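    # Despite the attribute name, this stores the load balancer's DNS name;
    # clients reach the service through the load balancer on self.port.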
self.ip_address = self.load_balancer.dns_name
def _DeleteDependencies(self):
"""Deletes the service's load balancer."""
self.task_def.Delete()
self.load_balancer.Delete()
self.target_group.Delete()
# TODO(user): Consider supporting the flag container_cluster_version.
def _Create(self):
"""Creates the service."""
create_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'create-service',
'--desired-count',
'1',
'--client-token',
self.client_token,
'--cluster',
self.cluster_name,
'--service-name',
self.name,
'--task-definition',
self.task_def.arn,
'--launch-type',
'FARGATE',
'--network-configuration',
self._GetNetworkConfig(),
'--load-balancers',
self._GetLoadBalancerConfig(),
]
vm_util.IssueCommand(create_cmd)
def _Delete(self):
"""Deletes the service."""
update_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'update-service',
'--cluster',
self.cluster_name,
'--service',
self.name,
'--desired-count',
'0',
]
vm_util.IssueCommand(update_cmd)
delete_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'delete-service',
'--cluster',
self.cluster_name,
'--service',
self.name,
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
def _GetNetworkConfig(self):
network_config = {
'awsvpcConfiguration': {
'subnets': [self.subnet_id],
'securityGroups': [self.security_group_id],
'assignPublicIp': 'ENABLED',
}
}
return json.dumps(network_config)
def _GetLoadBalancerConfig(self):
"""Returns the JSON representation of the service load balancers."""
load_balancer_config = [{
'targetGroupArn': self.target_group.arn,
'containerName': self.name,
'containerPort': self.container_port,
}]
return json.dumps(load_balancer_config)
def _IsReady(self):
"""Returns True if the Service is ready."""
url = 'http://%s' % self.ip_address
try:
r = requests.get(url)
except requests.ConnectionError:
return False
if r.status_code == 200:
return True
return False
class FargateCluster(container_service.BaseContainerCluster):
"""Class representing an AWS Fargate cluster."""
CLOUD = provider_info.AWS
CLUSTER_TYPE = 'Fargate'
def __init__(self, cluster_spec):
super().__init__(cluster_spec)
self.region = util.GetRegionFromZone(self.zone)
self.network = aws_network.AwsNetwork.GetNetwork(self)
self.firewall = aws_network.AwsFirewall.GetFirewall()
self.name = 'pkb-%s' % FLAGS.run_uri
self.task_defs = {}
self.arn = None
def _Create(self):
"""Creates the cluster."""
create_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'create-cluster',
'--cluster-name',
self.name,
]
stdout, _, _ = vm_util.IssueCommand(create_cmd)
response = json.loads(stdout)
self.arn = response['cluster']['clusterArn']
def _Exists(self):
"""Returns True if the cluster exists."""
if not self.arn:
return False
describe_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'describe-clusters',
'--clusters',
self.arn,
]
stdout, _, _ = vm_util.IssueCommand(describe_cmd)
response = json.loads(stdout)
clusters = response['clusters']
if not clusters or clusters[0]['status'] == 'INACTIVE':
return False
return True
def _Delete(self):
"""Deletes the cluster."""
delete_cmd = util.AWS_PREFIX + [
'--region',
self.region,
'ecs',
'delete-cluster',
'--cluster',
self.name,
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
def DeployContainer(self, name, container_spec):
"""Deploys the container according to the spec."""
if name not in self.task_defs:
task_def = TaskDefinition(name, container_spec, self)
self.task_defs[name] = task_def
task_def.Create()
task = EcsTask(name, container_spec, self)
self.containers[name].append(task)
task.Create()
def DeployContainerService(self, name, container_spec):
"""Deploys the container service according to the spec."""
if name not in self.task_defs:
task_def = TaskDefinition(name, container_spec, self)
self.task_defs[name] = task_def
task_def.Create()
service = EcsService(name, container_spec, self)
self.services[name] = service
self.firewall.AllowPortInSecurityGroup(
service.region, service.security_group_id, service.container_port
)
service.Create()
class AwsKopsCluster(container_service.KubernetesCluster):
"""Class representing a kops based Kubernetes cluster."""
CLOUD = provider_info.AWS
CLUSTER_TYPE = 'kops'
def __init__(self, spec):
super().__init__(spec)
self.name += '.k8s.local'
self.config_bucket = 'kops-%s-%s' % (FLAGS.run_uri, str(uuid.uuid4()))
self.region = util.GetRegionFromZone(self.zone)
self.s3_service = s3.S3Service()
self.s3_service.PrepareService(self.region)
def _CreateDependencies(self):
"""Create the bucket to store cluster config."""
self.s3_service.MakeBucket(self.config_bucket)
def _DeleteDependencies(self):
"""Delete the bucket that stores cluster config."""
self.s3_service.DeleteBucket(self.config_bucket)
def _Create(self):
"""Creates the cluster."""
# Create the cluster spec but don't provision any resources.
create_cmd = [
FLAGS.kops,
'create',
'cluster',
'--name=%s' % self.name,
'--zones=%s' % self.default_node_pool.zone,
'--node-count=%s' % self.default_node_pool.num_nodes,
'--node-size=%s' % self.default_node_pool.machine_type,
]
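    # kops keeps cluster state in the S3 bucket created in _CreateDependencies;
    # KOPS_STATE_STORE points every kops invocation below at that bucket.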
env = os.environ.copy()
env['KUBECONFIG'] = FLAGS.kubeconfig
env['KOPS_STATE_STORE'] = 's3://%s' % self.config_bucket
vm_util.IssueCommand(create_cmd, env=env)
# Download the cluster spec and modify it.
get_cmd = [FLAGS.kops, 'get', 'cluster', self.name, '--output=yaml']
stdout, _, _ = vm_util.IssueCommand(get_cmd, env=env)
spec = yaml.safe_load(stdout)
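    # Clearing creationTimestamp avoids serialization problems when the spec is
    # fed back through 'kops replace'; the larger idle timeout appears intended
    # to keep long-running benchmark connections alive through the API load
    # balancer.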
spec['metadata']['creationTimestamp'] = None
spec['spec']['api']['loadBalancer']['idleTimeoutSeconds'] = 3600
benchmark_spec = context.GetThreadBenchmarkSpec()
spec['spec']['cloudLabels'] = {
'owner': FLAGS.owner,
'perfkitbenchmarker-run': FLAGS.run_uri,
'benchmark': benchmark_spec.name,
'perfkit_uuid': benchmark_spec.uuid,
'benchmark_uid': benchmark_spec.uid,
}
# Replace the cluster spec.
with vm_util.NamedTemporaryFile() as tf:
yaml.dump(spec, tf)
tf.close()
replace_cmd = [FLAGS.kops, 'replace', '--filename=%s' % tf.name]
vm_util.IssueCommand(replace_cmd, env=env)
# Create the actual cluster.
update_cmd = [FLAGS.kops, 'update', 'cluster', self.name, '--yes']
vm_util.IssueCommand(update_cmd, env=env)
def _Delete(self):
"""Deletes the cluster."""
super()._Delete()
delete_cmd = [
FLAGS.kops,
'delete',
'cluster',
'--name=%s' % self.name,
'--state=s3://%s' % self.config_bucket,
'--yes',
]
vm_util.IssueCommand(delete_cmd, raise_on_failure=False)
def _IsReady(self):
"""Returns True if the cluster is ready, else False."""
validate_cmd = [
FLAGS.kops,
'validate',
'cluster',
'--name=%s' % self.name,
'--state=s3://%s' % self.config_bucket,
]
env = os.environ.copy()
env['KUBECONFIG'] = FLAGS.kubeconfig
_, _, retcode = vm_util.IssueCommand(
validate_cmd, env=env, raise_on_failure=False
)
return not retcode