in perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py
def _Create(self):
  """Creates the cluster."""
  cmd = self.DataprocGcloudCommand('clusters', 'create', self.cluster_id)
  if self.project is not None:
    cmd.flags['project'] = self.project
  if self.spec.worker_count:
    # The number of worker machines in the cluster.
    cmd.flags['num-workers'] = self.spec.worker_count
  else:
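    # With no workers requested, gcloud's --single-node flag provisions a
    # one-VM cluster that acts as both master and worker.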
    cmd.flags['single-node'] = True
  # Initialize applications on the Dataproc cluster.
  if self.spec.applications:
    logging.info('Include the requested applications')
    cmd.flags['optional-components'] = ','.join(self.spec.applications)
  # Enable component gateway for debuggability. Does not impact performance.
  cmd.flags['enable-component-gateway'] = True
  # TODO(pclay): stop ignoring spec.master_group?
  for role in ['worker', 'master']:
    # Set machine type.
    if self.spec.worker_group.vm_spec.machine_type:
      self._AddToCmd(
          cmd,
          '{}-machine-type'.format(role),
          self.spec.worker_group.vm_spec.machine_type,
      )
    # Set boot_disk_size.
    if self.spec.worker_group.disk_spec.disk_size:
      size_in_gb = '{}GB'.format(
          str(self.spec.worker_group.disk_spec.disk_size)
      )
      self._AddToCmd(cmd, '{}-boot-disk-size'.format(role), size_in_gb)
    # Set boot_disk_type.
    if self.spec.worker_group.disk_spec.disk_type:
      self._AddToCmd(
          cmd,
          '{}-boot-disk-type'.format(role),
          self.spec.worker_group.disk_spec.disk_type,
      )
    # Set local SSD count.
    if self.spec.worker_group.vm_spec.num_local_ssds:
      self._AddToCmd(
          cmd,
          'num-{}-local-ssds'.format(role),
          self.spec.worker_group.vm_spec.num_local_ssds,
      )
    # Set local SSD interface.
    if self.spec.worker_group.vm_spec.ssd_interface:
      self._AddToCmd(
          cmd,
          '{}-local-ssd-interface'.format(role),
          self.spec.worker_group.vm_spec.ssd_interface,
      )
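  # NOTE: both roles currently reuse worker_group's machine and disk
  # settings; see the TODO above about honoring spec.master_group.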
  # Set zone.
  cmd.flags['zone'] = self.dpb_service_zone
  if self.GetDpbVersion():
    cmd.flags['image-version'] = self.GetDpbVersion()
  if FLAGS.gcp_dataproc_image:
    cmd.flags['image'] = FLAGS.gcp_dataproc_image
  # https://cloud.google.com/dataproc/docs/guides/profiling#enable_profiling
  if FLAGS.gcloud_scopes:
    cmd.flags['scopes'] = ','.join(re.split(r'[,; ]', FLAGS.gcloud_scopes))
  if self.GetClusterProperties():
    cmd.flags['properties'] = ','.join(self.GetClusterProperties())
  if FLAGS.dpb_initialization_actions:
    cmd.flags['initialization-actions'] = FLAGS.dpb_initialization_actions
  # Ideally DpbServiceSpec would have a network spec whose name we would
  # resolve here, but EMR provisions its own VPC and we are generally happy
  # using pre-existing networks for Dataproc, so just use the underlying
  # flag instead.
  if FLAGS.gce_network_name:
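    # gce_network_name is a repeated flag, but a Dataproc cluster lives in a
    # single network, so only the first entry is used.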
    cmd.flags['network'] = FLAGS.gce_network_name[0]
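  # Attach PKB's default tags plus any user-supplied metadata to the cluster.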
  metadata = util.GetDefaultTags()
  metadata.update(flag_util.ParseKeyValuePairs(FLAGS.gcp_instance_metadata))
  if gcp_flags.SPARK_BIGQUERY_CONNECTOR.value:
    metadata['SPARK_BQ_CONNECTOR_URL'] = (
        gcp_flags.SPARK_BIGQUERY_CONNECTOR.value
    )
  cmd.flags['metadata'] = util.FormatTags(metadata)
  cmd.flags['labels'] = util.MakeFormattedDefaultTags()
  timeout = 900  # 15 min
  stdout, stderr, retcode = cmd.Issue(timeout=timeout, raise_on_failure=False)
  self._cluster_create_time, self._cluster_ready_time = (
      self._ParseClusterCreateTime(stdout)
  )
  if retcode:
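    # Known failure patterns in stderr (e.g. quota or capacity issues) are
    # surfaced as more specific errors before the generic CreationError.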
    util.CheckGcloudResponseKnownFailures(stderr, retcode)
    raise errors.Resource.CreationError(stderr)
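The _AddToCmd helper is defined elsewhere in this module. A minimal sketch
consistent with its call sites above (an illustrative assumption, not
necessarily the repo's exact helper):

def _AddToCmd(self, cmd, cmd_property, cmd_value):
  # Sketch only: callers already guard on truthiness, so just map the
  # property name to a gcloud flag on the command object.
  cmd.flags[cmd_property] = cmd_value

Put together, a two-worker spec yields an invocation along the lines of
gcloud dataproc clusters create <cluster_id> --num-workers=2
--enable-component-gateway --worker-machine-type=... --master-machine-type=...
--zone=<dpb_service_zone> --metadata=... --labels=..., with PKB's default
tags carried in both the metadata and the labels.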