perfkitbenchmarker/linux_benchmarks/object_storage_curl.py

# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Use cURL to upload and download data to object storage in parallel.

Consistent with the object_storage_service multistream scenario.

Due to the difficulty of signing requests to S3 by hand
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html),
the benchmark uses insecure, short-lived public buckets and should be used
with caution.

TODO(pclay): Consider signing requests and not using public buckets.
"""

import itertools
from typing import List, Tuple

from absl import flags
import numpy as np
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import object_storage_service
from perfkitbenchmarker import providers
from perfkitbenchmarker import sample
from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark

BENCHMARK_NAME = 'object_storage_curl'

BENCHMARK_CONFIG = """
object_storage_curl:
  description: Use cURL to upload and download data to object storage in parallel.
  vm_groups:
    default:
      vm_spec: *default_dual_core
  flags:
    # Required
    object_storage_multistream_objects_per_stream: 1
    object_storage_streams_per_vm: 10
"""

# Blocksize for dd to pipe data into uploads.
DD_BLOCKSIZE = 4000

flags.DEFINE_string(
    'object_storage_curl_object_size',
    '1MB',
    'Size of objects to upload / download. Similar to '
    '--object_storage_object_sizes, but only takes a single size.',
)
flags.DEFINE_bool(
    'object_storage_curl_i_am_ok_with_public_read_write_buckets',
    False,
    'Acknowledge that this benchmark will create buckets which are publicly '
    'readable and writable. Required to run this benchmark.',
)

FLAGS = flags.FLAGS


def GetConfig(user_config):
  return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)


def CheckPrerequisites(_):
  """Validate some unsupported flags."""
  if (
      flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
      < DD_BLOCKSIZE
  ):
    raise errors.Config.InvalidValue(
        '--object_storage_curl_object_size must be at least 4KB'
    )
  # TODO(pclay): Consider supporting multiple objects per stream.
  if FLAGS.object_storage_multistream_objects_per_stream != 1:
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports 1 object per stream'
    )
  if FLAGS.object_storage_object_naming_scheme != 'sequential_by_stream':
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports sequential_by_stream naming.'
    )
  if not FLAGS.object_storage_curl_i_am_ok_with_public_read_write_buckets:
    raise errors.Config.InvalidValue(
        'This benchmark uses a public read/write object storage bucket.\n'
        'You must explicitly pass '
        '--object_storage_curl_i_am_ok_with_public_read_write_buckets to '
        'acknowledge that it will be created.\n'
        'If PKB is interrupted, you should ensure it is cleaned up.'
    )
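
# A minimal sketch of how this benchmark might be invoked from the PKB root.
# The flag values below are illustrative, not defaults; the acknowledgment
# flag is required by CheckPrerequisites above:
#
#   ./pkb.py --benchmarks=object_storage_curl --cloud=GCP \
#       --object_storage_curl_object_size=16MB \
#       --object_storage_streams_per_vm=10 \
#       --object_storage_curl_i_am_ok_with_public_read_write_buckets
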
# PyType does not currently support returning Abstract classes
# TODO(user): stop suppressing
def _GetService() -> object_storage_service.ObjectStorageService:  # pytype: disable=not-instantiable
  """Get a ready to use instance of ObjectStorageService."""
  # TODO(pclay): consider using FLAGS.storage to allow cross cloud testing?
  cloud = FLAGS.cloud
  providers.LoadProvider(cloud)
  service = object_storage_service.GetObjectStorageClass(cloud)()
  # This method is idempotent with default args and safe to call in each phase.
  service.PrepareService(FLAGS.object_storage_region)
  return service


def _GetBucketName() -> str:
  return FLAGS.object_storage_bucket_name or 'pkb%s' % FLAGS.run_uri


def Prepare(benchmark_spec):
  """Create and ACL a bucket and install curl."""
  # Always clean up server-side state if an exception is raised.
  benchmark_spec.always_call_cleanup = True
  service = _GetService()
  bucket_name = _GetBucketName()
  service.MakeBucket(bucket_name)
  service.MakeBucketPubliclyReadable(bucket_name, also_make_writable=True)
  vms = benchmark_spec.vms
  background_tasks.RunThreaded(lambda vm: vm.InstallPackages('curl'), vms)


def Run(benchmark_spec) -> List[sample.Sample]:
  """Run storage benchmark and publish results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    The same samples as object_storage_multistream.
  """
  service = _GetService()
  bucket = _GetBucketName()
  object_bytes = flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
  blocks = object_bytes // DD_BLOCKSIZE
  start_time_cmd = "date '+%s.%N'"
  generate_data_cmd = (
      'openssl aes-256-ctr -iter 1 -pass file:/dev/urandom -in /dev/zero'
      f' | dd bs={DD_BLOCKSIZE} count={blocks} iflag=fullblock'
  )
  # TODO(pclay): consider adding size_download/size_upload to verify we are
  # actually reading the data.
  curl_cmd = "curl -fsw '%{time_total}' -o /dev/null"

  def Upload(vm, object_index):
    object_name = f'{vm.name}_{object_index}'
    url = service.GetUploadUrl(bucket=bucket, object_name=object_name)
    stdout, _ = vm.RemoteCommand(
        f'{start_time_cmd}; {generate_data_cmd} | '
        f"{curl_cmd} -X {service.UPLOAD_HTTP_METHOD} --data-binary @- '{url}'"
    )
    return stdout

  def Download(vm, object_index):
    object_name = f'{vm.name}_{object_index}'
    url = service.GetDownloadUrl(bucket=bucket, object_name=object_name)
    stdout, _ = vm.RemoteCommand(f"{start_time_cmd}; {curl_cmd} '{url}'")
    return stdout

  vms = benchmark_spec.vms
  streams_per_vm = FLAGS.object_storage_streams_per_vm

  samples = []
  for operation, func in (('upload', Upload), ('download', Download)):
    output = background_tasks.RunThreaded(
        func,
        [(args, {}) for args in itertools.product(vms, range(streams_per_vm))],
    )
    start_times, latencies = _LoadWorkerOutput(output)
    object_storage_service_benchmark.ProcessMultiStreamResults(
        start_times,
        latencies,
        all_sizes=[object_bytes],
        sizes=[np.array([object_bytes])] * streams_per_vm * len(vms),
        operation=operation,
        results=samples,
    )
  return samples
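
# For reference, the remote commands composed in Run() above look roughly like
# the following. The URL, HTTP method, and block count are illustrative
# (count=250 corresponds to the default 1MB object at a 4000-byte blocksize):
#
#   Upload:
#     date '+%s.%N'; openssl aes-256-ctr -iter 1 -pass file:/dev/urandom \
#         -in /dev/zero | dd bs=4000 count=250 iflag=fullblock \
#         | curl -fsw '%{time_total}' -o /dev/null -X PUT \
#         --data-binary @- '<public-bucket-url>'
#   Download:
#     date '+%s.%N'; curl -fsw '%{time_total}' -o /dev/null '<public-bucket-url>'
#
# Each command prints the Unix start time on one line and curl's total time on
# the next, which _LoadWorkerOutput below parses.
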
def Cleanup(_):
  service = _GetService()
  bucket_name = _GetBucketName()
  if not FLAGS.object_storage_dont_delete_bucket:
    service.DeleteBucket(bucket_name)
    service.CleanupService()


def _LoadWorkerOutput(
    output: List[str],
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
  """Parse the output of the Upload and Download functions.

  The output of each command is:
    12345.6789  # Unix start time as a float in seconds
    1.2345      # Latency of curl in seconds

  Args:
    output: the output of each upload or download command

  Returns:
    the start times and latencies of the curl commands
  """
  start_times = []
  latencies = []
  for worker_out in output:
    start_time_str, latency_str = worker_out.strip().split('\n')
    # curl 7.74 reported microseconds instead of seconds; that version is not
    # shipped by major OS types. https://github.com/curl/curl/issues/6321
    assert '.' in latency_str, 'Invalid curl output.'
    start_times.append(np.array([start_time_str], dtype=np.float64))
    latencies.append(np.array([latency_str], dtype=np.float64))
  return start_times, latencies
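
# Illustrative parse (values are made up): a single worker's stdout of
#   '1618882000.123456789\n0.847'
# yields start_times == [np.array([1.618882e+09])] and
# latencies == [np.array([0.847])] — one single-element array per stream,
# matching the one-object-per-stream restriction in CheckPrerequisites.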