perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py

# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing classes for GCP's Dataproc service.

Clusters can be created, have jobs submitted to them, and be deleted. See
details at https://cloud.google.com/dataproc/
"""

import datetime
import json
import logging
import os
import re
from typing import Any, Dict, Tuple

from absl import flags
from perfkitbenchmarker import data
from perfkitbenchmarker import dpb_constants
from perfkitbenchmarker import dpb_service
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import aws_credentials
from perfkitbenchmarker.providers.gcp import flags as gcp_flags
from perfkitbenchmarker.providers.gcp import gcp_dpb_dataproc_serverless_prices
from perfkitbenchmarker.providers.gcp import gcs
from perfkitbenchmarker.providers.gcp import util

FLAGS = flags.FLAGS
flags.DEFINE_string(
    'dpb_dataproc_image_version',
    None,
    'The image version to use for the cluster.',
)
_S3A_CREDENTIALS = flags.DEFINE_string(
    'dpb_dataproc_s3a_cross_cloud_credentials_cli_profile',
    None,
    'The local AWS CLI profile to use to copy data to S3.',
)

disk_to_hdfs_map = {
    'pd-standard': 'HDD',
    'pd-balanced': 'SSD (Balanced)',
    'pd-ssd': 'SSD',
}

serverless_disk_to_hdfs_map = {
    'standard': 'HDD',
    'premium': 'Local SSD',
}

_DATAPROC_SERVERLESS_PRICES = (
    gcp_dpb_dataproc_serverless_prices.DATAPROC_SERVERLESS_PRICES
)

DATAPROC_FLINK_INIT_SCRIPT = os.path.join('beam', 'flink-init.sh')
DATAPROC_FLINK_PRESUBMIT_SCRIPT = os.path.join('beam', 'flink-presubmit.sh')
DATAPROC_FLINK_TRIGGER_SCRIPT = os.path.join('beam', 'flink-trigger.sh')


class MetricNotReadyError(Exception):
  """Used to signal metric is not ready."""

  pass


class GcpDpbBaseDataproc(dpb_service.BaseDpbService):
  """Base class for all Dataproc-based services (cluster or serverless)."""

  def __init__(self, dpb_service_spec):
    super().__init__(dpb_service_spec)
    self.project = FLAGS.project
    if not self.dpb_service_zone:
      raise errors.Setup.InvalidSetupError(
          'dpb_service_zone must be provided for provisioning.'
      )
    self.region = self.dpb_service_zone.rsplit('-', 1)[0]
    self.storage_service = gcs.GoogleCloudStorageService()
    self.storage_service.PrepareService(location=self.region)
    self.persistent_fs_prefix = 'gs://'
    self._cluster_create_time: float | None = None
    self._cluster_ready_time: float | None = None
    self._cluster_delete_time: float | None = None

  def GetDpbVersion(self) -> str | None:
    return FLAGS.dpb_dataproc_image_version or super().GetDpbVersion()

  @staticmethod
  def _ParseTime(state_time: str) -> datetime.datetime:
    """Parses time from json output.

    Args:
      state_time: string. The state start time.

    Returns:
      Parsed datetime.
""" try: return datetime.datetime.strptime(state_time, '%Y-%m-%dT%H:%M:%S.%fZ') except ValueError: return datetime.datetime.strptime(state_time, '%Y-%m-%dT%H:%M:%SZ') def DataprocGcloudCommand(self, *args): all_args = ('dataproc',) + tuple(args) cmd = util.GcloudCommand(self, *all_args) cmd.flags['region'] = self.region return cmd def MigrateCrossCloud( self, source_location, destination_location, dest_cloud='AWS' ): """Method to copy data cross cloud using a distributed job on the cluster. Currently the only supported destination cloud is AWS. TODO(user): Add support for other destination clouds. Args: source_location: The source GCS path to migrate. destination_location: The destination path. dest_cloud: The cloud to copy data to. Returns: A dictionary with key 'success' and boolean value set to the status of data migration command. """ if dest_cloud == 'AWS': dest_prefix = 's3a://' if not _S3A_CREDENTIALS.value: raise ValueError( '--dpb_dataproc_s3a_cross_cloud_credentials_cli_profile required ' 'for writing to s3.' ) s3_access_key, s3_secret_key = aws_credentials.GetCredentials( _S3A_CREDENTIALS.value ) properties = { 'fs.s3a.access.key': s3_access_key, 'fs.s3a.secret.key': s3_secret_key, } else: raise ValueError('Unsupported destination cloud.') return self.DistributedCopy( 'gs://' + source_location, dest_prefix + destination_location, properties=properties, ) class GcpDpbDataproc(GcpDpbBaseDataproc): """Object representing a managed GCP Dataproc cluster. Attributes: project: ID of the project. """ CLOUD = provider_info.GCP SERVICE_TYPE = 'dataproc' SUPPORTS_NO_DYNALLOC = True def __init__(self, dpb_service_spec): super().__init__(dpb_service_spec) if self.user_managed and not FLAGS.dpb_service_bucket: self.bucket = self._GetCluster()['config']['tempBucket'] def GetClusterCreateTime(self) -> float | None: """Returns the cluster creation time. On this implementation, the time returned is based on the timestamps reported by the Dataproc API (which is stored in the _cluster_create_time attribute). Returns: A float representing the creation time in seconds or None. """ if self._cluster_create_time is None or self._cluster_ready_time is None: return None return self._cluster_ready_time - self._cluster_create_time def _Create(self): """Creates the cluster.""" cmd = self.DataprocGcloudCommand('clusters', 'create', self.cluster_id) if self.project is not None: cmd.flags['project'] = self.project if self.spec.worker_count: # The number of worker machines in the cluster cmd.flags['num-workers'] = self.spec.worker_count else: cmd.flags['single-node'] = True # Initialize applications on the dataproc cluster if self.spec.applications: logging.info('Include the requested applications') cmd.flags['optional-components'] = ','.join(self.spec.applications) # Enable component gateway for debuggability. Does not impact performance. cmd.flags['enable-component-gateway'] = True # TODO(pclay): stop ignoring spec.master_group? 
    for role in ['worker', 'master']:
      # Set machine type
      if self.spec.worker_group.vm_spec.machine_type:
        self._AddToCmd(
            cmd,
            '{}-machine-type'.format(role),
            self.spec.worker_group.vm_spec.machine_type,
        )
      # Set boot_disk_size
      if self.spec.worker_group.disk_spec.disk_size:
        size_in_gb = '{}GB'.format(
            str(self.spec.worker_group.disk_spec.disk_size)
        )
        self._AddToCmd(cmd, '{}-boot-disk-size'.format(role), size_in_gb)
      # Set boot_disk_type
      if self.spec.worker_group.disk_spec.disk_type:
        self._AddToCmd(
            cmd,
            '{}-boot-disk-type'.format(role),
            self.spec.worker_group.disk_spec.disk_type,
        )
      # Set ssd count
      if self.spec.worker_group.vm_spec.num_local_ssds:
        self._AddToCmd(
            cmd,
            'num-{}-local-ssds'.format(role),
            self.spec.worker_group.vm_spec.num_local_ssds,
        )
      # Set SSD interface
      if self.spec.worker_group.vm_spec.ssd_interface:
        self._AddToCmd(
            cmd,
            '{}-local-ssd-interface'.format(role),
            self.spec.worker_group.vm_spec.ssd_interface,
        )
    # Set zone
    cmd.flags['zone'] = self.dpb_service_zone

    if self.GetDpbVersion():
      cmd.flags['image-version'] = self.GetDpbVersion()

    if FLAGS.gcp_dataproc_image:
      cmd.flags['image'] = FLAGS.gcp_dataproc_image

    # http://cloud/dataproc/docs/guides/profiling#enable_profiling
    if FLAGS.gcloud_scopes:
      cmd.flags['scopes'] = ','.join(re.split(r'[,; ]', FLAGS.gcloud_scopes))

    if self.GetClusterProperties():
      cmd.flags['properties'] = ','.join(self.GetClusterProperties())

    if FLAGS.dpb_initialization_actions:
      cmd.flags['initialization-actions'] = FLAGS.dpb_initialization_actions

    # Ideally DpbServiceSpec would have a network spec whose name we would
    # resolve here, but EMR provisions its own VPC and we are generally happy
    # using pre-existing networks for Dataproc, so just use the underlying
    # flag instead.
    if FLAGS.gce_network_name:
      cmd.flags['network'] = FLAGS.gce_network_name[0]

    metadata = util.GetDefaultTags()
    metadata.update(flag_util.ParseKeyValuePairs(FLAGS.gcp_instance_metadata))
    if gcp_flags.SPARK_BIGQUERY_CONNECTOR.value:
      metadata['SPARK_BQ_CONNECTOR_URL'] = (
          gcp_flags.SPARK_BIGQUERY_CONNECTOR.value
      )
    cmd.flags['metadata'] = util.FormatTags(metadata)
    cmd.flags['labels'] = util.MakeFormattedDefaultTags()
    timeout = 900  # 15 min
    stdout, stderr, retcode = cmd.Issue(timeout=timeout, raise_on_failure=False)
    self._cluster_create_time, self._cluster_ready_time = (
        self._ParseClusterCreateTime(stdout)
    )
    if retcode:
      util.CheckGcloudResponseKnownFailures(stderr, retcode)
      raise errors.Resource.CreationError(stderr)

  @classmethod
  def _ParseClusterCreateTime(
      cls, stdout: str
  ) -> Tuple[float | None, float | None]:
    """Parses the cluster create & ready time from a raw API response."""
    try:
      creation_data = json.loads(stdout)
    except json.JSONDecodeError:
      creation_data = {}
    can_parse = creation_data.get('status', {}).get('state') == 'RUNNING'
    status_history = creation_data.get('statusHistory', [])
    can_parse = (
        can_parse
        and len(status_history) == 1
        and status_history[0]['state'] == 'CREATING'
    )
    if not can_parse:
      logging.warning('Unable to parse cluster creation duration.')
      return None, None
    creation_start = (
        cls._ParseTime(status_history[0]['stateStartTime'])
        .replace(tzinfo=datetime.timezone.utc)
        .timestamp()
    )
    creation_end = (
        cls._ParseTime(creation_data['status']['stateStartTime'])
        .replace(tzinfo=datetime.timezone.utc)
        .timestamp()
    )
    return creation_start, creation_end

  def _Delete(self):
    """Deletes the cluster."""
    cmd = self.DataprocGcloudCommand('clusters', 'delete', self.cluster_id)
    stdout, _, _ = cmd.Issue(raise_on_failure=False)
    try:
      response = json.loads(stdout)
    except ValueError:
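      # gcloud produced non-JSON output (e.g. an error message); there is no
      # deletion timestamp to record in that case.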
      return
    status = response.get('metadata', {}).get('status', {})
    if status.get('state') == 'DONE':
      delete_done_start_time_str = status.get('stateStartTime')
      self._cluster_delete_time = (
          self._ParseTime(delete_done_start_time_str)
          .replace(tzinfo=datetime.timezone.utc)
          .timestamp()
      )

  def _GetCluster(self) -> Dict[str, Any] | None:
    """Gets the cluster resource in a dict."""
    cmd = self.DataprocGcloudCommand('clusters', 'describe', self.cluster_id)
    stdout, _, retcode = cmd.Issue(raise_on_failure=False)
    if not retcode:
      return json.loads(stdout)

  def _Exists(self):
    """Checks to see whether the cluster exists."""
    return self._GetCluster() is not None

  def SubmitJob(
      self,
      jarfile=None,
      classname=None,
      pyspark_file=None,
      query_file=None,
      job_poll_interval=None,
      job_stdout_file=None,
      job_arguments=None,
      job_files=None,
      job_jars=None,
      job_py_files=None,
      job_type=None,
      properties=None,
  ):
    """See base class."""
    assert job_type
    args = ['jobs', 'submit', job_type]
    if job_type == dpb_constants.PYSPARK_JOB_TYPE:
      args.append(pyspark_file)
    cmd = self.DataprocGcloudCommand(*args)
    cmd.flags['cluster'] = self.cluster_id
    cmd.flags['labels'] = util.MakeFormattedDefaultTags()
    job_jars = job_jars or []
    if classname:
      if jarfile:
        # Dataproc does not support both a main class and a main jar, so just
        # make the main jar an additional jar instead.
        job_jars.append(jarfile)
      cmd.flags['class'] = classname
    elif jarfile:
      cmd.flags['jar'] = jarfile
    if query_file:
      cmd.flags['file'] = query_file
    if job_files:
      cmd.flags['files'] = ','.join(job_files)
    if job_jars:
      cmd.flags['jars'] = ','.join(job_jars)
    if job_py_files:
      cmd.flags['py-files'] = ','.join(job_py_files)

    # Dataproc gives as stdout an object describing job execution.
    # Its stderr contains a mix of the stderr of the job and the
    # stdout of the job. We set the driver log level to FATAL
    # to suppress those messages, and we can then hopefully separate
    # the job standard out from the log messages.
    cmd.flags['driver-log-levels'] = 'root={}'.format(FLAGS.dpb_log_level)
    all_properties = self.GetJobProperties()
    all_properties.update(properties or {})
    if all_properties:
      # For commas: https://cloud.google.com/sdk/gcloud/reference/topic/escaping
      cmd.flags['properties'] = '^@^' + '@'.join(
          '{}={}'.format(k, v) for k, v in all_properties.items()
      )
    if job_arguments:
      cmd.additional_flags = ['--'] + job_arguments

    stdout, stderr, retcode = cmd.Issue(timeout=None, raise_on_failure=False)
    if retcode != 0:
      raise dpb_service.JobSubmissionError(stderr)

    results = json.loads(stdout)
    # Otherwise retcode would not have been 0
    assert results['status']['state'] == 'DONE'
    done_time = GcpDpbDataproc._ParseTime(results['status']['stateStartTime'])
    pending_time = None
    start_time = None
    for state in results['statusHistory']:
      if state['state'] == 'PENDING':
        pending_time = GcpDpbDataproc._ParseTime(state['stateStartTime'])
      elif state['state'] == 'RUNNING':
        start_time = GcpDpbDataproc._ParseTime(state['stateStartTime'])

    assert pending_time and start_time and done_time

    return dpb_service.JobResult(
        run_time=(done_time - start_time).total_seconds(),
        pending_time=(start_time - pending_time).total_seconds(),
    )

  def _AddToCmd(self, cmd, cmd_property, cmd_value):
    flag_name = cmd_property
    cmd.flags[flag_name] = cmd_value

  def GetHdfsType(self) -> str | None:
    """Gets human friendly disk type for metric metadata."""
    hdfs_type = None
    if self.spec.worker_group.disk_spec.disk_type:
      hdfs_type = disk_to_hdfs_map[self.spec.worker_group.disk_spec.disk_type]

    # Change to Local SSD if local SSDs are specified.
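    # (Dataproc stores HDFS and shuffle data on attached local SSDs, which is
    # presumably why they take precedence over the boot disk type here.)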
    if self.spec.worker_group.vm_spec.num_local_ssds:
      hdfs_type = 'Local SSD'
    return hdfs_type


class GcpDpbDpgke(GcpDpbDataproc):
  """Dataproc on GKE cluster.

  Extends from GcpDpbDataproc and not GcpDpbBaseDataproc as this represents a
  cluster with managed infrastructure.
  """

  CLOUD = provider_info.GCP
  SERVICE_TYPE = 'dataproc_gke'

  def __init__(self, dpb_service_spec):
    super().__init__(dpb_service_spec)
    required_spec_attrs = [
        'gke_cluster_name',
        'gke_cluster_nodepools',
        'gke_cluster_location',
    ]
    missing_attrs = [
        attr
        for attr in required_spec_attrs
        if not getattr(self.spec, attr, None)
    ]
    if missing_attrs:
      raise errors.Setup.InvalidSetupError(
          f'{missing_attrs} must be provided for provisioning DPGKE.'
      )

  def _Create(self):
    """Creates the dpgke virtual cluster."""
    cmd = self.DataprocGcloudCommand(
        'clusters', 'gke', 'create', self.cluster_id
    )
    cmd.use_alpha_gcloud = True
    cmd.flags['setup-workload-identity'] = True
    cmd.flags['gke-cluster'] = self.spec.gke_cluster_name
    cmd.flags['namespace'] = self.cluster_id
    # Replace the ':' field delimiter with '=' since the create cluster command
    # only accepts '=' as the field delimiter, but PKB doesn't allow overriding
    # spec parameters containing '='.
    cmd.flags['pools'] = self.spec.gke_cluster_nodepools.replace(':', '=')
    cmd.flags['gke-cluster-location'] = self.spec.gke_cluster_location
    if FLAGS.dpb_service_bucket:
      cmd.flags['staging-bucket'] = FLAGS.dpb_service_bucket
    if self.project is not None:
      cmd.flags['project'] = self.project
    cmd.flags['image-version'] = self.spec.version
    if self.GetClusterProperties():
      cmd.flags['properties'] = ','.join(self.GetClusterProperties())
    timeout = 900  # 15 min
    logging.info(
        'Issuing command to create dpgke cluster. Flags %s, Args %s',
        cmd.flags,
        cmd.args,
    )
    stdout, stderr, retcode = cmd.Issue(timeout=timeout, raise_on_failure=False)
    self._cluster_create_time, self._cluster_ready_time = (
        self._ParseClusterCreateTime(stdout)
    )
    if retcode:
      util.CheckGcloudResponseKnownFailures(stderr, retcode)
      raise errors.Resource.CreationError(stderr)

  def GetHdfsType(self) -> str | None:
    """Gets human friendly disk type for metric metadata."""
    return None


class GcpDpbDataprocServerless(
    dpb_service.DpbServiceServerlessMixin, GcpDpbBaseDataproc
):
  """Resource that allows spawning serverless Dataproc Jobs."""

  CLOUD = provider_info.GCP
  SERVICE_TYPE = 'dataproc_serverless'

  def __init__(self, dpb_service_spec):
    self._dpb_s8s_disk_type = (
        dpb_service_spec.worker_group.disk_spec.disk_type or 'standard'
    )
    self._dpb_s8s_machine_type = (
        dpb_service_spec.worker_group.vm_spec.machine_type or 'standard'
    )
    # This is to make it work with dpb_sparksql_benchmark GCP defaults.
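    # For example, a spec reusing the GCE-oriented defaults (n1-standard-4
    # workers with pd-standard boot disks) maps to the serverless 'standard'
    # compute and disk tiers below; any other machine or disk type is passed
    # through unchanged as the tier name.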
    if self._dpb_s8s_disk_type == 'pd-standard':
      self._dpb_s8s_disk_type = 'standard'
    if self._dpb_s8s_machine_type == 'n1-standard-4':
      self._dpb_s8s_machine_type = 'standard'
    super().__init__(dpb_service_spec)
    self._job_counter = 0
    self.batch_name = f'{self.cluster_id}-{self._job_counter}'
    self._FillMetadata()

  def SubmitJob(
      self,
      jarfile=None,
      classname=None,
      pyspark_file=None,
      query_file=None,
      job_poll_interval=None,
      job_stdout_file=None,
      job_arguments=None,
      job_files=None,
      job_jars=None,
      job_py_files=None,
      job_type=None,
      properties=None,
  ):
    """See base class."""
    assert job_type
    args = ['batches', 'submit', job_type]
    additional_args = []
    if job_type == dpb_constants.PYSPARK_JOB_TYPE:
      args.append(pyspark_file)
    cmd = self.DataprocGcloudCommand(*args)
    self.batch_name = f'{self.cluster_id}-{self._job_counter}'
    self._job_counter += 1
    cmd.flags['batch'] = self.batch_name
    cmd.flags['labels'] = util.MakeFormattedDefaultTags()
    job_jars = job_jars or []
    if classname:
      if jarfile:
        # Dataproc does not support both a main class and a main jar, so just
        # make the main jar an additional jar instead.
        job_jars.append(jarfile)
      cmd.flags['class'] = classname
    elif jarfile:
      cmd.flags['jar'] = jarfile
    if query_file:
      additional_args.append(query_file)
    if job_files:
      cmd.flags['files'] = ','.join(job_files)
    if job_jars:
      cmd.flags['jars'] = ','.join(job_jars)
    if job_py_files:
      cmd.flags['py-files'] = ','.join(job_py_files)
    if FLAGS.gce_network_name:
      cmd.flags['network'] = FLAGS.gce_network_name[0]
    if self.GetDpbVersion():
      cmd.flags['version'] = self.GetDpbVersion()
    if FLAGS.gcp_dataproc_image:
      cmd.flags['container-image'] = FLAGS.gcp_dataproc_image
    all_properties = self.GetJobProperties()
    all_properties.update(properties or {})
    if all_properties:
      # For commas: https://cloud.google.com/sdk/gcloud/reference/topic/escaping
      cmd.flags['properties'] = '^@^' + '@'.join(
          '{}={}'.format(k, v) for k, v in all_properties.items()
      )
    if job_arguments:
      additional_args += ['--'] + job_arguments
    cmd.additional_flags = additional_args

    _, stderr, retcode = cmd.Issue(timeout=None, raise_on_failure=False)
    if retcode != 0:
      raise dpb_service.JobSubmissionError(stderr)
    job_stderr = stderr

    fetch_batch_cmd = self.DataprocGcloudCommand(
        'batches', 'describe', self.batch_name
    )
    stdout, stderr, retcode = fetch_batch_cmd.Issue(
        timeout=None, raise_on_failure=False
    )
    if retcode != 0:
      raise dpb_service.JobSubmissionError(stderr)

    results = json.loads(stdout)
    # Otherwise retcode would not have been 0
    assert results['state'] == 'SUCCEEDED'
    done_time = self._ParseTime(results['stateTime'])
    pending_time = None
    start_time = None
    for state in results['stateHistory']:
      if state['state'] == 'PENDING':
        pending_time = self._ParseTime(state['stateStartTime'])
      elif state['state'] == 'RUNNING':
        start_time = self._ParseTime(state['stateStartTime'])

    assert pending_time and start_time and done_time

    return dpb_service.JobResult(
        run_time=(done_time - start_time).total_seconds(),
        pending_time=(start_time - pending_time).total_seconds(),
        fetch_output_fn=lambda: (None, job_stderr),
    )

  def GetJobProperties(self) -> Dict[str, str]:
    result = {}
    if self.spec.dataproc_serverless_core_count:
      result['spark.executor.cores'] = self.spec.dataproc_serverless_core_count
      result['spark.driver.cores'] = self.spec.dataproc_serverless_core_count
    if self.spec.dataproc_serverless_initial_executors:
      result['spark.executor.instances'] = (
          self.spec.dataproc_serverless_initial_executors
      )
    if self.spec.dataproc_serverless_min_executors:
      result['spark.dynamicAllocation.minExecutors'] = (
          self.spec.dataproc_serverless_min_executors
      )
    if self.spec.dataproc_serverless_max_executors:
      result['spark.dynamicAllocation.maxExecutors'] = (
          self.spec.dataproc_serverless_max_executors
      )
    if self.spec.worker_group.disk_spec.disk_size:
      result['spark.dataproc.driver.disk.size'] = (
          f'{self.spec.worker_group.disk_spec.disk_size}g'
      )
      result['spark.dataproc.executor.disk.size'] = (
          f'{self.spec.worker_group.disk_spec.disk_size}g'
      )
    if self.spec.worker_group.disk_spec.disk_type:
      result['spark.dataproc.driver.disk.tier'] = self._dpb_s8s_disk_type
      result['spark.dataproc.executor.disk.tier'] = self._dpb_s8s_disk_type
    if self.spec.worker_group.vm_spec.machine_type:
      result['spark.dataproc.driver.compute.tier'] = self._dpb_s8s_machine_type
      result['spark.dataproc.executor.compute.tier'] = (
          self._dpb_s8s_machine_type
      )
    if self.spec.dataproc_serverless_memory:
      result['spark.driver.memory'] = f'{self.spec.dataproc_serverless_memory}m'
      result['spark.executor.memory'] = (
          f'{self.spec.dataproc_serverless_memory}m'
      )
    if self.spec.dataproc_serverless_driver_memory:
      result['spark.driver.memory'] = (
          f'{self.spec.dataproc_serverless_driver_memory}m'
      )
    if self.spec.dataproc_serverless_executor_memory:
      result['spark.executor.memory'] = (
          f'{self.spec.dataproc_serverless_executor_memory}m'
      )
    if self.spec.dataproc_serverless_off_heap_memory:
      result['spark.memory.offHeap.size'] = (
          f'{self.spec.dataproc_serverless_off_heap_memory}m'
      )
    if self.spec.dataproc_serverless_memory_overhead:
      result['spark.driver.memoryOverhead'] = (
          f'{self.spec.dataproc_serverless_memory_overhead}m'
      )
      result['spark.executor.memoryOverhead'] = (
          f'{self.spec.dataproc_serverless_memory_overhead}m'
      )
    if self.spec.dataproc_serverless_runtime_engine == 'native':
      result['spark.dataproc.runtimeEngine'] = 'native'
    result.update(super().GetJobProperties())
    return result

  def _FillMetadata(self) -> None:
    cluster_shape_tier = (
        '-premium' if self._dpb_s8s_machine_type == 'premium' else ''
    )
    cluster_shape_cores = self.spec.dataproc_serverless_core_count or 'default'
    cluster_shape = (
        f'dataproc-serverless{cluster_shape_tier}-{cluster_shape_cores}'
    )
    initial_executors = (
        self.spec.dataproc_serverless_initial_executors or 'default'
    )
    min_executors = self.spec.dataproc_serverless_min_executors or 'default'
    max_executors = self.spec.dataproc_serverless_max_executors or 'default'
    cluster_size = None
    if initial_executors == min_executors == max_executors:
      cluster_size = initial_executors
    memory_per_node = (
        self.spec.dataproc_serverless_executor_memory
        or self.spec.dataproc_serverless_memory
        or 'default'
    )
    self.metadata = {
        'dpb_service': self.metadata['dpb_service'],
        'dpb_version': self.metadata['dpb_version'],
        'dpb_service_version': self.metadata['dpb_service_version'],
        'dpb_batch_id': self.metadata['dpb_cluster_id'],
        'dpb_cluster_shape': cluster_shape,
        'dpb_cluster_size': cluster_size,
        'dpb_cluster_min_executors': min_executors,
        'dpb_cluster_max_executors': max_executors,
        'dpb_cluster_initial_executors': initial_executors,
        'dpb_cores_per_node': (
            self.spec.dataproc_serverless_core_count or 'default'
        ),
        'dpb_memory_per_node': memory_per_node,
        'dpb_memory_overhead_per_node': (
            self.spec.dataproc_serverless_memory_overhead or 'default'
        ),
        'dpb_off_heap_memory_per_node': (
            self.spec.dataproc_serverless_off_heap_memory or 'default'
        ),
        'dpb_hdfs_type': self.metadata['dpb_hdfs_type'],
        'dpb_disk_size': self.metadata['dpb_disk_size'],
        'dpb_service_zone': self.metadata['dpb_service_zone'],
        'dpb_job_properties': self.metadata['dpb_job_properties'],
        'dpb_runtime_engine': self.spec.dataproc_serverless_runtime_engine,
    }

  def CalculateLastJobCosts(self) -> dpb_service.JobCosts:
    fetch_batch_cmd = self.DataprocGcloudCommand(
        'batches', 'describe', self.batch_name
    )

    @vm_util.Retry(
        timeout=180,
        poll_interval=15,
        fuzz=0,
        retryable_exceptions=(MetricNotReadyError,),
    )
    def FetchBatchResults():
      stdout, _, _ = fetch_batch_cmd.Issue(timeout=None, raise_on_failure=False)
      results = json.loads(stdout)
      # If the approximate usage data is not available yet, sleep and retry.
      if (
          'runtimeInfo' not in results
          or 'approximateUsage' not in results['runtimeInfo']
      ):
        raise MetricNotReadyError('Usage metric is not ready')
      return results

    # Pricing may vary based on region and tier. Only some regions are
    # available.
    usd_per_milli_dcu_sec = (
        _DATAPROC_SERVERLESS_PRICES.get(self._dpb_s8s_machine_type, {})
        .get(self.region, {})
        .get('usd_per_milli_dcu_sec')
    )
    usd_per_shuffle_storage_gb_sec = (
        _DATAPROC_SERVERLESS_PRICES.get(self._dpb_s8s_disk_type, {})
        .get(self.region, {})
        .get('usd_per_shuffle_storage_gb_sec')
    )
    if usd_per_milli_dcu_sec is None or usd_per_shuffle_storage_gb_sec is None:
      return dpb_service.JobCosts()

    results = FetchBatchResults()
    milli_dcu_seconds = int(
        results['runtimeInfo']['approximateUsage']['milliDcuSeconds']
    )
    shuffle_storage_gb_seconds = int(
        results['runtimeInfo']['approximateUsage']['shuffleStorageGbSeconds']
    )
    compute_cost = usd_per_milli_dcu_sec * milli_dcu_seconds
    storage_cost = usd_per_shuffle_storage_gb_sec * shuffle_storage_gb_seconds
    return dpb_service.JobCosts(
        total_cost=compute_cost + storage_cost,
        compute_cost=compute_cost,
        storage_cost=storage_cost,
        compute_units_used=milli_dcu_seconds / 1000 / 3600,
        storage_units_used=shuffle_storage_gb_seconds / 3600,
        compute_unit_cost=usd_per_milli_dcu_sec * 1000 * 3600,
        storage_unit_cost=usd_per_shuffle_storage_gb_sec * 3600,
        compute_unit_name='DCU*hr',
        storage_unit_name='GB*hr',
    )

  def GetHdfsType(self) -> str | None:
    """Gets human friendly disk type for metric metadata."""
    try:
      return serverless_disk_to_hdfs_map[self._dpb_s8s_disk_type]
    except KeyError:
      raise errors.Setup.InvalidSetupError(
          f'Invalid disk_type={self._dpb_s8s_disk_type!r} in spec.'
      ) from None


class GcpDpbDataprocFlink(GcpDpbDataproc):
  """Dataproc with Flink component.

  Extends from GcpDpbDataproc and not GcpDpbBaseDataproc as this represents a
  cluster with managed infrastructure.
""" CLOUD = provider_info.GCP SERVICE_TYPE = 'dataproc_flink' def _Create(self): # Make flink component installed when using Dataproc for flink jobs if self.spec.applications: self.spec.applications.append('flink') else: self.spec.applications = ['flink'] super()._Create() self.ExecuteOnMaster(data.ResourcePath(DATAPROC_FLINK_INIT_SCRIPT), []) def SubmitJob( self, jarfile=None, classname=None, pyspark_file=None, query_file=None, job_poll_interval=None, job_stdout_file=None, job_arguments=None, job_files=None, job_jars=None, job_py_files=None, job_type=None, properties=None, ): """See base class.""" assert job_type in [ dpb_constants.FLINK_JOB_TYPE, dpb_constants.BEAM_JOB_TYPE, ], 'Unsupported job type {}'.format(job_type) logging.info('Running presubmit script...') start_time = datetime.datetime.now() self.ExecuteOnMaster( data.ResourcePath(DATAPROC_FLINK_PRESUBMIT_SCRIPT), [jarfile] ) presubmit_done_time = datetime.datetime.now() logging.info('Submitting beam jobs with flink component enabled.') job_script_args = [] job_script_args.append('-c {}'.format(classname)) job_script_args.append('--') if job_type == dpb_constants.BEAM_JOB_TYPE: job_script_args.append('--runner=FlinkRunner') job_script_args.extend(job_arguments) else: job_script_args.extend([arg.replace('=', ' ') for arg in job_arguments]) self.ExecuteOnMaster( data.ResourcePath(DATAPROC_FLINK_TRIGGER_SCRIPT), job_script_args ) done_time = datetime.datetime.now() logging.info('Flink job done.') return dpb_service.JobResult( run_time=(done_time - presubmit_done_time).total_seconds(), pending_time=(presubmit_done_time - start_time).total_seconds(), ) def ExecuteOnMaster(self, script_path, script_args): master_name = self.cluster_id + '-m' script_name = os.path.basename(script_path) if FLAGS.gcp_internal_ip: scp_cmd = ['gcloud', 'beta', 'compute', 'scp', '--internal-ip'] else: scp_cmd = ['gcloud', 'compute', 'scp'] if self.project is not None: scp_cmd += ['--project', self.project] scp_cmd += [ '--zone', self.dpb_service_zone, '--quiet', script_path, 'pkb@' + master_name + ':/tmp/' + script_name, ] vm_util.IssueCommand(scp_cmd) ssh_cmd = ['gcloud', 'compute', 'ssh'] if self.project is not None: ssh_cmd += ['--project', self.project] if FLAGS.gcp_internal_ip: ssh_cmd += ['--internal-ip'] ssh_cmd += [ '--zone=' + self.dpb_service_zone, '--quiet', 'pkb@' + master_name, '--', 'chmod +x /tmp/' + script_name + '; sudo /tmp/' + script_name + ' ' + ' '.join(script_args), ] vm_util.IssueCommand(ssh_cmd, timeout=None)