perfkitbenchmarker/linux_benchmarks/aws_dynamodb_ycsb_benchmark.py (204 lines of code) (raw):

# Copyright 2018 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run YCSB benchmark against AWS DynamoDB.

This benchmark does not provision VMs for the corresponding DynamoDB database.
The only VM group is the client group that sends requests to the specified DB.

TODO: add DAX option.
TODO: add global table option.
"""

from collections.abc import Collection, Iterable, MutableMapping
import logging
from typing import Any

from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import aws_credentials
from perfkitbenchmarker.linux_packages import ycsb
from perfkitbenchmarker.providers.aws import aws_dynamodb

_INITIAL_WRITES = flags.DEFINE_integer(
    'aws_dynamodb_ycsb_provision_wcu',
    10000,
    'The provisioned WCU to use during the load phase.',
)
flags.register_validator(
    'aws_dynamodb_ycsb_provision_wcu',
    lambda wcu: wcu >= 1,
    message='WCU must be >=1 to load successfully.',
)
_CONSISTENT_READS = flags.DEFINE_boolean(
    'aws_dynamodb_ycsb_consistentReads',
    False,
    'Consistent reads cost 2x eventual reads. '
    "'false' is default which is eventual",
)
_MAX_CONNECTIONS = flags.DEFINE_integer(
    'aws_dynamodb_connectMax',
    50,
    'Maximum number of concurrent dynamodb connections. Defaults to 50.',
)
_RAMP_UP = flags.DEFINE_boolean(
    'aws_dynamodb_ycsb_ramp_up',
    False,
    'If true, runs YCSB with a target throughput equal to the provisioned qps '
    'of the instance and increments until a max throughput is found.',
)
_PROVISIONED_QPS = flags.DEFINE_boolean(
    'aws_dynamodb_ycsb_provisioned_qps',
    False,
    'If true, runs YCSB with a target throughput equal to the provisioned qps '
    'of the instance and returns the result.',
)
_CLI_PROFILE = flags.DEFINE_string(
    'aws_dynamodb_ycsb_cli_profile',
    None,
    'Local AWS CLI profile to use with YCSB. Must be long term crendentials. '
    '"default" will work with basic CLI set up. '
    'Using an IAM user that only has DynamoDB access limits the scope of '
    'credentials copied into the VM.',
)
_RESTORE_VM_COUNT = flags.DEFINE_integer(
    'aws_dynamodb_ycsb_restore_vm_count',
    None,
    'Number of VMs to use for the benchmark if the restore flag is set. This is'
    ' used mostly as an override and should not generally be used manually. Use'
    ' --ycsb_client_vms instead to set the VMs for a normal run.',
)

FLAGS = flags.FLAGS

# Step by which the YCSB target is raised between ramp-up iterations.
_TARGET_QPS_INCREMENT = 1000
# Minimum throughput gain an iteration must show for ramp-up to continue.
_QPS_THRESHOLD = 100

BENCHMARK_NAME = 'aws_dynamodb_ycsb'
BENCHMARK_CONFIG = """
aws_dynamodb_ycsb:
  description: >
      Run YCSB against AWS DynamoDB.
      Configure the number of VMs via --ycsb_client_vms.
  non_relational_db:
    service_type: dynamodb
    zone: us-east-1a
    enable_freeze_restore: True
  vm_groups:
    default:
      os_type: ubuntu2204  # Python 2
      vm_spec: *default_dual_core
      vm_count: 1
  flags:
    openjdk_version: 8"""


def GetConfig(user_config) -> dict[str, Any]:
  """Loads the benchmark config and applies VM-count overrides from flags.

  Args:
    user_config: User-supplied config that is merged with BENCHMARK_CONFIG.

  Returns:
    The loaded benchmark config.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  if FLAGS['ycsb_client_vms'].present:
    config['vm_groups']['default']['vm_count'] = FLAGS.ycsb_client_vms
  # Override the VM count for restore runs (i.e. when some do not need multiple
  # VMs for loading). Applied last so it wins over --ycsb_client_vms.
  if FLAGS['restore'].present and _RESTORE_VM_COUNT.value is not None:
    config['vm_groups']['default']['vm_count'] = _RESTORE_VM_COUNT.value
  return config


def CheckPrerequisites(benchmark_config):
  """Verifies that the required flags and resources are present.

  Args:
    benchmark_config: Unused.

  Raises:
    errors.Config.MissingOption: If --aws_dynamodb_ycsb_cli_profile is unset.
  """
  del benchmark_config
  if not _CLI_PROFILE.value:
    raise errors.Config.MissingOption(
        '--aws_dynamodb_ycsb_cli_profile must be explicitly passed. "default" '
        'will use the default CLI profile and works with basic long lived '
        'credentials. Using DynamoDB specific credentials limits what is '
        'copied into the VM.'
    )
  ycsb.CheckPrerequisites()


def _GetYcsbArgs(
    client: virtual_machine.VirtualMachine,
    instance: aws_dynamodb.AwsDynamoDBInstance,
) -> dict[str, Any]:
  """Returns args to pass to YCSB.

  Args:
    client: VM used to resolve the remote path of the credentials file.
    instance: The DynamoDB instance under test.

  Returns:
    A dict of YCSB properties describing how to reach the table.
  """
  run_kwargs = {
      'dynamodb.awsCredentialsFile': GetRemoteCredentialsFullPath(client),
      'dynamodb.primaryKey': instance.primary_key,
      'dynamodb.endpoint': instance.GetEndPoint(),
      'dynamodb.region': instance.region,
      'table': instance.table_name,
  }
  if FLAGS.aws_dynamodb_use_sort:
    # With a sort key, the table's primary key becomes the hash key name and
    # 'dynamodb.primaryKey' is overwritten with the sort key.
    # NOTE(review): 'aws_dynamodb_connectMax' lacks the 'dynamodb.' prefix used
    # by every other property here and is only set in this branch -- confirm
    # that this is the property name YCSB actually reads.
    run_kwargs.update({
        'dynamodb.primaryKeyType': 'HASH_AND_RANGE',
        'aws_dynamodb_connectMax': _MAX_CONNECTIONS.value,
        'dynamodb.hashKeyName': instance.primary_key,
        'dynamodb.primaryKey': instance.sort_key,
    })
  if _CONSISTENT_READS.value:
    run_kwargs['dynamodb.consistentReads'] = 'true'
  return run_kwargs


def Prepare(benchmark_spec):
  """Install YCSB on the target vm and load the table if needed.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  benchmark_spec.always_call_cleanup = True

  vms = benchmark_spec.vms
  # Install required packages.
  background_tasks.RunThreaded(_Install, vms)
  benchmark_spec.executor = ycsb.YCSBExecutor('dynamodb')

  instance: aws_dynamodb.AwsDynamoDBInstance = benchmark_spec.non_relational_db
  # Restored instances have already been loaded with data.
  if instance.restored:
    return

  load_kwargs = _GetYcsbArgs(vms[0], instance)
  if FLAGS['ycsb_preload_threads'].present:
    load_kwargs['threads'] = FLAGS.ycsb_preload_threads
  # More WCU results in a faster load stage.
  if instance.wcu < _INITIAL_WRITES.value and not instance.IsServerless():
    instance.SetThroughput(wcu=_INITIAL_WRITES.value)
  benchmark_spec.executor.Load(vms, load_kwargs=load_kwargs)
  # Reset the WCU to the initial level.
  if not instance.IsServerless():
    instance.SetThroughput()


def Run(benchmark_spec):
  """Run YCSB on the target vm.

  The run mode is chosen from flags: ramp-up, exact provisioned QPS, or a
  plain YCSB run.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  vms = benchmark_spec.vms
  instance: aws_dynamodb.AwsDynamoDBInstance = benchmark_spec.non_relational_db
  run_kwargs = _GetYcsbArgs(vms[0], instance)

  samples = []
  if _RAMP_UP.value:
    samples += RampUpRun(benchmark_spec.executor, instance, vms, run_kwargs)
  elif _PROVISIONED_QPS.value:
    samples += ExactThroughputRun(
        benchmark_spec.executor, instance, vms, run_kwargs
    )
  else:
    samples += list(benchmark_spec.executor.Run(vms, run_kwargs=run_kwargs))

  benchmark_metadata = {
      'ycsb_client_vms': len(vms),
      'aws_dynamodb_consistentReads': _CONSISTENT_READS.value,
      'aws_dynamodb_connectMax': _MAX_CONNECTIONS.value,
  }
  # Benchmark metadata is applied after the resource metadata, so it wins on
  # any key collision.
  for s in samples:
    s.metadata.update(instance.GetResourceMetadata())
    s.metadata.update(benchmark_metadata)
  return samples


def _ExtractThroughput(samples: Iterable[sample.Sample]) -> float:
  """Returns the 'overall Throughput' sample value, or 0.0 if there is none."""
  for result in samples:
    if result.metric == 'overall Throughput':
      return result.value
  return 0.0


def _GetTargetQps(db: aws_dynamodb.AwsDynamoDBInstance) -> int:
  """Gets the target QPS for eventual and strong reads.

  Args:
    db: The DynamoDB instance whose provisioned rcu/wcu are read.

  Returns:
    The sum of the read and write QPS derived from the provisioned capacity.
  """
  # Consistent reads cost 2x eventual reads, so the same RCU yields twice the
  # QPS when reads are eventually consistent.
  read_multiplier = 1 if _CONSISTENT_READS.value else 2
  # Capacity of 25 or less contributes nothing to the target.
  # NOTE(review): presumably 25 marks a minimal/unprovisioned level -- confirm.
  rcu = 0 if db.rcu <= 25 else db.rcu * read_multiplier
  wcu = 0 if db.wcu <= 25 else db.wcu
  return rcu + wcu


def RampUpRun(
    executor: ycsb.YCSBExecutor,
    db: aws_dynamodb.AwsDynamoDBInstance,
    vms: Collection[virtual_machine.VirtualMachine],
    run_kwargs: MutableMapping[str, Any],
) -> list[sample.Sample]:
  """Runs YCSB starting from provisioned QPS until max throughput is found.

  The target is raised by _TARGET_QPS_INCREMENT after every run whose measured
  throughput beats the previous one by at least _QPS_THRESHOLD.

  Args:
    executor: The YCSB executor used to run the workload.
    db: The DynamoDB instance under test.
    vms: The client VMs.
    run_kwargs: YCSB properties; 'target' is overwritten on each iteration.

  Returns:
    The samples of the final run, i.e. the first run that failed to improve on
    the previous throughput by the threshold.
  """
  # Database is already provisioned with the correct QPS.
  qps = _GetTargetQps(db)
  max_throughput = 0
  while True:
    run_kwargs['target'] = qps
    # Materialize the result: it is iterated twice below and returned as a
    # list, matching how Run() consumes executor.Run().
    run_samples = list(executor.Run(vms, run_kwargs=run_kwargs))
    throughput = _ExtractThroughput(run_samples)
    logging.info(
        'Run had throughput target %s and measured throughput %s.',
        qps,
        throughput,
    )
    if throughput < max_throughput + _QPS_THRESHOLD:
      logging.info('Found maximum throughput %s.', throughput)
      for s in run_samples:
        s.metadata['qps_threshold'] = _QPS_THRESHOLD
        s.metadata['ramp_up'] = True
      return run_samples
    max_throughput = throughput
    qps += _TARGET_QPS_INCREMENT


def ExactThroughputRun(
    executor: ycsb.YCSBExecutor,
    db: aws_dynamodb.AwsDynamoDBInstance,
    vms: Collection[virtual_machine.VirtualMachine],
    run_kwargs: MutableMapping[str, Any],
) -> list[sample.Sample]:
  """Runs YCSB at provisioned QPS.

  Args:
    executor: The YCSB executor used to run the workload.
    db: The DynamoDB instance under test.
    vms: The client VMs.
    run_kwargs: YCSB properties; 'target' is set to the provisioned QPS.

  Returns:
    The samples of the run, each tagged with 'provisioned_qps' metadata.
  """
  run_kwargs['target'] = _GetTargetQps(db)
  # Materialize so the metadata loop and the caller see the same samples.
  run_samples = list(executor.Run(vms, run_kwargs=run_kwargs))
  for s in run_samples:
    s.metadata['provisioned_qps'] = True
  return run_samples


def Cleanup(benchmark_spec):
  """Cleanup YCSB on the target vm.

  Nothing is done here.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  del benchmark_spec


AWS_CREDENTIALS_FILE = 'aws_credentials.properties'


def GetRemoteCredentialsFullPath(vm) -> str:
  """Returns the full path for the generated AWS credentials file."""
  # Let the remote shell expand '~' so the path is absolute on that VM.
  result, _ = vm.RemoteCommand(f'echo ~/{AWS_CREDENTIALS_FILE}')
  return result.strip()


def GenerateCredentials(vm) -> None:
  """Generates AWS credentials properties file and pushes it to the VM."""
  key_id, secret = aws_credentials.GetCredentials(profile=_CLI_PROFILE.value)
  with vm_util.NamedTemporaryFile(prefix=AWS_CREDENTIALS_FILE, mode='w') as f:
    f.write(f"""
accessKey={key_id}
secretKey={secret}
""")
    # Close before pushing so the contents are flushed to disk; the push must
    # stay inside the 'with' block (presumably the file is removed on exit).
    f.close()
    vm.PushFile(f.name, GetRemoteCredentialsFullPath(vm))


def _Install(vm):
  """Install YCSB on client 'vm'."""
  vm.Install('ycsb')
  GenerateCredentials(vm)