perfkitbenchmarker/dpb_service.py (974 lines of code) (raw):

# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Benchmarking support for Data Processing Backend Services.

In order to benchmark Data Processing Backend services such as Google
Cloud Platform's Dataproc and Dataflow or Amazon's EMR, we create a
BaseDpbService class.  Classes to wrap specific backend services are in
the corresponding provider directory as a subclass of BaseDpbService.
"""

import abc
from collections.abc import Callable, MutableMapping
import dataclasses
import datetime
import logging
import os
import shutil
import tempfile
from typing import Dict, List, Type, TypeAlias

from absl import flags
import jinja2
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import container_service
from perfkitbenchmarker import context
from perfkitbenchmarker import data
from perfkitbenchmarker import dpb_constants
from perfkitbenchmarker import errors
from perfkitbenchmarker import resource
from perfkitbenchmarker import sample
from perfkitbenchmarker import units
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import hadoop
from perfkitbenchmarker.linux_packages import spark
from perfkitbenchmarker.providers.aws import flags as aws_flags
from perfkitbenchmarker.providers.aws import s3
from perfkitbenchmarker.providers.aws import util as aws_util
from perfkitbenchmarker.providers.gcp import gcs
from perfkitbenchmarker.providers.gcp import util as gcp_util
import yaml

flags.DEFINE_string(
    'static_dpb_service_instance',
    None,
    'If set, the name of the pre created dpb implementation,'
    'assumed to be ready.',
)
flags.DEFINE_string('dpb_log_level', 'INFO', 'Manipulate service log level')
flags.DEFINE_string(
    'dpb_job_jarfile',
    None,
    'Executable Jarfile containing workload implementation',
)
flags.DEFINE_string(
    'dpb_job_classname',
    None,
    'Classname of the job implementation in the jar file',
)
flags.DEFINE_string(
    'dpb_service_bucket',
    None,
    'A bucket to use with the DPB '
    'service. If none is provided one will be created by the '
    'benchmark and cleaned up afterwards unless you are using '
    'a static instance.',
)
flags.DEFINE_string(
    'dpb_service_zone',
    None,
    'The zone for provisioning the dpb_service instance.',
)
flags.DEFINE_list(
    'dpb_job_properties',
    [],
    'A list of strings of the form "key=value" to be passed into DPB jobs.',
)
flags.DEFINE_list(
    'dpb_cluster_properties',
    [],
    'A list of strings of the form '
    '"type:key=value" to be passed into DPB clusters. See '
    'https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/cluster-properties.',
)
flags.DEFINE_float(
    'dpb_job_poll_interval_secs',
    5,
    'Poll interval to check submitted job status in seconds. Only applies for '
    'DPB service implementations that do not support synchronous job '
    'submissions (i.e. not Dataproc).',
    lower_bound=0,
    upper_bound=120,
)
flags.DEFINE_string(
    'dpb_initialization_actions',
    None,
    'A comma separated list of Google Cloud Storage URIs of executables to run '
    'on each node in the DPB cluster. See '
    'https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create#--initialization-actions.',
)
flags.DEFINE_bool(
    'dpb_export_job_stats',
    False,
    'Exports job stats such as CPU usage and cost. Disabled by default and not '
    'necessarily implemented on all services.',
)
flags.DEFINE_enum(
    'dpb_job_type',
    None,
    [
        dpb_constants.PYSPARK_JOB_TYPE,
        dpb_constants.SPARKSQL_JOB_TYPE,
        dpb_constants.SPARK_JOB_TYPE,
        dpb_constants.HADOOP_JOB_TYPE,
        dpb_constants.BEAM_JOB_TYPE,
        dpb_constants.DATAFLOW_JOB_TYPE,
        dpb_constants.FLINK_JOB_TYPE,
    ],
    'The type of the job to be run on the backends.',
)
_HARDWARE_HOURLY_COST = flags.DEFINE_float(
    'dpb_hardware_hourly_cost',
    None,
    'Hardware hourly USD cost of running the DPB cluster. Set it along with '
    '--dpb_service_premium_hourly_cost to publish cost estimate metrics.',
)
# The help text previously repeated the hardware flag's wording; this flag is
# the service premium charged on top of the hardware cost.
_SERVICE_PREMIUM_HOURLY_COST = flags.DEFINE_float(
    'dpb_service_premium_hourly_cost',
    None,
    'Service premium hourly USD cost of running the DPB cluster. Set it along '
    'with --dpb_hardware_hourly_cost to publish cost estimate metrics.',
)
_DYNAMIC_ALLOCATION = flags.DEFINE_bool(
    'dpb_dynamic_allocation',
    True,
    'True by default. Set it to False to disable dynamic allocation and assign '
    'all cluster executors to an incoming job. Setting this off is only '
    'supported by Dataproc and EMR (non-serverless versions).',
)

FLAGS = flags.FLAGS


class JobNotCompletedError(Exception):
  """Used to signal a job is still running."""

  pass


class JobSubmissionError(errors.Benchmarks.RunError):
  """Thrown by all implementations if SubmitJob fails."""

  pass


# Callback returning (stdout, stderr); either element may be None.
FetchOutputFn: TypeAlias = Callable[[], tuple[str | None, str | None]]


@dataclasses.dataclass
class JobResult:
  """Data class for the timing of a successful DPB job.

  Attributes:
    run_time: Service reported execution time.
    pending_time: Service reported pending time (0 if service does not report).
    stdout: Job's stdout. Call FetchOutput before to ensure it's populated.
    stderr: Job's stderr. Call FetchOutput before to ensure it's populated.
    fetch_output_fn: Callback expected to return a 2-tuple of str or None whose
      values correspond to stdout and stderr respectively. This is called by
      FetchOutput which updates stdout and stderr if their respective value in
      this callback's return tuple is not None. Defaults to a no-op.
  """

  run_time: float
  pending_time: float = 0
  stdout: str = ''
  stderr: str = ''
  fetch_output_fn: FetchOutputFn = lambda: (None, None)

  def FetchOutput(self):
    """Populates stdout and stderr according to fetch_output_fn callback."""
    stdout, stderr = self.fetch_output_fn()
    if stdout is not None:
      self.stdout = stdout
    if stderr is not None:
      self.stderr = stderr

  @property
  def wall_time(self) -> float:
    """The total time the service reported it took to execute."""
    return self.run_time + self.pending_time


@dataclasses.dataclass
class JobCosts:
  """Contains cost stats for a job execution on a serverless DPB service.

  Attributes:
    total_cost: Total costs incurred by the job.
    compute_cost: Compute costs incurred by the job. RAM may be billed as a
      different item on some DPB service implementations.
    memory_cost: RAM costs incurred by the job if available.
    storage_cost: Shuffle storage costs incurred by the job.
    compute_units_used: Compute units consumed by the job.
    memory_units_used: RAM units consumed by this job.
    storage_units_used: Shuffle storage units consumed by the job.
    compute_unit_cost: Cost of 1 compute unit.
    memory_unit_cost: Cost of 1 memory unit.
    storage_unit_cost: Cost of 1 shuffle storage unit.
    compute_unit_name: Name of the compute units used by the service.
    memory_unit_name: Name of the memory units used by the service.
    storage_unit_name: Name of the shuffle storage units used by the service.
  """

  total_cost: float | None = None
  compute_cost: float | None = None
  memory_cost: float | None = None
  storage_cost: float | None = None
  compute_units_used: float | None = None
  memory_units_used: float | None = None
  storage_units_used: float | None = None
  compute_unit_cost: float | None = None
  memory_unit_cost: float | None = None
  storage_unit_cost: float | None = None
  compute_unit_name: str | None = None
  memory_unit_name: str | None = None
  storage_unit_name: str | None = None

  def GetSamples(
      self,
      prefix: str = '',
      renames: dict[str, str] | None = None,
      metadata: MutableMapping[str, str] | None = None,
  ) -> list[sample.Sample]:
    """Gets PKB samples for these costs.

    Metrics whose value or unit is None are skipped.

    Args:
      prefix: Add a prefix before the samples name if passed.
      renames: Optional dict arg where each entry represents a metric rename,
        being the key the original name and the value the new name. prefix is
        added after the rename.
      metadata: String mapping with the metadata for each benchmark.

    Returns:
      A list of samples to be published.
    """
    if renames is None:
      renames = {}
    if metadata is None:
      metadata = {}

    def GetName(original_name):
      return f'{prefix}{renames.get(original_name) or original_name}'

    metrics = [
        ('total_cost', self.total_cost, '$'),
        ('compute_cost', self.compute_cost, '$'),
        ('memory_cost', self.memory_cost, '$'),
        ('storage_cost', self.storage_cost, '$'),
        ('compute_units_used', self.compute_units_used, self.compute_unit_name),
        ('memory_units_used', self.memory_units_used, self.memory_unit_name),
        ('storage_units_used', self.storage_units_used, self.storage_unit_name),
        (
            'compute_unit_cost',
            self.compute_unit_cost,
            f'$/({self.compute_unit_name})',
        ),
        (
            'memory_unit_cost',
            self.memory_unit_cost,
            f'$/({self.memory_unit_name})',
        ),
        (
            'storage_unit_cost',
            self.storage_unit_cost,
            f'$/({self.storage_unit_name})',
        ),
    ]

    results = []
    for metric_name, value, unit in metrics:
      if value is not None and unit is not None:
        results.append(
            sample.Sample(GetName(metric_name), value, unit, metadata)
        )
    return results


class BaseDpbService(resource.BaseResource):
  """Object representing a Data Processing Backend Service."""

  REQUIRED_ATTRS = ['CLOUD', 'SERVICE_TYPE']
  RESOURCE_TYPE = 'BaseDpbService'
  CLOUD = 'abstract'
  SERVICE_TYPE = 'abstract'
  HDFS_FS = dpb_constants.HDFS_FS
  GCS_FS = dpb_constants.GCS_FS
  S3_FS = dpb_constants.S3_FS
  # Subclasses set this to True when they can run with
  # --dpb_dynamic_allocation=false; __init__ rejects the flag otherwise.
  SUPPORTS_NO_DYNALLOC = False

  def _JobJars(self) -> Dict[str, Dict[str, str]]:
    """Known mappings of jars in the cluster used by GetExecutionJar."""
    return {
        dpb_constants.SPARK_JOB_TYPE: {
            # Default for Dataproc and EMR
            'examples': 'file:///usr/lib/spark/examples/jars/spark-examples.jar'
        }
    }

  def __init__(self, dpb_service_spec):
    """Initialize the Dpb service object.

    Args:
      dpb_service_spec: spec of the dpb service.

    Raises:
      errors.Setup.InvalidFlagConfigurationError: if dynamic allocation is
        turned off for a service that does not set SUPPORTS_NO_DYNALLOC.
    """
    is_user_managed = dpb_service_spec.static_dpb_service_instance is not None
    # Hand over the actual creation to the resource module which treats the
    # user_managed resources in a special manner and skips creation attempt
    super().__init__(user_managed=is_user_managed)
    self.spec = dpb_service_spec
    if dpb_service_spec.static_dpb_service_instance:
      self.cluster_id = dpb_service_spec.static_dpb_service_instance
    else:
      self.cluster_id = 'pkb-' + FLAGS.run_uri
    if FLAGS.dpb_service_bucket:
      # A user supplied bucket is never created nor deleted by PKB.
      self.bucket = FLAGS.dpb_service_bucket
      self.manage_bucket = False
    else:
      self.bucket = 'pkb-' + FLAGS.run_uri
      self.manage_bucket = True
    self.dpb_service_zone = FLAGS.dpb_service_zone
    self.dpb_service_type = self.SERVICE_TYPE
    self.storage_service = None
    if not self.SUPPORTS_NO_DYNALLOC and not _DYNAMIC_ALLOCATION.value:
      raise errors.Setup.InvalidFlagConfigurationError(
          'Dynamic allocation off is not supported for the current DPB '
          f'Service: {type(self).__name__}.'
      )
    self.cluster_duration = None
    self._InitializeMetadata()

  def GetDpbVersion(self) -> str | None:
    """Returns the version declared in the service spec."""
    return self.spec.version

  def GetHdfsType(self) -> str | None:
    """Gets human friendly disk type for metric metadata."""
    return None

  @property
  def base_dir(self):
    """Persistent file system prefix concatenated with the bucket name."""
    return self.persistent_fs_prefix + self.bucket  # pytype: disable=attribute-error  # bind-properties

  @abc.abstractmethod
  def SubmitJob(
      self,
      jarfile: str | None = None,
      classname: str | None = None,
      pyspark_file: str | None = None,
      query_file: str | None = None,
      job_poll_interval: float | None = None,
      job_stdout_file: str | None = None,
      job_arguments: List[str] | None = None,
      job_files: List[str] | None = None,
      job_jars: List[str] | None = None,
      job_py_files: List[str] | None = None,
      job_type: str | None = None,
      properties: Dict[str, str] | None = None,
  ) -> JobResult:
    """Submit a data processing job to the backend.

    Args:
      jarfile: Jar file to execute.
      classname: Name of the main class.
      pyspark_file: Comma separated list of Python files to be provided to the
        job. Must be one of the following file formats ".py, .zip, or .egg".
      query_file: HCFS URI of file containing Spark SQL script to execute as
        the job.
      job_poll_interval: number of seconds saying how often to poll for job
        completion. Not used by providers for which submit job is a
        synchronous operation.
      job_stdout_file: String giving the location of the file in which to put
        the standard out of the job.
      job_arguments: List of string arguments to pass to driver application.
        These are not the arguments passed to the wrapper that submits the
        job.
      job_files: Files passed to a Spark Application to be distributed to
        executors.
      job_jars: Jars to pass to the application.
      job_py_files: Python source files to pass to the application.
      job_type: Spark or Hadoop job
      properties: Dict of properties to pass with the job.

    Returns:
      A JobResult with the timing of the successful job.

    Raises:
      JobSubmissionError if job fails.
    """
    pass

  def _WaitForJob(self, job_id, timeout, poll_interval):
    """Polls _GetCompletedJob until it returns a result.

    Args:
      job_id: Identifier forwarded to _GetCompletedJob.
      timeout: Timeout handed to vm_util.Retry.
      poll_interval: Poll interval handed to vm_util.Retry. Falls back to
        --dpb_job_poll_interval_secs when None.

    Returns:
      The non-None value returned by _GetCompletedJob.
    """
    if poll_interval is None:
      poll_interval = FLAGS.dpb_job_poll_interval_secs

    @vm_util.Retry(
        timeout=timeout,
        poll_interval=poll_interval,
        fuzz=0,
        retryable_exceptions=(JobNotCompletedError,),
    )
    def Poll():
      result = self._GetCompletedJob(job_id)
      if result is None:
        # Retried by the decorator above.
        raise JobNotCompletedError('Job {} not complete.'.format(job_id))
      return result

    return Poll()

  def _GetCompletedJob(self, job_id: str) -> JobResult | None:
    """Get the job result if it has finished.

    Args:
      job_id: The step id to query.

    Returns:
      A JobResult describing the job if the step is complete, None otherwise.

    Raises:
      JobSubmissionError if job fails.
    """
    raise NotImplementedError(
        'You need to implement _GetCompletedJob if you use _WaitForJob'
    )

  def GetSparkSubmitCommand(
      self,
      jarfile: str | None = None,
      classname: str | None = None,
      pyspark_file: str | None = None,
      query_file: str | None = None,
      job_arguments: List[str] | None = None,
      job_files: List[str] | None = None,
      job_jars: List[str] | None = None,
      job_type: str | None = None,
      job_py_files: List[str] | None = None,
      properties: Dict[str, str] | None = None,
      spark_submit_cmd: str = spark.SPARK_SUBMIT,
  ) -> List[str]:
    """Builds the command to run spark-submit on cluster.

    Args:
      jarfile: Main jar, required when job_type is the Spark job type.
      classname: Main class, emitted as --class when set.
      pyspark_file: Main Python file, required when job_type is the PySpark
        job type.
      query_file: Unused; Spark SQL jobs are not supported here yet.
      job_arguments: Arguments appended after the main jar/script.
      job_files: Files emitted as a comma separated --files value.
      job_jars: Jars emitted as a comma separated --jars value.
      job_type: Must be the Spark or PySpark job type.
      job_py_files: Files emitted as a comma separated --py-files value.
      properties: Extra properties; they override GetJobProperties() entries
        and are emitted as --conf key=value.
      spark_submit_cmd: The spark-submit executable to invoke.

    Returns:
      The command as a list of arguments.

    Raises:
      NotImplementedError: if job_type is neither Spark nor PySpark.
    """
    # TODO(pclay): support dpb_constants.SPARKSQL_JOB_TYPE
    if job_type not in [
        dpb_constants.PYSPARK_JOB_TYPE,
        dpb_constants.SPARK_JOB_TYPE,
    ]:
      raise NotImplementedError
    cmd = [spark_submit_cmd]
    # Order is important
    if classname:
      cmd += ['--class', classname]
    all_properties = self.GetJobProperties()
    all_properties.update(properties or {})
    for k, v in all_properties.items():
      cmd += ['--conf', '{}={}'.format(k, v)]
    if job_files:
      # Must extend, not rebind: rebinding dropped the spark-submit
      # executable and every option added above.
      cmd += ['--files', ','.join(job_files)]
    if job_jars:
      cmd += ['--jars', ','.join(job_jars)]
    if job_py_files:
      cmd += ['--py-files', ','.join(job_py_files)]
    # Main jar/script goes last before args.
    if job_type == dpb_constants.SPARK_JOB_TYPE:
      assert jarfile
      cmd.append(jarfile)
    elif job_type == dpb_constants.PYSPARK_JOB_TYPE:
      assert pyspark_file
      cmd.append(pyspark_file)
    if job_arguments:
      cmd += job_arguments
    return cmd

  def DistributedCopy(
      self,
      source: str,
      destination: str,
      properties: Dict[str, str] | None = None,
  ) -> JobResult:
    """Method to copy data using a distributed job on the cluster.

    Args:
      source: HCFS directory to copy data from.
      destination: name of new HCFS directory to copy data into.
      properties: properties to add to the job. Not supported on EMR.

    Returns:
      A JobResult with the timing of the successful job.

    Raises:
      JobSubmissionError if job fails.
    """
    return self.SubmitJob(
        classname='org.apache.hadoop.tools.DistCp',
        job_arguments=[source, destination],
        job_type=dpb_constants.HADOOP_JOB_TYPE,
        properties=properties,
    )

  def _InitializeMetadata(self) -> None:
    """Populates self.metadata from the spec, flags and service attributes."""
    pretty_version = self.GetDpbVersion()
    self.metadata = {
        'dpb_service': self.dpb_service_type,
        'dpb_version': pretty_version,
        'dpb_service_version': '{}_{}'.format(
            self.dpb_service_type, pretty_version
        ),
        'dpb_cluster_id': self.cluster_id,
        'dpb_cluster_shape': self.spec.worker_group.vm_spec.machine_type,
        'dpb_cluster_size': self.spec.worker_count,
        'dpb_hdfs_type': self.GetHdfsType(),
        'dpb_disk_size': self.spec.worker_group.disk_spec.disk_size,
        'dpb_service_zone': self.dpb_service_zone,
        'dpb_job_properties': ','.join(
            '{}={}'.format(k, v) for k, v in self.GetJobProperties().items()
        ),
        'dpb_cluster_properties': ','.join(self.GetClusterProperties()),
        'dpb_dynamic_allocation': _DYNAMIC_ALLOCATION.value,
    }

  def _CreateDependencies(self):
    """Creates a bucket to use with the cluster."""
    if self.manage_bucket:
      self.storage_service.MakeBucket(self.bucket)

  def _Create(self):
    """Creates the underlying resource."""
    raise NotImplementedError()

  def _DeleteDependencies(self):
    """Deletes the bucket used with the cluster."""
    if self.manage_bucket:
      self.storage_service.DeleteBucket(self.bucket)

  def _Delete(self):
    """Deletes the underlying resource.

    Implementations of this method should be idempotent since it may
    be called multiple times, even if the resource has already been
    deleted.
    """
    raise NotImplementedError()

  def _ProcessWallTime(self, start_time, end_time):
    """Compute the wall time from the given start and end processing time.

    Args:
      start_time: Datetime value when the processing was started.
      end_time: Datetime value when the processing completed.

    Returns:
      Wall time in seconds.

    Raises:
      ValueError: Exception raised when invalid input is provided.
    """
    if start_time > end_time:
      raise ValueError('start_time cannot be later than the end_time')
    return (end_time - start_time).total_seconds()

  def GetClusterProperties(self) -> list[str]:
    """Gets cluster props in the format of the dpb_cluster_properties flag.

    Note that this returns an empty list when --dpb_dynamic_allocation is left
    enabled and --dpb_cluster_properties is unset.

    Returns:
      A list of cluster properties, with each element being in the same format
      the --dpb_cluster_properties flag uses.
    """
    properties = []
    if not _DYNAMIC_ALLOCATION.value:
      properties.extend([
          'spark:spark.executor.instances=9999',
          'spark:spark.dynamicAllocation.enabled=false',
      ])
    properties.extend(FLAGS.dpb_cluster_properties)
    return properties

  def GetJobProperties(self) -> Dict[str, str]:
    """Parses the --dpb_job_properties flag into a dict.

    Returns:
      A dict mapping each property key to its value. Only the first '=' of
      every entry separates key from value, so values may contain '='.
    """
    # maxsplit=1 keeps values such as '-Dfoo=bar' intact.
    return dict(pair.split('=', 1) for pair in FLAGS.dpb_job_properties)

  def GetExecutionJar(self, job_category: str, job_type: str) -> str:
    """Retrieve execution jar corresponding to the job_category and job_type.

    Args:
      job_category: String category of the job for eg. hadoop, spark, hive,
        etc.
      job_type: String name of the type of workload to executed on the cluster,
        for eg. word_count, terasort, etc.

    Returns:
      The path to the execution jar on the cluster

    Raises:
      NotImplementedError: An unsupported combination of job_category
        and job_type was provided for execution on the cluster.
    """
    jar = self._JobJars().get(job_category, {}).get(job_type)
    if jar:
      return jar
    raise NotImplementedError(
        f'No jar found for category {job_category} and type {job_type}.'
    )

  def GetClusterCreateTime(self) -> float | None:
    """Returns the cluster creation time.

    This default implementation computes it by subtracting the
    create_start_time attribute from the resource_ready_time attribute.

    Returns:
      A float representing the creation time in seconds or None.
    """
    if self.resource_ready_time is None or self.create_start_time is None:
      return None
    return self.resource_ready_time - self.create_start_time

  def GetServiceWrapperScriptsToUpload(self) -> List[str]:
    """Gets service wrapper scripts to upload alongside benchmark scripts."""
    return []

  def CalculateLastJobCosts(self) -> JobCosts:
    """Returns the cost of last job submitted.

    This default implementation returns a JobCosts with every field unset.

    Returns:
      A dpb_service.JobCosts object.
    """
    return JobCosts()

  def GetClusterDuration(self) -> float | None:
    """Gets how much time the cluster has been running in seconds.

    This default implementation returns the cluster_duration attribute, which
    __init__ sets to None. Override in subclasses if needed.

    Returns:
      A float representing the number of seconds the cluster has been running or
      None if it cannot be obtained.
    """
    return self.cluster_duration

  def GetClusterCost(self) -> float | None:
    """Gets the cost of running the cluster if applicable.

    Default implementation returns the sum of cluster hardware cost and service
    premium cost.

    Guaranteed to be called after the cluster has been shut down if applicable.

    Returns:
      A float representing the cost in dollars or None if not implemented.
    """
    hardware_cost = self.GetClusterHardwareCost()
    premium_cost = self.GetClusterPremiumCost()
    if hardware_cost is None or premium_cost is None:
      return None
    return hardware_cost + premium_cost

  def GetClusterHardwareCost(self) -> float | None:
    """Computes the hardware cost with --dpb_hardware_hourly_cost value.

    Default implementation multiplies --dpb_hardware_hourly_cost with the value
    returned by GetClusterDuration().

    Returns:
      A float representing the cost in dollars or None if not implemented.
    """
    # pylint: disable-next=assignment-from-none
    cluster_duration = self.GetClusterDuration()
    if cluster_duration is None or _HARDWARE_HOURLY_COST.value is None:
      return None
    # Duration is in seconds, the flag is a per-hour rate.
    return cluster_duration / 3600 * _HARDWARE_HOURLY_COST.value

  def GetClusterPremiumCost(self) -> float | None:
    """Computes the premium cost with --dpb_service_premium_hourly_cost value.

    Default implementation multiplies --dpb_service_premium_hourly_cost with the
    value returned by GetClusterDuration().

    Returns:
      A float representing the cost in dollars or None if not implemented.
    """
    # pylint: disable-next=assignment-from-none
    cluster_duration = self.GetClusterDuration()
    if cluster_duration is None or _SERVICE_PREMIUM_HOURLY_COST.value is None:
      return None
    # Duration is in seconds, the flag is a per-hour rate.
    return cluster_duration / 3600 * _SERVICE_PREMIUM_HOURLY_COST.value

  def GetSamples(self) -> list[sample.Sample]:
    """Gets samples with service statistics.

    Returns:
      One sample per metric below whose value is not None.
    """
    samples = []
    metrics: dict[str, tuple[float | None, str]] = {
        # Cluster creation time as reported by the DPB service
        # (non-Serverless DPB services only).
        'dpb_cluster_create_time': (self.GetClusterCreateTime(), 'seconds'),
        # Cluster duration as computed by the underlying benchmark.
        # (non-Serverless DPB services only).
        'dpb_cluster_duration': (self.GetClusterDuration(), 'seconds'),
        # Cluster hardware cost computed from cluster duration and
        # hourly costs passed in flags (non-Serverless DPB services only).
        'dpb_cluster_hardware_cost': (self.GetClusterHardwareCost(), '$'),
        # Cluster DPB service premium cost computed from cluster duration and
        # hourly costs passed in flags (non-Serverless DPB services only).
        'dpb_cluster_premium_cost': (self.GetClusterPremiumCost(), '$'),
        # Cluster total cost (hardware plus premium) computed from cluster
        # duration and hourly costs passed in flags (non-Serverless DPB
        # services only).
        'dpb_cluster_total_cost': (self.GetClusterCost(), '$'),
        # Cluster hardware cost per hour as specified in PKB flags.
        'dpb_cluster_hardware_hourly_cost': (
            _HARDWARE_HOURLY_COST.value,
            '$/hour',
        ),
        # DPB Service premium cost per hour as specified in PKB flags.
        'dpb_cluster_premium_hourly_cost': (
            _SERVICE_PREMIUM_HOURLY_COST.value,
            '$/hour',
        ),
    }
    for metric, value_unit_tuple in metrics.items():
      value, unit = value_unit_tuple
      if value is not None:
        samples.append(
            sample.Sample(metric, value, unit, self.GetResourceMetadata())
        )
    return samples


class DpbServiceServerlessMixin:
  """Mixin with default methods dpb services without managed infrastructure."""

  def _Create(self) -> None:
    """No-op: there is nothing to create."""
    pass

  def _Delete(self) -> None:
    """No-op: there is nothing to delete."""
    pass

  def GetClusterCreateTime(self) -> float | None:
    """Always None."""
    return None

  def GetClusterDuration(self) -> float | None:
    """Always None."""
    return None

  def GetClusterCost(self) -> float | None:
    """Always None."""
    return None

  def GetClusterHardwareCost(self) -> float | None:
    """Always None."""
    return None

  def GetClusterPremiumCost(self) -> float | None:
    """Always None."""
    return None


class UnmanagedDpbService(BaseDpbService):
  """Object representing an un-managed dpb service."""

  def __init__(self, dpb_service_spec):
    """Initializes the service and the object storage for its cloud.

    Args:
      dpb_service_spec: spec of the dpb service.

    Raises:
      errors.Setup.InvalidSetupError: if no dpb_service_zone was provided.
    """
    super().__init__(dpb_service_spec)
    # Dictionary to hold the cluster vms.
    self.vms = {}
    self.cloud = dpb_service_spec.worker_group.cloud
    if not self.dpb_service_zone:
      raise errors.Setup.InvalidSetupError(
          'dpb_service_zone must be provided, for provisioning.'
      )
    if self.cloud == 'GCP':
      self.region = gcp_util.GetRegionFromZone(FLAGS.dpb_service_zone)
      self.storage_service = gcs.GoogleCloudStorageService()
      self.persistent_fs_prefix = 'gs://'
    elif self.cloud == 'AWS':
      self.region = aws_util.GetRegionFromZone(FLAGS.dpb_service_zone)
      self.storage_service = s3.S3Service()
      self.persistent_fs_prefix = 's3://'
    else:
      self.region = None
      self.storage_service = None
      self.persistent_fs_prefix = None
      # Without a storage service there is no bucket to create or delete.
      self.manage_bucket = False
      logging.warning(
          'Cloud provider %s does not support object storage. '
          'Some benchmarks will not work.',
          self.cloud,
      )

    if self.storage_service:
      self.storage_service.PrepareService(location=self.region)

    # set in _Create of derived classes
    self.leader = None

  def CheckPrerequisites(self):
    """Verifies --aws_ec2_instance_profile is set when running on AWS.

    Raises:
      ValueError: if the cloud is AWS and the flag is unset.
    """
    if self.cloud == 'AWS' and not aws_flags.AWS_EC2_INSTANCE_PROFILE.value:
      raise ValueError(
          'EC2 based Spark and Hadoop services require '
          '--aws_ec2_instance_profile.'
      )

  def GetClusterCreateTime(self) -> float | None:
    """Returns the cluster creation time.

    UnmanagedDpbService Create phase doesn't consider actual VM creation, just
    further provisioning. Thus, we need to add the VMs create time to the
    default implementation.

    Returns:
      A float representing the creation time in seconds or None.
    """
    my_create_time = super().GetClusterCreateTime()
    if my_create_time is None:
      return None
    # Flatten every VM group into a single list.
    vms = [vm for vm_group in self.vms.values() for vm in vm_group]
    first_vm_create_start_time = min(
        (
            vm.create_start_time
            for vm in vms
            if vm.create_start_time is not None
        ),
        default=None,
    )
    last_vm_ready_start_time = max(
        (
            vm.resource_ready_time
            for vm in vms
            if vm.resource_ready_time is not None
        ),
        default=None,
    )
    if first_vm_create_start_time is None or last_vm_ready_start_time is None:
      return None
    return (
        my_create_time + last_vm_ready_start_time - first_vm_create_start_time
    )


class UnmanagedDpbServiceYarnCluster(UnmanagedDpbService):
  """Object representing an un-managed dpb service yarn cluster."""

  CLOUD = 'Unmanaged'
  SERVICE_TYPE = dpb_constants.UNMANAGED_DPB_SVC_YARN_CLUSTER

  def __init__(self, dpb_service_spec):
    super().__init__(dpb_service_spec)
    self.cloud = dpb_service_spec.worker_group.cloud

  def GetDpbVersion(self) -> str | None:
    """Returns the string form of hadoop.HadoopVersion()."""
    return str(hadoop.HadoopVersion())

  def _Create(self):
    """Create an un-managed yarn cluster.

    Raises:
      errors.Resource.CreationError: if self.vms has no 'worker_group'.
    """
    logging.info('Should have created vms by now.')
    logging.info(str(self.vms))

    def InstallHadoop(vm):
      vm.Install('hadoop')
      if self.cloud == 'GCP':
        hadoop.InstallGcsConnector(vm)

    if 'worker_group' not in self.vms:
      raise errors.Resource.CreationError(
          'UnmanagedDpbServiceYarnCluster requires VMs in a worker_group.'
      )
    background_tasks.RunThreaded(
        InstallHadoop, self.vms['worker_group'] + self.vms['master_group']
    )
    self.leader = self.vms['master_group'][0]
    hadoop.ConfigureAndStart(
        self.leader, self.vms['worker_group'], configure_s3=self.cloud == 'AWS'
    )

  def SubmitJob(
      self,
      jarfile=None,
      classname=None,
      pyspark_file=None,
      query_file=None,
      job_poll_interval=None,
      job_stdout_file=None,
      job_arguments=None,
      job_files=None,
      job_jars=None,
      job_py_files=None,
      job_type=None,
      properties=None,
  ):
    """Submit a data processing job to the backend.

    Runs the hadoop command on the leader VM and times it locally. Only the
    Hadoop job type is supported.

    Returns:
      A JobResult with the elapsed time and the command's stdout and stderr.

    Raises:
      NotImplementedError: if job_type is not the Hadoop job type.
      JobSubmissionError: if the remote command fails.
    """
    if job_type != dpb_constants.HADOOP_JOB_TYPE:
      raise NotImplementedError
    cmd_list = [hadoop.HADOOP_CMD]
    # Order is important
    if jarfile:
      cmd_list += ['jar', jarfile]
    # Specifying classname only works if jarfile is omitted or if it has no
    # main class.
    if classname:
      cmd_list += [classname]
    all_properties = self.GetJobProperties()
    all_properties.update(properties or {})
    cmd_list += ['-D{}={}'.format(k, v) for k, v in all_properties.items()]
    if job_arguments:
      cmd_list += job_arguments
    cmd_string = ' '.join(cmd_list)

    start_time = datetime.datetime.now()
    try:
      stdout, stderr = self.leader.RobustRemoteCommand(cmd_string)
    except errors.VirtualMachine.RemoteCommandError as e:
      raise JobSubmissionError() from e
    end_time = datetime.datetime.now()

    if job_stdout_file:
      with open(job_stdout_file, 'w') as f:
        f.write(stdout)
    return JobResult(
        run_time=(end_time - start_time).total_seconds(),
        stdout=stdout,
        stderr=stderr,
    )

  def _Delete(self):
    pass

  def _GetCompletedJob(self, job_id: str) -> JobResult | None:
    """Submitting Job via SSH is blocking so this is not meaningful."""
    raise NotImplementedError('Submitting Job via SSH is a blocking command.')


class UnmanagedDpbSparkCluster(UnmanagedDpbService):
  """Object representing an un-managed dpb service spark cluster."""

  CLOUD = 'Unmanaged'
  SERVICE_TYPE = dpb_constants.UNMANAGED_SPARK_CLUSTER

  def _JobJars(self) -> Dict[str, Dict[str, str]]:
    """Known mappings of jars in the cluster used by GetExecutionJar."""
    return {
        dpb_constants.SPARK_JOB_TYPE: {'examples': spark.SparkExamplesJarPath()}
    }

  def __init__(self, dpb_service_spec):
    super().__init__(dpb_service_spec)
    # Dictionary to hold the cluster vms.
    self.vms = {}
    self.cloud = dpb_service_spec.worker_group.cloud

  def GetDpbVersion(self) -> str | None:
    """Returns 'spark_' followed by spark.SparkVersion()."""
    return f'spark_{spark.SparkVersion()}'

  def _Create(self):
    """Create an un-managed Spark cluster.

    Raises:
      errors.Resource.CreationError: if self.vms has no 'worker_group'.
    """
    logging.info('Should have created vms by now.')
    logging.info(str(self.vms))

    def InstallSpark(vm):
      vm.Install('spark')
      if self.cloud == 'GCP':
        hadoop.InstallGcsConnector(vm)

    if 'worker_group' not in self.vms:
      raise errors.Resource.CreationError(
          'UnmanagedDpbSparkCluster requires VMs in a worker_group.'
      )

    background_tasks.RunThreaded(
        InstallSpark, self.vms['worker_group'] + self.vms['master_group']
    )
    self.leader = self.vms['master_group'][0]
    spark.ConfigureAndStart(
        self.leader, self.vms['worker_group'], configure_s3=self.cloud == 'AWS'
    )

  def SubmitJob(
      self,
      jarfile=None,
      classname=None,
      pyspark_file=None,
      query_file=None,
      job_poll_interval=None,
      job_stdout_file=None,
      job_arguments=None,
      job_files=None,
      job_jars=None,
      job_py_files=None,
      job_type=None,
      properties=None,
  ):
    """Submit a data processing job to the backend.

    Runs the command built by GetSparkSubmitCommand on the leader VM and times
    it locally.

    Returns:
      A JobResult with the elapsed time and the command's stdout and stderr.

    Raises:
      JobSubmissionError: if the remote command fails.
    """
    cmd = self.GetSparkSubmitCommand(
        jarfile=jarfile,
        classname=classname,
        pyspark_file=pyspark_file,
        job_arguments=job_arguments,
        job_files=job_files,
        job_jars=job_jars,
        job_py_files=job_py_files,
        job_type=job_type,
        properties=properties,
    )
    start_time = datetime.datetime.now()
    try:
      stdout, stderr = self.leader.RobustRemoteCommand(' '.join(cmd))
    except errors.VirtualMachine.RemoteCommandError as e:
      raise JobSubmissionError() from e
    end_time = datetime.datetime.now()

    if job_stdout_file:
      with open(job_stdout_file, 'w') as f:
        f.write(stdout)
    # Expose the output like UnmanagedDpbServiceYarnCluster.SubmitJob does.
    return JobResult(
        run_time=(end_time - start_time).total_seconds(),
        stdout=stdout,
        stderr=stderr,
    )

  def _Delete(self):
    pass

  def _GetCompletedJob(self, job_id: str) -> JobResult | None:
    """Submitting Job via SSH is blocking so this is not meaningful."""
    raise NotImplementedError('Submitting Job via SSH is a blocking command.')


class KubernetesSparkCluster(BaseDpbService):
  """Object representing a Kubernetes dpb service spark cluster."""

  CLOUD = container_service.KUBERNETES
  SERVICE_TYPE = dpb_constants.KUBERNETES_SPARK_CLUSTER

  # Constants to synchronize between YAML and Spark configuration
  # TODO(pclay): Consider setting in YAML
  SPARK_DRIVER_SERVICE = 'spark-driver'
  SPARK_DRIVER_PORT = 4042
  SPARK_K8S_SERVICE_ACCOUNT = 'spark'

  def _JobJars(self) -> Dict[str, Dict[str, str]]:
    """Known mappings of jars in the cluster used by GetExecutionJar."""
    return {
        dpb_constants.SPARK_JOB_TYPE: {'examples': spark.SparkExamplesJarPath()}
    }

  # TODO(odiego): Implement GetClusterCreateTime adding K8s cluster create time

  def __init__(self, dpb_service_spec):
    super().__init__(dpb_service_spec)

    benchmark_spec = context.GetThreadBenchmarkSpec()
    self.k8s_cluster = benchmark_spec.container_cluster
    assert self.k8s_cluster
    assert self.k8s_cluster.CLUSTER_TYPE == container_service.KUBERNETES
    self.cloud = self.k8s_cluster.CLOUD
    self.container_registry = benchmark_spec.container_registry
    assert self.container_registry

    self.spark_drivers = []

    # TODO(pclay): Support overriding image?
    # Corresponds with data/docker/spark directory
    self.image = 'spark'

    if self.cloud == 'GCP':
      self.region = gcp_util.GetRegionFromZone(self.k8s_cluster.zone)
      self.storage_service = gcs.GoogleCloudStorageService()
      self.persistent_fs_prefix = 'gs://'
    elif self.cloud == 'AWS':
      self.region = self.k8s_cluster.region
      self.storage_service = s3.S3Service()
      self.persistent_fs_prefix = 's3://'
    else:
      raise errors.Config.InvalidValue(
          f'Unsupported Cloud provider {self.cloud}'
      )

    self.storage_service.PrepareService(location=self.region)

    # TODO(pclay): support
    assert not FLAGS.dpb_cluster_properties

    if self.k8s_cluster.num_nodes < 2:
      raise errors.Config.InvalidValue(
          f'Cluster type {dpb_constants.KUBERNETES_SPARK_CLUSTER} requires at'
          f' least 2 nodes.Found {self.k8s_cluster.num_nodes}.'
      )

  def GetDpbVersion(self) -> str | None:
    """Returns 'spark_' followed by the --spark_version flag value."""
    return 'spark_' + FLAGS.spark_version

  def _Create(self):
    """Create docker image for cluster."""
    logging.info('Should have created k8s cluster by now.')
    # TODO(pclay): Should resources publicly declare they have been created?
    assert self.k8s_cluster.resource_ready_time
    assert self.container_registry.resource_ready_time
    logging.info(self.k8s_cluster)
    logging.info(self.container_registry)

    logging.info('Building Spark image.')
    self.image = self.container_registry.GetOrBuild(self.image)

    # https://spark.apache.org/docs/latest/running-on-kubernetes.html#rbac
    # TODO(pclay): Consider moving into manifest
    self.k8s_cluster.CreateServiceAccount(
        self.SPARK_K8S_SERVICE_ACCOUNT, clusterrole='edit'
    )

  def _GetDriverName(self):
    """Returns the driver pod name, numbered by the drivers created so far."""
    return f'spark-driver-{len(self.spark_drivers)}'

  # Kubernetes unlike other Spark Schedulers reserves 40% of memory for PySpark
  # instead of the normal 10%. We don't need much memory for PySpark, because
  # our PySpark is 100% SQL and thus on the JVM (in dpb_sparksql_benchmark).
  # Force Spark to reserve the normal 10% overhead in all cases.
  # https://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties
  MEMORY_OVERHEAD_FACTOR = 0.1

  def GetJobProperties(self) -> Dict[str, str]:
    """Builds Spark properties for the cluster plus user supplied ones.

    Returns:
      A dict with the common PKB Spark configuration, then the Kubernetes
      specific properties, then the --dpb_job_properties entries; later
      entries override earlier ones.
    """
    node_cpu = self.k8s_cluster.node_num_cpu
    # TODO(pclay): Validate that we don't have too little memory?
    node_memory_mb = self.k8s_cluster.node_memory_allocatable.m_as(
        units.mebibyte
    )
    # Reserve 512 MB for system daemons
    node_memory_mb -= 512
    # Remove overhead
    node_memory_mb /= 1 + self.MEMORY_OVERHEAD_FACTOR
    node_memory_mb = int(node_memory_mb)

    # Common PKB Spark cluster properties
    properties = spark.GetConfiguration(
        driver_memory_mb=node_memory_mb,
        worker_memory_mb=node_memory_mb,
        # Schedule one thread per vCPU
        worker_cores=node_cpu,
        # Reserve one node for driver
        num_workers=self.k8s_cluster.num_nodes - 1,
        configure_s3=self.cloud == 'AWS',
    )
    # k8s specific properties
    properties.update({
        'spark.driver.host': self.SPARK_DRIVER_SERVICE,
        'spark.driver.port': str(self.SPARK_DRIVER_PORT),
        'spark.kubernetes.driver.pod.name': self._GetDriverName(),
        # Tell Spark to under-report cores by 1 to fit next to k8s services
        'spark.kubernetes.executor.request.cores': str(node_cpu - 1),
        'spark.kubernetes.container.image': self.image,
        # No HDFS available
        'spark.hadoop.fs.defaultFS': self.base_dir,
        'spark.kubernetes.memoryOverheadFactor': str(
            self.MEMORY_OVERHEAD_FACTOR
        ),
    })
    # User specified properties
    properties.update(super().GetJobProperties())
    return properties

  def SubmitJob(
      self,
      jarfile=None,
      classname=None,
      pyspark_file=None,
      query_file=None,
      job_poll_interval=None,
      job_stdout_file=None,
      job_arguments=None,
      job_files=None,
      job_jars=None,
      job_py_files=None,
      job_type=None,
      properties=None,
  ):
    """Submit a data processing job to the backend."""
    # Specs can't be copied or created by hand. So we override the command of
    # the spec for each job.
    command = self.GetSparkSubmitCommand(
        jarfile=jarfile,
        classname=classname,
        pyspark_file=pyspark_file,
        job_arguments=job_arguments,
        job_files=job_files,
        job_jars=job_jars,
        job_py_files=job_py_files,
        job_type=job_type,
        properties=properties,
    )
    driver_name = self._GetDriverName()
    # Request memory for driver. This should guarantee that driver does not get
    # scheduled on same VM as executor and OOM.
    driver_memory_mb = int(
        self.GetJobProperties()[spark.SPARK_DRIVER_MEMORY].strip('m')
    )
    start_time = datetime.datetime.now()
    self.k8s_cluster.ApplyManifest(
        'container/spark/spark-driver.yaml.j2',
        name=driver_name,
        command=command,
        driver_memory_mb=driver_memory_mb,
        driver_port=self.SPARK_DRIVER_PORT,
        driver_service=self.SPARK_DRIVER_SERVICE,
        image=self.image,
        service_account=self.SPARK_K8S_SERVICE_ACCOUNT,
    )
    container = container_service.KubernetesPod(driver_name)
    # increments driver_name for next job
    self.spark_drivers.append(container)
    try:
      container.WaitForExit()
    except container_service.ContainerException as e:
      raise JobSubmissionError() from e
    end_time = datetime.datetime.now()

    if job_stdout_file:
      with open(job_stdout_file, 'w') as f:
        f.write(container.GetLogs())

    # TODO(pclay): use k8s output for timing?
return JobResult(run_time=(end_time - start_time).total_seconds()) def _Delete(self): pass def _GetCompletedJob(self, job_id: str) -> JobResult | None: """container.WaitForExit is blocking so this is not meaningful.""" raise NotImplementedError('container.WaitForExit is a blocking command.') class KubernetesFlinkCluster(BaseDpbService): """Object representing a Kubernetes dpb service flink cluster.""" CLOUD = container_service.KUBERNETES SERVICE_TYPE = dpb_constants.KUBERNETES_FLINK_CLUSTER FLINK_JOB_MANAGER_SERVICE = 'flink-jobmanager' DEFAULT_FLINK_IMAGE = 'flink' def __init__(self, dpb_service_spec): super().__init__(dpb_service_spec) benchmark_spec = context.GetThreadBenchmarkSpec() self.k8s_cluster = benchmark_spec.container_cluster assert self.k8s_cluster assert self.k8s_cluster.CLUSTER_TYPE == container_service.KUBERNETES self.cloud = self.k8s_cluster.CLOUD self.container_registry = benchmark_spec.container_registry assert self.container_registry self.flink_jobmanagers = [] self.image = self.DEFAULT_FLINK_IMAGE if self.cloud == 'GCP': self.region = gcp_util.GetRegionFromZone(self.k8s_cluster.zone) self.storage_service = gcs.GoogleCloudStorageService() self.persistent_fs_prefix = 'gs://' else: raise errors.Config.InvalidValue( f'Unsupported Cloud provider {self.cloud}' ) self.storage_service.PrepareService(location=self.region) if self.k8s_cluster.num_nodes < 2: raise errors.Config.InvalidValue( f'Cluster type {dpb_constants.KUBERNETES_FLINK_CLUSTER} requires at' f' least 2 nodes.Found {self.k8s_cluster.num_nodes}.' ) def GetDpbVersion(self) -> str | None: return self.spec.version or self.DEFAULT_FLINK_IMAGE def _CreateConfigMapDir(self): """Returns a TemporaryDirectory containing files in the ConfigMap.""" temp_directory = tempfile.TemporaryDirectory() # Create flink-conf.yaml to configure flink. 
flink_conf_filename = os.path.join(temp_directory.name, 'flink-conf.yaml') with open(flink_conf_filename, 'w') as flink_conf_file: yaml.dump( self.GetJobProperties(), flink_conf_file, default_flow_style=False ) flink_conf_file.close() # Create log4j-console.properties for logging configuration. logging_property_file = data.ResourcePath( 'container/flink/log4j-console.properties' ) logging_property_filename = os.path.join( temp_directory.name, 'log4j-console.properties' ) shutil.copyfile(logging_property_file, logging_property_filename) return temp_directory def _GenerateConfig(self, config_file, **kwargs): """Returns a temporary config file.""" filename = data.ResourcePath(config_file) environment = jinja2.Environment(undefined=jinja2.StrictUndefined) with open(filename) as template_file, vm_util.NamedTemporaryFile( mode='w', suffix='.yaml', dir=vm_util.GetTempDir(), delete=False ) as rendered_template: config = environment.from_string(template_file.read()).render(kwargs) rendered_template.write(config) rendered_template.close() logging.info('Finish generating config file %s', rendered_template.name) return rendered_template.name def _Create(self): logging.info('Should have created k8s cluster by now.') # Create docker image for containers assert self.k8s_cluster.resource_ready_time assert self.container_registry.resource_ready_time logging.info(self.k8s_cluster) logging.info(self.container_registry) logging.info('Building Flink image.') if FLAGS.container_remote_build_config is None: FLAGS.container_remote_build_config = self._GenerateConfig( 'docker/flink/cloudbuild.yaml.j2', dpb_job_jarfile=FLAGS.dpb_job_jarfile, base_image=self.GetDpbVersion(), full_tag=self.container_registry.GetFullRegistryTag(self.image), ) self.image = self.container_registry.GetOrBuild(self.image) logging.info('Configuring Kubernetes Flink Cluster.') with self._CreateConfigMapDir() as config_dir: self.k8s_cluster.CreateConfigMap('default-config', config_dir) def _GetJobManagerName(self): 
return f'flink-jobmanager-{len(self.flink_jobmanagers)}' def GetJobProperties(self) -> Dict[str, str]: node_cpu = self.k8s_cluster.node_num_cpu node_memory_mb = self.k8s_cluster.node_memory_allocatable.m_as( units.mebibyte ) # Reserve 512 MB for system daemons node_memory_mb -= 512 node_memory_mb = int(node_memory_mb) # Common flink properties properties = { 'jobmanager.rpc.address': self.FLINK_JOB_MANAGER_SERVICE, 'blob.server.port': 6124, 'jobmanager.rpc.port': 6123, 'taskmanager.rpc.port': 6122, 'queryable-state.proxy.ports': 6125, 'jobmanager.memory.process.size': f'{node_memory_mb}m', 'taskmanager.memory.process.size': f'{node_memory_mb}m', 'taskmanager.numberOfTaskSlots': node_cpu, 'parallelism.default': node_cpu * (self.k8s_cluster.num_nodes - 1), 'execution.attached': True, } # User specified properties properties.update(super().GetJobProperties()) return properties def SubmitJob( self, jarfile=None, classname=None, pyspark_file=None, query_file=None, job_poll_interval=None, job_stdout_file=None, job_arguments=None, job_files=None, job_jars=None, job_py_files=None, job_type=None, properties=None, ): """Submit a data processing job to the backend.""" job_manager_name = self._GetJobManagerName() job_properties = self.GetJobProperties() start_time = datetime.datetime.now() self.k8s_cluster.ApplyManifest( 'container/flink/flink-job-and-deployment.yaml.j2', job_manager_name=job_manager_name, job_manager_service=self.FLINK_JOB_MANAGER_SERVICE, classname=classname, job_arguments=','.join(job_arguments), image=self.image, task_manager_replicas=self.k8s_cluster.num_nodes - 1, task_manager_rpc_port=job_properties.get('taskmanager.rpc.port'), job_manager_rpc_port=job_properties.get('jobmanager.rpc.port'), blob_server_port=job_properties.get('blob.server.port'), queryable_state_proxy_ports=job_properties.get( 'queryable-state.proxy.ports' ), ) stdout, _, _ = container_service.RunKubectlCommand( ['get', 'pod', f'--selector=job-name={job_manager_name}', '-o', 'yaml'] ) 
pods = yaml.safe_load(stdout)['items'] if len(pods) <= 0: raise JobSubmissionError('No pod was created for the job.') container = container_service.KubernetesPod(pods[0]['metadata']['name']) self.flink_jobmanagers.append(container) try: container.WaitForExit() except container_service.ContainerException as e: raise JobSubmissionError() from e end_time = datetime.datetime.now() if job_stdout_file: with open(job_stdout_file, 'w') as f: f.write(container.GetLogs()) return JobResult(run_time=(end_time - start_time).total_seconds()) def CheckPrerequisites(self): # Make sure dpb_job_jarfile is provided when using flink assert FLAGS.dpb_job_jarfile # dpb_cluster_properties is to be supported assert not FLAGS.dpb_cluster_properties def _GetCompletedJob(self, job_id: str) -> JobResult | None: """container.WaitForExit is blocking so this is not meaningful.""" raise NotImplementedError('container.WaitForExit is a blocking command.') def _Delete(self): pass def GetDpbServiceClass( cloud: str, dpb_service_type: str ) -> Type[BaseDpbService] | None: """Gets the Data Processing Backend class corresponding to 'service_type'. Args: cloud: String name of cloud of the service dpb_service_type: String service type as specified in configuration Returns: Implementation class corresponding to the argument dpb_service_type Raises: Exception: An invalid data processing backend service type was provided """ if dpb_service_type in dpb_constants.UNMANAGED_SERVICES: cloud = 'Unmanaged' elif dpb_service_type in [ dpb_constants.KUBERNETES_SPARK_CLUSTER, dpb_constants.KUBERNETES_FLINK_CLUSTER, ]: cloud = container_service.KUBERNETES return resource.GetResourceClass( BaseDpbService, CLOUD=cloud, SERVICE_TYPE=dpb_service_type )