perfkitbenchmarker/linux_benchmarks/hbase_ycsb_benchmark.py (143 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs YCSB against HBase.
HBase is a scalable NoSQL database built on Hadoop.
https://hbase.apache.org/
A running installation consists of:
* An HDFS NameNode.
* HDFS DataNodes.
* An HBase master node.
* HBase regionservers.
* A zookeeper cluster (https://zookeeper.apache.org/).
See: http://hbase.apache.org/book.html#_distributed.
This benchmark provisions:
* A single node functioning as HDFS NameNode, HBase master, and zookeeper
quorum member.
* '--num_vms - 1' nodes serving as both HDFS DataNodes and HBase region
servers (so region servers and data are co-located).
By default only the master node runs Zookeeper. To grow the quorum, set
--hbase_zookeeper_nodes=N: the first N HBase VMs (the master followed by N-1
region servers) then form the zookeeper quorum.
HBase web UI on 15030.
HDFS web UI on 50070.
"""
import functools
import logging
import os
import posixpath
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import hadoop
from perfkitbenchmarker.linux_packages import hbase
from perfkitbenchmarker.linux_packages import ycsb
FLAGS = flags.FLAGS
# YCSB binding name. Besides selecting the YCSB executor, Prepare uses it to
# locate the binding's conf directory ('<binding>-binding/conf') on loaders.
flags.DEFINE_string(
    'hbase_binding',
    'hbase12',
    'HBase binding to use. After '
    'YCSB 0.17.0, "hbase1x" is renamed to "hbase1".',
)
# Size of the Zookeeper quorum. The quorum is taken from the front of the
# HBase VM list (the master first, then workers).
flags.DEFINE_integer('hbase_zookeeper_nodes', 1, 'Number of Zookeeper nodes.')
# Forwarded to the create-table shell template as `use_snappy`.
flags.DEFINE_boolean(
    'hbase_use_snappy', True, 'Whether to use snappy compression.'
)
BENCHMARK_NAME = 'hbase_ycsb'
# Default benchmark config with three VM groups: 'master' (its first VM is the
# HBase master), 'workers' (HBase region servers / HDFS DataNodes) and
# 'clients' (YCSB loaders). GetConfig overrides the worker and client counts
# when --num_vms / --ycsb_client_vms are explicitly set.
BENCHMARK_CONFIG = """
hbase_ycsb:
description: >
Run YCSB against HBase. Specify the HBase
cluster size with --num_vms. Specify the number of YCSB VMs
with --ycsb_client_vms.
vm_groups:
master:
vm_spec: *default_dual_core
disk_spec: *default_500_gb
workers:
vm_spec: *default_dual_core
disk_spec: *default_500_gb
clients:
os_type: ubuntu2204 # Python 2
vm_spec: *default_dual_core
"""
# HBase config file pulled from the master and pushed to every loader.
HBASE_SITE = 'hbase-site.xml'
# Template (resolved via data.ResourcePath and rendered with vm.RenderTemplate)
# for the HBase shell script that creates the YCSB table.
CREATE_TABLE_SCRIPT = 'hbase/create-ycsb-table.hbaseshell.j2'
# Default table name used by CreateYCSBTable.
TABLE_NAME = 'usertable'
# Column family used both to create the table and as YCSB's `columnfamily`.
COLUMN_FAMILY = 'cf'
# Default number of initial regions for the YCSB table (see HBASE-4163).
TABLE_SPLIT_COUNT = 200
def GetConfig(user_config):
  """Loads the benchmark config and applies VM-count overrides from flags.

  Args:
    user_config: User-supplied configuration, passed to configs.LoadConfig
      together with BENCHMARK_CONFIG.

  Returns:
    The loaded config. If --ycsb_client_vms was given, the 'clients' group's
    vm_count is set to it. If --num_vms was given, the 'workers' group's
    vm_count is set to num_vms - 1 (the remaining VM is the master).

  Raises:
    ValueError: If --num_vms was explicitly set to a value below 2.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  hbase_vm_count = max(FLAGS.num_vms, 2)
  num_vms_given = FLAGS['num_vms'].present

  if num_vms_given and FLAGS.num_vms < 2:
    raise ValueError('hbase_ycsb requires at least 2 HBase VMs.')

  if FLAGS['ycsb_client_vms'].present:
    config['vm_groups']['clients']['vm_count'] = FLAGS.ycsb_client_vms

  if num_vms_given:
    # One HBase VM is the master; every other one becomes a worker.
    config['vm_groups']['workers']['vm_count'] = hbase_vm_count - 1

  return config
def CheckPrerequisites(benchmark_config):
  """Runs the HBase, Hadoop and YCSB package prerequisite checks, in order.

  Args:
    benchmark_config: The benchmark config. Not consulted by this benchmark.

  Raises:
    perfkitbenchmarker.data.ResourceNotFound: On missing resource.
  """
  del benchmark_config  # Unused.
  for package in (hbase, hadoop, ycsb):
    package.CheckPrerequisites()
def CreateYCSBTable(
    vm,
    table_name=TABLE_NAME,
    family=COLUMN_FAMILY,
    n_splits=TABLE_SPLIT_COUNT,
    limit_filesize=True,
    use_snappy=True,
):
  """Creates the YCSB table by running a rendered HBase shell script on `vm`.

  The CREATE_TABLE_SCRIPT template is rendered onto `vm` under
  hbase.HBASE_DIR (with the template's last extension stripped), then run
  with `hbase shell`.

  Args:
    vm: Virtual machine from which to create the table.
    table_name: Name for the table.
    family: Column family name.
    n_splits: Initial number of regions for the table. Default follows
      HBASE-4163 (https://issues.apache.org/jira/browse/HBASE-4163).
    limit_filesize: boolean, forwarded to the template. Per the original
      documentation it limits the filesize to 4GB.
    use_snappy: boolean, forwarded to the template to choose whether snappy
      compression is used.
  """
  local_template = data.ResourcePath(CREATE_TABLE_SCRIPT)
  # Drop the template suffix (e.g. '.j2') to name the rendered script.
  script_name = os.path.basename(os.path.splitext(local_template)[0])
  remote_script = posixpath.join(hbase.HBASE_DIR, script_name)

  template_vars = dict(
      table_name=table_name,
      family=family,
      limit_filesize=limit_filesize,
      n_splits=n_splits,
      use_snappy=use_snappy,
  )
  vm.RenderTemplate(local_template, remote_script, context=template_vars)

  # TODO(user): on HBase update, add '-n' flag.
  vm.RemoteCommand(f'{hbase.HBASE_BIN}/hbase shell {remote_script}')
def _GetVMsByRole(vm_groups):
  """Groups the benchmark's VMs by the role each one plays.

  Roles:
    * master: the first VM of the 'master' group.
    * hbase_vms: the 'master' group followed by the 'workers' group.
    * zk_quorum: the first FLAGS.hbase_zookeeper_nodes entries of hbase_vms.
    * workers: the 'workers' group (HBase region servers / HDFS DataNodes).
    * clients: the 'clients' group (YCSB loaders).
    * vms: hbase_vms followed by clients.

  Args:
    vm_groups: The benchmark_spec's vm_groups dict. It must have 'master',
      'workers' and 'clients' entries (presumably lists of VMs).

  Returns:
    A dictionary with keys 'vms', 'hbase_vms', 'master', 'zk_quorum',
    'workers', and 'clients'. 'workers' and 'clients' are the very objects
    stored in vm_groups; the other list values are freshly built.
  """
  master_group = vm_groups['master']
  worker_group = vm_groups['workers']
  client_group = vm_groups['clients']
  cluster = master_group + worker_group
  return dict(
      vms=cluster + client_group,
      hbase_vms=cluster,
      master=master_group[0],
      zk_quorum=cluster[: FLAGS.hbase_zookeeper_nodes],
      workers=worker_group,
      clients=client_group,
  )
def Prepare(benchmark_spec):
  """Prepare the virtual machines to run HBase and YCSB.

  Steps:
    1. Check that every VM role is non-empty.
    2. Install 'hbase' on all HBase VMs and 'ycsb' on all loaders, in
       parallel.
    3. Configure and start HDFS (without YARN), then HBase.
    4. Create the YCSB table from the master.
    5. Copy the master's hbase-site.xml to each loader's YCSB binding conf
       directory.
    6. Attach a YCSB executor for --hbase_binding to benchmark_spec.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Raises:
    ValueError: If there are no loader VMs, no HBase VMs, an empty Zookeeper
      quorum, or no workers.
  """
  by_role = _GetVMsByRole(benchmark_spec.vm_groups)
  loaders = by_role['clients']
  hbase_vms = by_role['hbase_vms']
  master = by_role['master']
  zk_quorum = by_role['zk_quorum']
  workers = by_role['workers']

  # Use explicit checks instead of `assert`, which is stripped under
  # `python -O`. ValueError matches GetConfig's error for a bad cluster size.
  for members, message in (
      (loaders, 'No loader VMs: {}'),
      (hbase_vms, 'No HBase VMs: {}'),
      (zk_quorum, 'No zookeeper quorum: {}'),
      (workers, 'No workers: {}'),
  ):
    if not members:
      raise ValueError(message.format(by_role))

  hbase_install_fns = [
      functools.partial(vm.Install, 'hbase') for vm in hbase_vms
  ]
  ycsb_install_fns = [functools.partial(vm.Install, 'ycsb') for vm in loaders]
  background_tasks.RunThreaded(
      lambda f: f(), hbase_install_fns + ycsb_install_fns
  )

  hadoop.ConfigureAndStart(master, workers, start_yarn=False)
  hbase.ConfigureAndStart(master, workers, zk_quorum)

  CreateYCSBTable(master, use_snappy=FLAGS.hbase_use_snappy)

  # Populate hbase-site.xml on the loaders: pull it from the master into the
  # local temp dir, then push it into each loader's binding conf dir.
  master.PullFile(
      vm_util.GetTempDir(), posixpath.join(hbase.HBASE_CONF_DIR, HBASE_SITE)
  )

  def PushHBaseSite(vm):
    """Copies the pulled hbase-site.xml into `vm`'s YCSB binding conf dir."""
    conf_dir = posixpath.join(
        ycsb.YCSB_DIR, FLAGS.hbase_binding + '-binding', 'conf'
    )
    vm.RemoteCommand('mkdir -p {}'.format(conf_dir))
    vm.PushFile(
        os.path.join(vm_util.GetTempDir(), HBASE_SITE),
        posixpath.join(conf_dir, HBASE_SITE),
    )

  background_tasks.RunThreaded(PushHBaseSite, loaders)
  benchmark_spec.executor = ycsb.YCSBExecutor(FLAGS.hbase_binding)
def Run(benchmark_spec):
  """Runs the YCSB load and run phases and tags results with cluster info.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects. Each sample's metadata is extended with
    the loader count, HBase cluster size, Zookeeper node count and HBase
    version.
  """
  roles = _GetVMsByRole(benchmark_spec.vm_groups)
  clients = roles['clients']
  cluster = roles['hbase_vms']
  logging.info('Loaders: %s', clients)

  extra_metadata = {
      'ycsb_client_vms': len(clients),
      'hbase_cluster_size': len(cluster),
      'hbase_zookeeper_nodes': FLAGS.hbase_zookeeper_nodes,
      'hbase_version': hbase.GetHBaseVersion(cluster[0]),
  }

  # Run phase: YCSB's default BufferedMutator returns from Puts / Deletes
  # before the server acks them, which makes update latencies incorrect, so
  # client-side buffering is turned off.
  run_kwargs = {'columnfamily': COLUMN_FAMILY, 'clientbuffering': 'false'}

  # Load phase: keep the buffered mutator on and, unless the user chose a
  # preload thread count, use a single thread; the mutator multiplexes RPCs.
  load_kwargs = dict(run_kwargs, clientbuffering='true')
  if not FLAGS['ycsb_preload_threads'].present:
    load_kwargs['threads'] = 1

  results = list(
      benchmark_spec.executor.LoadAndRun(
          clients, load_kwargs=load_kwargs, run_kwargs=run_kwargs
      )
  )
  for result in results:
    result.metadata.update(extra_metadata)
  return results
def Cleanup(benchmark_spec):
  """Stops HBase and HDFS on the master, then cleans each worker's DataNode.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  roles = _GetVMsByRole(benchmark_spec.vm_groups)
  master = roles['master']
  hbase.Stop(master)
  hadoop.StopHDFS(master)
  # Worker cleanup runs through background_tasks.RunThreaded, one call per VM.
  background_tasks.RunThreaded(hadoop.CleanDatanode, roles['workers'])