perfkitbenchmarker/providers/aws/aws_memorydb.py (162 lines of code) (raw):
# Copyright 2018 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing class for AWS' MemoryDB redis clusters.
Currently the following configurations are supported:
- Single primary node.
- Single primary node and single replica that provides failover into another
zone in the same region.
"""
import json
import logging
from typing import Any
from absl import flags
from perfkitbenchmarker import errors
from perfkitbenchmarker import managed_memory_store
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.providers.aws import aws_network
from perfkitbenchmarker.providers.aws import flags as aws_flags
from perfkitbenchmarker.providers.aws import util
FLAGS = flags.FLAGS
_DEFAULT_VERSION = '7.1'
REDIS_VERSION_MAPPING = {
'redis_7_1': '7.1',
}
class AwsMemoryDb(managed_memory_store.BaseManagedMemoryStore):
"""Object representing a AWS MemoryDB instance."""
CLOUD = provider_info.AWS
SERVICE_TYPE = 'memorydb'
MEMORY_STORE = managed_memory_store.REDIS
# AWS Clusters can take up to 2 hours to create
READY_TIMEOUT = 120 * 60
def __init__(self, spec):
super().__init__(spec)
self.subnet_group_name = 'subnet-%s' % self.name
self.version = REDIS_VERSION_MAPPING[spec.version]
self.node_type = aws_flags.MEMORYDB_NODE_TYPE.value
self.redis_region = managed_memory_store.REGION.value
self.failover_zone = aws_flags.MEMORYDB_FAILOVER_ZONE.value
self.failover_subnet = None
def GetResourceMetadata(self) -> dict[str, Any]:
"""Returns a dict containing metadata about the instance.
Returns:
dict mapping string property key to value.
"""
self.metadata.update({
'cloud_redis_version': self.version,
'cloud_redis_node_type': self.node_type,
'cloud_redis_region': self.redis_region,
'cloud_redis_primary_zone': self._GetClientVm().zone,
'cloud_redis_failover_zone': self.failover_zone,
})
return self.metadata
def _CreateDependencies(self) -> None:
"""Create the subnet dependencies."""
subnet_id = self._GetClientVm().network.subnet.id
cmd = [
'aws',
'memorydb',
'create-subnet-group',
'--subnet-group-name',
self.subnet_group_name,
'--region',
self.redis_region,
'--subnet-ids',
subnet_id,
]
if self.failover_zone:
regional_network = self._GetClientVm().network.regional_network
vpc_id = regional_network.vpc.id
cidr = regional_network.vpc.NextSubnetCidrBlock()
self.failover_subnet = aws_network.AwsSubnet(
self.failover_zone, vpc_id, cidr_block=cidr
)
self.failover_subnet.Create()
cmd += [self.failover_subnet.id]
cmd += ['--tags']
cmd += util.MakeFormattedDefaultTags()
vm_util.IssueCommand(cmd)
def _DeleteDependencies(self) -> None:
"""Delete the subnet dependencies."""
cmd = [
'aws',
'memorydb',
'delete-subnet-group',
'--region=%s' % self.redis_region,
'--subnet-group-name=%s' % self.subnet_group_name,
]
vm_util.IssueCommand(cmd, raise_on_failure=False)
if self.failover_subnet:
self.failover_subnet.Delete()
def _Create(self) -> None:
"""Creates the cluster."""
cmd = [
'aws',
'memorydb',
'create-cluster',
'--cluster-name',
self.name,
'--node-type',
self.node_type,
'--engine',
'redis',
'--engine-version',
self.version,
'--num-shards',
'1',
'--num-replicas-per-shard',
str(self.replicas_per_shard),
'--parameter-group-name',
'default.memorydb-redis7.search',
'--subnet-group-name',
self.subnet_group_name,
'--acl-name',
'open-access',
'--region',
self.redis_region,
]
if self.enable_tls:
cmd += [
'--tls-enabled',
]
else:
cmd += [
'--no-tls-enabled',
]
cmd += ['--tags']
cmd += util.MakeFormattedDefaultTags()
vm_util.IssueCommand(cmd, raise_on_failure=False)
def _Delete(self):
"""Deletes the cluster."""
cmd = [
'aws',
'memorydb',
'delete-cluster',
'--cluster-name',
self.name,
'--region',
self.redis_region,
]
vm_util.IssueCommand(cmd, raise_on_failure=False)
def _IsDeleting(self) -> bool:
"""Returns True if cluster is being deleted and false otherwise."""
cluster_info = self.DescribeCluster()
return cluster_info.get('Status', '') == 'deleting'
def _IsReady(self) -> bool:
"""Returns True if cluster is ready and false otherwise."""
cluster_info = self.DescribeCluster()
return cluster_info.get('Status', '') == 'available'
def _Exists(self) -> bool:
"""Returns true if the cluster exists and is not being deleted."""
cluster_info = self.DescribeCluster()
return 'Status' in cluster_info and cluster_info['Status'] not in [
'deleting',
'create-failed',
]
def DescribeCluster(self) -> dict[str, Any]:
"""Returns the CLI describe output for the cluster."""
cmd = [
'aws',
'memorydb',
'describe-clusters',
'--cluster-name',
self.name,
'--region',
self.redis_region,
]
stdout, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False)
if retcode != 0:
logging.info('Could not find cluster %s, %s', self.name, stderr)
return {}
for cluster_info in json.loads(stdout)['Clusters']:
if cluster_info['Name'] == self.name:
return cluster_info
return {}
@vm_util.Retry(max_retries=5)
def _PopulateEndpoint(self) -> None:
"""Populates address and port information from cluster_info.
Raises:
errors.Resource.RetryableGetError:
Failed to retrieve information on cluster
"""
cluster_info = self.DescribeCluster()
if not cluster_info:
raise errors.Resource.RetryableGetError(
f'Failed to retrieve information on {self.name}'
)
primary_endpoint = cluster_info['ClusterEndpoint']
self._ip = primary_endpoint['Address']
self._port = primary_endpoint['Port']