perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py (119 lines of code) (raw):
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run YCSB benchmark against Google Cloud Spanner.
By default, this benchmark provisions 1 single-CPU VM and spawns 1 thread
to test Spanner. Configure the number of VMs via --ycsb_client_vms.
In some cases, sleep time in between loading and running may be required for
best performance. See https://cloud.google.com/spanner/docs/pre-warm-database
for best practices.
"""
from typing import Any
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import virtual_machine
from perfkitbenchmarker.linux_packages import ycsb
from perfkitbenchmarker.providers.gcp import gcp_spanner
# Name used to look this benchmark up when loading its config.
BENCHMARK_NAME = 'cloud_spanner_ycsb'
# Written into the config below as the Spanner instance description.
BENCHMARK_DESCRIPTION = 'YCSB'
# Table created by _BuildSchema() and passed to YCSB as the `table` property.
BENCHMARK_TABLE = 'usertable'
# Passed to YCSB as the `zeropadding` property.
BENCHMARK_ZERO_PADDING = 12
# OAuth scopes that CheckPrerequisites() requires in --gcloud_scopes.
REQUIRED_SCOPES = (
    'https://www.googleapis.com/auth/spanner.admin',
    'https://www.googleapis.com/auth/spanner.data',
)
# Default benchmark configuration, as YAML. Nesting is significant:
# GetConfig() indexes relational_db -> vm_groups -> default -> vm_count, so
# those keys must be nested in exactly that order.
# NOTE(review): `flags` is placed at the benchmark level, following the usual
# PerfKitBenchmarker config layout -- confirm against the config loader.
BENCHMARK_CONFIG = f"""
cloud_spanner_ycsb:
  description: >
    Run YCSB against Google Cloud Spanner.
    Configure the number of VMs via --ycsb_client_vms.
  relational_db:
    cloud: GCP
    engine: spanner-googlesql
    spanner_nodes: 1
    spanner_description: {BENCHMARK_DESCRIPTION}
    enable_freeze_restore: True
    vm_groups:
      default:
        os_type: ubuntu2204 # Python 2
        vm_spec: *default_dual_core
        vm_count: 1
  flags:
    openjdk_version: 8
    gcloud_scopes: >
      {' '.join(REQUIRED_SCOPES)}"""
FLAGS = flags.FLAGS

# Benchmark-specific flags. Run() forwards the first three to YCSB as the
# cloudspanner.batchinserts / boundedstaleness / readmode properties.
flags.DEFINE_integer(
    name='cloud_spanner_ycsb_batchinserts',
    default=1,
    help='The Cloud Spanner batch inserts used in the YCSB benchmark.',
)
flags.DEFINE_integer(
    name='cloud_spanner_ycsb_boundedstaleness',
    default=0,
    help='The Cloud Spanner bounded staleness used in the YCSB benchmark.',
)
flags.DEFINE_enum(
    name='cloud_spanner_ycsb_readmode',
    default='query',
    enum_values=['query', 'read'],
    help='The Cloud Spanner read mode used in the YCSB benchmark.',
)
# Commands listed here are executed by _Install() on every client VM.
flags.DEFINE_list(
    name='cloud_spanner_ycsb_custom_vm_install_commands',
    default=[],
    help=(
        'A list of strings. If specified, execute them on every '
        'VM during the installation phase.'
    ),
)
def GetConfig(user_config):
  """Loads the benchmark config, applying --ycsb_client_vms when it is set.

  Args:
    user_config: User-supplied configuration, handed to configs.LoadConfig
      together with BENCHMARK_CONFIG.

  Returns:
    The loaded config. If --ycsb_client_vms was given explicitly, the
    vm_count of the relational_db 'default' VM group is set to that value.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  if not FLAGS['ycsb_client_vms'].present:
    return config
  client_group = config['relational_db']['vm_groups']['default']
  client_group['vm_count'] = FLAGS.ycsb_client_vms
  return config
def CheckPrerequisites(_):
  """Checks flag values before the benchmark is run.

  Args:
    _: Unused.

  Raises:
    ValueError: If any entry of REQUIRED_SCOPES is absent from
      --gcloud_scopes.
    errors.Setup.InvalidFlagConfigurationError: If YCSB CPU optimization is
      enabled and its measurement minutes are not greater than
      gcp_spanner.CPU_API_DELAY_MINUTES.
  """
  granted_scopes = FLAGS.gcloud_scopes
  for required in REQUIRED_SCOPES:
    if required not in granted_scopes:
      raise ValueError('Scope {} required.'.format(required))

  # The remaining check only applies when CPU optimization is enabled.
  if not ycsb.CPU_OPTIMIZATION.value:
    return

  measurement_mins = ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value
  api_delay_mins = gcp_spanner.CPU_API_DELAY_MINUTES
  if measurement_mins <= api_delay_mins:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'measurement_mins {measurement_mins} must'
        ' be greater than CPU_API_DELAY_MINUTES'
        f' {api_delay_mins}'
    )
def Prepare(benchmark_spec):
  """Sets up the client VMs, the YCSB executor and the Spanner table.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  benchmark_spec.always_call_cleanup = True

  # Install YCSB, plus any custom install commands, on every client VM.
  client_vms = benchmark_spec.vms
  background_tasks.RunThreaded(_Install, client_vms)

  # Run() reads the executor back from the spec.
  benchmark_spec.executor = ycsb.YCSBExecutor('cloudspanner')

  database: gcp_spanner.GcpSpannerInstance = benchmark_spec.relational_db
  table_ddl = _BuildSchema()
  database.CreateTables(table_ddl)
def _LoadDatabase(
    executor: ycsb.YCSBExecutor,
    spanner: gcp_spanner.GcpSpannerInstance,
    vms: list[virtual_machine.VirtualMachine],
    load_kwargs: dict[str, Any],
) -> list[sample.Sample]:
  """Runs the YCSB load stage, switching Spanner capacity around it.

  Args:
    executor: The YCSB executor used to run the load.
    spanner: The Spanner instance being loaded.
    vms: The client VMs that drive the load.
    load_kwargs: YCSB properties for the load stage.

  Returns:
    The samples produced by the load stage, or an empty list when the
    instance was restored or the load stage is skipped by flag.
  """
  load_not_needed = spanner.restored or ycsb.SKIP_LOAD_STAGE.value
  if load_not_needed:
    return []

  # Switch to load capacity, load, then switch to run capacity.
  spanner.UpdateCapacityForLoad()
  load_samples = list(executor.Load(vms, load_kwargs=load_kwargs))
  spanner.UpdateCapacityForRun()
  return load_samples
def Run(benchmark_spec):
  """Loads the database if needed, runs YCSB and returns annotated samples.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample instances from the load and run stages, each
    carrying the client type and the Spanner resource metadata.
  """
  client_vms = benchmark_spec.vms
  spanner: gcp_spanner.GcpSpannerInstance = benchmark_spec.relational_db
  executor: ycsb.YCSBExecutor = benchmark_spec.executor

  # YCSB properties shared by the load and run stages.
  run_kwargs = {
      'table': BENCHMARK_TABLE,
      'zeropadding': BENCHMARK_ZERO_PADDING,
      'cloudspanner.instance': spanner.instance_id,
      'cloudspanner.database': spanner.database,
      'cloudspanner.readmode': FLAGS.cloud_spanner_ycsb_readmode,
      'cloudspanner.boundedstaleness': (
          FLAGS.cloud_spanner_ycsb_boundedstaleness
      ),
      'cloudspanner.batchinserts': FLAGS.cloud_spanner_ycsb_batchinserts,
  }
  # Only set the host property when the instance reports an API endpoint.
  api_endpoint = spanner.GetApiEndPoint()
  if api_endpoint:
    run_kwargs['cloudspanner.host'] = api_endpoint

  # The load stage uses the same properties plus an insertion retry limit.
  load_kwargs = {**run_kwargs, 'core_workload_insertion_retry_limit': 100}

  client_metadata = {'ycsb_client_type': 'java'}
  samples = []
  samples.extend(_LoadDatabase(executor, spanner, client_vms, load_kwargs))
  samples.extend(
      executor.Run(client_vms, run_kwargs=run_kwargs, database=spanner)
  )

  for collected in samples:
    collected.metadata.update(client_metadata)
    collected.metadata.update(spanner.GetResourceMetadata())
  return samples
def Cleanup(benchmark_spec):
  """Cleanup stage; this benchmark does nothing here.

  Args:
    benchmark_spec: The benchmark specification. Unused.
  """
  # The argument is accepted only to satisfy the stage signature.
  del benchmark_spec
def _BuildSchema():
  """Builds the DDL for the benchmark table.

  The table has a string primary key `id` and one STRING(MAX) column named
  field<i> for each i below --ycsb_field_count.

  Returns:
    A string of DDL for creating a Spanner table.
  """
  column_defs = (
      f'field{index} STRING(MAX)' for index in range(FLAGS.ycsb_field_count)
  )
  fields = ',\n'.join(column_defs)
  # The literal's lines are flush-left so that its text is unchanged.
  return f"""
CREATE TABLE {BENCHMARK_TABLE} (
id STRING(MAX),
{fields}
) PRIMARY KEY(id)
"""
def _Install(vm):
  """Installs YCSB on the VM, then runs any custom install commands.

  Args:
    vm: The VM to set up.
  """
  vm.Install('ycsb')

  # Commands come from --cloud_spanner_ycsb_custom_vm_install_commands and
  # run in the order given.
  custom_commands = FLAGS.cloud_spanner_ycsb_custom_vm_install_commands
  for custom_command in custom_commands:
    # The result is unpacked as a pair and discarded.
    _, _ = vm.RemoteCommand(custom_command)