def _Create()

in perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py [0:0]


  def _Create(self):
    """Creates the Dataproc cluster via `gcloud dataproc clusters create`.

    Builds the gcloud command flag-by-flag from the benchmark spec and
    module-level flags, issues it with a 15-minute timeout, records cluster
    creation/ready timestamps parsed from stdout, and raises
    errors.Resource.CreationError on a non-zero exit code.
    """
    cmd = self.DataprocGcloudCommand('clusters', 'create', self.cluster_id)
    # Explicit project override; otherwise the gcloud default project applies.
    if self.project is not None:
      cmd.flags['project'] = self.project

    if self.spec.worker_count:
      # The number of worker machines in the cluster
      cmd.flags['num-workers'] = self.spec.worker_count
    else:
      # No workers requested: provision a single-node (master-only) cluster.
      cmd.flags['single-node'] = True

    # Initialize applications on the dataproc cluster
    if self.spec.applications:
      logging.info('Include the requested applications')
      cmd.flags['optional-components'] = ','.join(self.spec.applications)

    # Enable component gateway for debuggability. Does not impact performance.
    cmd.flags['enable-component-gateway'] = True

    # TODO(pclay): stop ignoring spec.master_group?
    # NOTE(review): both 'worker' and 'master' roles are configured from
    # worker_group's spec, so master and worker nodes always share machine
    # type, disk, and SSD settings (per the TODO above).
    for role in ['worker', 'master']:
      # Set machine type
      if self.spec.worker_group.vm_spec.machine_type:
        self._AddToCmd(
            cmd,
            '{}-machine-type'.format(role),
            self.spec.worker_group.vm_spec.machine_type,
        )
      # Set boot_disk_size
      if self.spec.worker_group.disk_spec.disk_size:
        # gcloud expects a unit suffix, e.g. '500GB'.
        size_in_gb = '{}GB'.format(
            str(self.spec.worker_group.disk_spec.disk_size)
        )
        self._AddToCmd(cmd, '{}-boot-disk-size'.format(role), size_in_gb)
      # Set boot_disk_type
      if self.spec.worker_group.disk_spec.disk_type:
        self._AddToCmd(
            cmd,
            '{}-boot-disk-type'.format(role),
            self.spec.worker_group.disk_spec.disk_type,
        )
      # Set ssd count
      if self.spec.worker_group.vm_spec.num_local_ssds:
        self._AddToCmd(
            cmd,
            'num-{}-local-ssds'.format(role),
            self.spec.worker_group.vm_spec.num_local_ssds,
        )
      # Set SSD interface
      if self.spec.worker_group.vm_spec.ssd_interface:
        self._AddToCmd(
            cmd,
            '{}-local-ssd-interface'.format(role),
            self.spec.worker_group.vm_spec.ssd_interface,
        )
    # Set zone
    cmd.flags['zone'] = self.dpb_service_zone
    # Pin the Dataproc image version if one was resolved for this service.
    if self.GetDpbVersion():
      cmd.flags['image-version'] = self.GetDpbVersion()

    # A custom image URI takes effect alongside (not instead of) the version.
    if FLAGS.gcp_dataproc_image:
      cmd.flags['image'] = FLAGS.gcp_dataproc_image

    # http://cloud/dataproc/docs/guides/profiling#enable_profiling
    # Normalize scope separators (comma/semicolon/space) to commas for gcloud.
    if FLAGS.gcloud_scopes:
      cmd.flags['scopes'] = ','.join(re.split(r'[,; ]', FLAGS.gcloud_scopes))

    if self.GetClusterProperties():
      cmd.flags['properties'] = ','.join(self.GetClusterProperties())

    if FLAGS.dpb_initialization_actions:
      cmd.flags['initialization-actions'] = FLAGS.dpb_initialization_actions

    # Ideally DpbServiceSpec would have a network spec, which we would create to
    # Resolve the name, but because EMR provisions its own VPC and we are
    # generally happy using pre-existing networks for Dataproc. Just use the
    # underlying flag instead.
    if FLAGS.gce_network_name:
      # gce_network_name is a list flag; only the first network is used here.
      cmd.flags['network'] = FLAGS.gce_network_name[0]

    # Instance metadata: default tags, user-supplied key/value pairs, and an
    # optional Spark BigQuery connector URL (read by init scripts on the VMs).
    metadata = util.GetDefaultTags()
    metadata.update(flag_util.ParseKeyValuePairs(FLAGS.gcp_instance_metadata))
    if gcp_flags.SPARK_BIGQUERY_CONNECTOR.value:
      metadata['SPARK_BQ_CONNECTOR_URL'] = (
          gcp_flags.SPARK_BIGQUERY_CONNECTOR.value
      )
    cmd.flags['metadata'] = util.FormatTags(metadata)
    cmd.flags['labels'] = util.MakeFormattedDefaultTags()
    timeout = 900  # 15 min
    # raise_on_failure=False so we can inspect stderr for known failure modes
    # before raising a CreationError ourselves.
    stdout, stderr, retcode = cmd.Issue(timeout=timeout, raise_on_failure=False)
    # NOTE(review): timestamps are parsed even when the command failed;
    # presumably _ParseClusterCreateTime tolerates partial output — confirm.
    self._cluster_create_time, self._cluster_ready_time = (
        self._ParseClusterCreateTime(stdout)
    )
    if retcode:
      # Surfaces known quota/stockout-style failures with clearer errors first.
      util.CheckGcloudResponseKnownFailures(stderr, retcode)
      raise errors.Resource.CreationError(stderr)