def _Create()

in gslib/commands/notification.py [0:0]


  def _Create(self):
    """Creates a Cloud Pub/Sub notification config on a bucket.

    Reads the sub-options (-e, -f, -m, -p, -s, -t) and the trailing bucket
    argument, optionally sets up the topic through _CreateTopic, and then asks
    the API to create the notification config.

    Returns:
      0 once the notification config has been created.

    Raises:
      CommandException: if a -m value contains no ':' separator, if the -f
          payload format is not a key of PAYLOAD_FORMAT_MAP, or if the last
          argument is not a gs:// bucket URL.
      PublishPermissionDeniedException: if creation is denied and no retry
          applies, or if it is denied again on the retry.
    """
    self.CheckArguments()

    # Values gathered from the user-specified options.
    topic_name = None
    format_key = None
    attributes = {}
    events = []
    name_prefix = None
    setup_topic = True

    for flag, flag_value in self.sub_opts or ():
      if flag == '-t':
        topic_name = flag_value
      elif flag == '-f':
        format_key = flag_value
      elif flag == '-e':
        events.append(flag_value)
      elif flag == '-p':
        name_prefix = flag_value
      elif flag == '-s':
        setup_topic = False
      elif flag == '-m':
        # Only the first ':' separates key from value; later ones stay in the
        # value.
        attr_key, separator, attr_value = flag_value.partition(':')
        if not separator:
          raise CommandException(
              'Custom attributes specified with -m should be of the form '
              'key:value')
        attributes[attr_key] = attr_value

    if format_key not in PAYLOAD_FORMAT_MAP:
      raise CommandException(
          "Must provide a payload format with -f of either 'json' or 'none'")
    api_payload_format = PAYLOAD_FORMAT_MAP[format_key]

    # The bucket is always the last positional argument.
    raw_bucket = self.args[-1]
    bucket_url = StorageUrlFromString(raw_bucket)
    if not (bucket_url.IsCloudUrl() and bucket_url.IsBucket()):
      raise CommandException(
          "%s %s requires a GCS bucket name, but got '%s'" %
          (self.command_name, self.subcommand_name, raw_bucket))
    if bucket_url.scheme != 'gs':
      raise CommandException(
          'The %s command can only be used with gs:// bucket URLs.' %
          self.command_name)
    provider = bucket_url.scheme
    bucket_name = bucket_url.bucket_name
    self.logger.debug('Creating notification for bucket %s', bucket_url)

    # Look up the number of the project that owns this bucket.
    bucket_info = self.gsutil_api.GetBucket(bucket_name,
                                            fields=['projectNumber'],
                                            provider=provider)
    project_number = bucket_info.projectNumber

    if not topic_name:
      # No topic given: default to a topic named after the bucket.
      topic_name = 'projects/%s/topics/%s' % (PopulateProjectId(None),
                                              bucket_name)
    elif not topic_name.startswith('projects/'):
      # A bare topic ID (mytopic) rather than a full name
      # (projects/my-project/topics/mytopic): pick a default project.
      topic_name = 'projects/%s/topics/%s' % (PopulateProjectId(None),
                                              topic_name)
    self.logger.debug('Using Cloud Pub/Sub topic %s', topic_name)

    permissions_just_changed = False
    if setup_topic:
      # Ask GCS for the email address that represents GCS's permission to
      # publish to a Cloud Pub/Sub topic from this project.
      account_info = self.gsutil_api.GetProjectServiceAccount(
          project_number, provider=provider)
      service_account = account_info.email_address
      self.logger.debug('Service account for project %d: %s',
                        project_number, service_account)
      permissions_just_changed = self._CreateTopic(topic_name, service_account)

    create_kwargs = {
        'pubsub_topic': topic_name,
        'payload_format': api_payload_format,
        'custom_attributes': attributes,
        'event_types': events or None,
        'object_name_prefix': name_prefix,
        'provider': provider,
    }

    # At most two attempts: the second one only happens when the first was
    # denied right after the topic permissions were modified.
    for attempt in range(2):
      try:
        response = self.gsutil_api.CreateNotificationConfig(
            bucket_name, **create_kwargs)
      except PublishPermissionDeniedException:
        if attempt != 0 or not permissions_just_changed:
          raise
        # If we have just set the IAM policy, it may take up to 10 seconds to
        # take effect.
        self.logger.info(
            'Retrying create notification in 10 seconds '
            '(new permissions may take up to 10 seconds to take effect.)')
        time.sleep(10)
      else:
        break

    config_name = 'projects/_/buckets/%s/notificationConfigs/%s' % (
        bucket_name, response.id)
    self.logger.info('Created notification config %s', config_name)

    return 0