perfkitbenchmarker/linux_benchmarks/cloudsuite_graph_analytics_benchmark.py (96 lines of code) (raw):
# Copyright 2016 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs the graph analytics benchmark of Cloudsuite.
More info: http://cloudsuite.ch/graphanalytics/
"""
import re
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker.linux_packages import docker
# Memory size, in gigabytes, used by Run() to build the Spark
# '--executor-memory <N>g' option. Defaults to 2.
flags.DEFINE_integer(
'cloudsuite_graph_analytics_worker_mem',
2,
'Amount of memory for the worker, in gigabytes',
)
# Handle to the global absl flag registry; also read by GetConfig() for
# the 'num_vms' flag, which is not defined in this part of the file.
FLAGS = flags.FLAGS
# Benchmark name passed to configs.LoadConfig() in GetConfig().
BENCHMARK_NAME = 'cloudsuite_graph_analytics'
# Default benchmark configuration (YAML text): a 'master' VM group with a
# vm_count of 1 and a 'workers' VM group whose vm_count is filled in by
# GetConfig() when --num_vms is given.
# NOTE(review): the nesting indentation of this YAML appears to have been
# lost in this copy of the file -- confirm against the upstream source.
BENCHMARK_CONFIG = """
cloudsuite_graph_analytics:
description: >
Run Cloudsuite graph analytics benchmark. Specify the number of worker
VMs with --num_vms.
vm_groups:
master:
vm_spec: *default_dual_core
vm_count: 1
workers:
vm_spec: *default_dual_core
"""
def GetConfig(user_config):
  """Load the benchmark config, applying --num_vms to the worker group.

  Args:
    user_config: User-supplied configuration that is merged with
      BENCHMARK_CONFIG by configs.LoadConfig.

  Returns:
    The loaded configuration. When the 'num_vms' flag was explicitly set,
    the 'workers' group's 'vm_count' is replaced by its value.
  """
  benchmark_config = configs.LoadConfig(
      BENCHMARK_CONFIG, user_config, BENCHMARK_NAME
  )
  # Leave the configured worker count alone unless the flag was given.
  if not FLAGS['num_vms'].present:
    return benchmark_config
  worker_group = benchmark_config['vm_groups']['workers']
  worker_group['vm_count'] = FLAGS.num_vms
  return benchmark_config
def Prepare(benchmark_spec):
  """Set up docker, the dataset container and the Spark master/workers.

  Every VM gets docker (if not already installed), the 'cloudsuite/spark'
  and 'cloudsuite/twitter-dataset-graph' packages, and a docker container
  named 'data' created from the dataset image. The master additionally
  installs 'cloudsuite/graph-analytics' and starts the 'spark-master'
  container; each worker starts a 'spark-worker' container pointed at the
  master's internal IP on port 7077. All VMs are prepared in parallel.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  spark_master = benchmark_spec.vm_groups['master'][0]
  spark_workers = benchmark_spec.vm_groups['workers']

  def _SetUpShared(vm):
    """Install the pieces needed on every VM and create the data container."""
    docker_present = docker.IsInstalled(vm)
    if not docker_present:
      vm.Install('docker')
    for package in ('cloudsuite/spark', 'cloudsuite/twitter-dataset-graph'):
      vm.Install(package)
    create_data_cmd = (
        'sudo docker create --name data cloudsuite/twitter-dataset-graph'
    )
    vm.RemoteCommand(create_data_cmd)

  def _SetUpMaster(vm):
    """Prepare the master VM and launch the Spark master container."""
    _SetUpShared(vm)
    vm.Install('cloudsuite/graph-analytics')
    vm.RemoteCommand(
        'sudo docker run -d --net host -e SPARK_MASTER_IP=%s '
        '--name spark-master cloudsuite/spark master'
        % vm.internal_ip
    )

  def _SetUpWorker(vm):
    """Prepare a worker VM and launch a Spark worker bound to the master."""
    _SetUpShared(vm)
    vm.RemoteCommand(
        'sudo docker run -d --net host --volumes-from data '
        '--name spark-worker cloudsuite/spark worker '
        'spark://%s:7077'
        % spark_master.internal_ip
    )

  # One task per VM: workers first, then the master, all run concurrently.
  tasks = [(_SetUpWorker, [worker_vm], {}) for worker_vm in spark_workers]
  tasks.append((_SetUpMaster, [spark_master], {}))
  background_tasks.RunParallelThreads(tasks, len(tasks))
def Run(benchmark_spec):
  """Run the graph analytics benchmark.

  Runs the 'cloudsuite/graph-analytics' container on the master VM against
  the Spark master at the master's internal IP (port 7077), then extracts
  the 'Running time = <N>' value from the command's stdout.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects. It holds a single
    'Benchmark execution time' sample, in seconds.

  Raises:
    errors.Benchmarks.RunError: If stdout does not contain exactly one
      'Running time = <N>' entry.
  """
  master = benchmark_spec.vm_groups['master'][0]

  memory_option = '--executor-memory %dg' % (
      FLAGS.cloudsuite_graph_analytics_worker_mem
  )
  benchmark_cmd = (
      'sudo docker run --rm --net host --volumes-from data '
      'cloudsuite/graph-analytics %s --master spark://%s:7077'
      % (memory_option, master.internal_ip)
  )
  stdout, _ = master.RemoteCommand(benchmark_cmd)

  matches = re.findall(r'Running time = (\d+)', stdout)
  if len(matches) != 1:
    # The exception must actually be raised: previously it was only
    # constructed, so a missing result surfaced as an IndexError below and
    # multiple results were silently reduced to the first one.
    raise errors.Benchmarks.RunError(
        'Expected to find benchmark execution time'
    )

  # The parsed value is divided by 1000 before being reported as seconds,
  # i.e. the benchmark output is treated as milliseconds.
  execution_time_seconds = float(matches[0]) / 1000
  return [
      sample.Sample(
          'Benchmark execution time', execution_time_seconds, 'seconds'
      )
  ]
def Cleanup(benchmark_spec):
  """Stop and remove the docker containers used by the benchmark.

  On the master the 'spark-master' container is stopped and removed; on
  each worker the same is done for 'spark-worker'. Afterwards the 'data'
  container is removed (with its volumes) on every VM. All VMs are cleaned
  up in parallel.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  spark_master = benchmark_spec.vm_groups['master'][0]
  spark_workers = benchmark_spec.vm_groups['workers']

  def _TearDownMaster(vm):
    """Remove the Spark master container, then the data container."""
    for command in (
        'sudo docker stop spark-master',
        'sudo docker rm spark-master',
        'sudo docker rm -v data',
    ):
      vm.RemoteCommand(command)

  def _TearDownWorker(vm):
    """Remove the Spark worker container, then the data container."""
    for command in (
        'sudo docker stop spark-worker',
        'sudo docker rm spark-worker',
        'sudo docker rm -v data',
    ):
      vm.RemoteCommand(command)

  # One task per VM: workers first, then the master, all run concurrently.
  tasks = [(_TearDownWorker, [worker_vm], {}) for worker_vm in spark_workers]
  tasks.append((_TearDownMaster, [spark_master], {}))
  background_tasks.RunParallelThreads(tasks, len(tasks))