perfkitbenchmarker/linux_benchmarks/cloud_redis_memtier_benchmark.py (99 lines of code) (raw):

# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Runs the memtier benchmark against managed Redis services. Spins up a cloud redis instance, runs memtier against it, then spins it down. """ import collections import itertools import pprint from absl import flags from absl import logging from perfkitbenchmarker import background_tasks from perfkitbenchmarker import configs from perfkitbenchmarker import linux_virtual_machine from perfkitbenchmarker import managed_memory_store from perfkitbenchmarker import sample from perfkitbenchmarker import vm_util from perfkitbenchmarker.linux_packages import memtier FLAGS = flags.FLAGS BENCHMARK_NAME = 'cloud_redis_memtier' BENCHMARK_CONFIG = f""" cloud_redis_memtier: description: Run memtier against cloud redis memory_store: service_type: memorystore memory_store_type: {managed_memory_store.REDIS} version: redis_6_x vm_groups: clients: vm_spec: *default_dual_core vm_count: 1 """ _LinuxVm = linux_virtual_machine.BaseLinuxVirtualMachine _ManagedRedis = managed_memory_store.BaseManagedMemoryStore def GetConfig(user_config): config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) if memtier.MEMTIER_RUN_MODE.value == memtier.MemtierMode.MEASURE_CPU_LATENCY: config['vm_groups']['clients']['vm_count'] += 1 return config def Prepare(benchmark_spec): """Prepare the cloud redis instance for memtier tasks. Args: benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. """ benchmark_spec.always_call_cleanup = True memtier_vms = benchmark_spec.vm_groups['clients'] background_tasks.RunThreaded(_Install, memtier_vms) cloud_redis_instance = benchmark_spec.memory_store memory_store_ip = cloud_redis_instance.GetMemoryStoreIp() memory_store_port = cloud_redis_instance.GetMemoryStorePort() password = cloud_redis_instance.GetMemoryStorePassword() memtier.Load(memtier_vms, memory_store_ip, memory_store_port, password) def _GetConnections( vms: list[_LinuxVm], redis_instance: _ManagedRedis ) -> list[memtier.MemtierConnection]: """Gets a list of connections mapping client VMs to shards.""" if len(vms) == 1: return [ memtier.MemtierConnection( vms[0], redis_instance.GetMemoryStoreIp(), redis_instance.GetMemoryStorePort(), ) ] # Spread shards by client VM (evenly distributed by zone) such that each # client VM gets an equal number of shards in each zone. connections = [] shards = redis_instance.GetShardEndpoints(vms[0]) shards_by_zone = collections.defaultdict(list) for shard in shards: shards_by_zone[shard.zone].append(shard) shards_by_vm = collections.defaultdict(list) # List shards alternating by zone and then distribute them to VMs. Example: # shards_by_zone = { # 'zone_a': [1, 2, 3] # 'zone_b': [4, 5, 6, 7] # } -> shards_list = [1, 2, 3, 4, 5, 6, 7] # vm1 gets [1, 3, 5, 7], vm2 gets [2, 4, 6] for shard_index, shard in enumerate( itertools.chain(*shards_by_zone.values()) ): vm_index = shard_index % len(vms) vm = vms[vm_index] connections.append(memtier.MemtierConnection(vm, shard.ip, shard.port)) shards_by_vm[vm].append(shard) logging.info('Shards by VM: %s', pprint.pformat(shards_by_vm)) return connections def _MeasureMemtierDistribution( redis_instance: _ManagedRedis, vms: list[_LinuxVm], ) -> list[sample.Sample]: """Runs and reports stats across a series of memtier runs.""" connections = _GetConnections(vms, redis_instance) return memtier.MeasureLatencyCappedThroughputDistribution( connections, redis_instance.GetMemoryStoreIp(), redis_instance.GetMemoryStorePort(), vms, redis_instance.shard_count, redis_instance.GetMemoryStorePassword(), ) def _Run(vms: list[_LinuxVm], redis_instance: _ManagedRedis): """Runs memtier based on provided flags.""" if memtier.MEMTIER_RUN_MODE.value == memtier.MemtierMode.MEASURE_CPU_LATENCY: return memtier.RunGetLatencyAtCpu(redis_instance, vms) if memtier.MEMTIER_LATENCY_CAPPED_THROUGHPUT.value: if memtier.MEMTIER_DISTRIBUTION_ITERATIONS.value: return _MeasureMemtierDistribution(redis_instance, vms) return memtier.MeasureLatencyCappedThroughput( vms[0], redis_instance.shard_count, redis_instance.GetMemoryStoreIp(), redis_instance.GetMemoryStorePort(), redis_instance.GetMemoryStorePassword(), ) return memtier.RunOverAllThreadsPipelinesAndClients( vms, redis_instance.GetMemoryStoreIp(), [redis_instance.GetMemoryStorePort()], redis_instance.GetMemoryStorePassword(), ) def Run(benchmark_spec): """Run benchmark and collect samples. Args: benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. Returns: A list of sample.Sample instances. """ memtier_vms = benchmark_spec.vm_groups['clients'] redis_instance: _ManagedRedis = benchmark_spec.memory_store return _Run(memtier_vms, redis_instance) def Cleanup(benchmark_spec): """Cleanup and delete redis instance. Args: benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. """ del benchmark_spec @vm_util.Retry(poll_interval=1) def _Install(vm): """Installs necessary client packages.""" vm.Install('memtier') vm.Install('redis_cli')