def CopyFunc()

in gslib/commands/cp.py [0:0]


  def CopyFunc(self, copy_object_info, thread_state=None, preserve_posix=False):
    """Worker function for performing the actual copy (and rm, for mv).

    Validates the source/destination pair, builds the destination URL,
    performs the copy through copy_helper.PerformCopy, records the outcome in
    the manifest (when one is in use) and, for mv, removes the source after a
    successful copy.

    Args:
      copy_object_info: Describes a single source object to copy. This method
          reads its source_storage_url, expanded_storage_url, names_container,
          is_multi_source_request, is_multi_top_level_source_request,
          exp_dst_url, have_existing_dst_container and expanded_result
          attributes.
      thread_state: Forwarded to GetCloudApiInstance to obtain the API
          instance used for this copy.
      preserve_posix: If True, POSIX attributes (atime, mtime, uid, gid, mode)
          of a local source file are serialized into the object metadata, and
          a failed permission validation for a local destination aborts the
          copy.

    Returns:
      None. Unless the copy is skipped by one of the early returns, the number
      of bytes transferred is added to self.total_bytes_transferred.

    Raises:
      CommandException: If the source is a provider-only URL, a stream is
          combined with preserve_posix or with parallel operations, source
          and destination are the same, the destination is a version-specific
          cloud URL, a storage class is requested for a non-cloud destination,
          or preserve_posix is set and permission validation fails.
      Exception: Any other error raised while copying is re-raised unless it
          is a no-clobber rejection (with no_clobber set) or
          self.continue_on_error is set.
    """
    gsutil_api = GetCloudApiInstance(self, thread_state=thread_state)

    copy_helper_opts = copy_helper.GetCopyHelperOpts()
    # Report errors under the name "mv" when this copy is part of a move.
    if copy_helper_opts.perform_mv:
      cmd_name = 'mv'
    else:
      cmd_name = self.command_name
    src_url = copy_object_info.source_storage_url
    exp_src_url = copy_object_info.expanded_storage_url
    src_url_names_container = copy_object_info.names_container
    have_multiple_srcs = copy_object_info.is_multi_source_request

    if src_url.IsCloudUrl() and src_url.IsProvider():
      raise CommandException(
          'The %s command does not allow provider-only source URLs (%s)' %
          (cmd_name, src_url))
    if preserve_posix and src_url.IsFileUrl() and src_url.IsStream():
      raise CommandException('Cannot preserve POSIX attributes with a stream.')
    if self.parallel_operations and src_url.IsFileUrl() and src_url.IsStream():
      raise CommandException(
          'Cannot upload from a stream when using gsutil -m option.')
    if have_multiple_srcs:
      copy_helper.InsistDstUrlNamesContainer(
          copy_object_info.exp_dst_url,
          copy_object_info.have_existing_dst_container, cmd_name)

    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally these
    # tools should delete those placeholders once objects have been written
    # "under" the directory, but sometimes the placeholders are left around. We
    # need to filter them out here, otherwise if the user tries to rsync from
    # GCS to a local directory it will result in a directory/file conflict
    # (e.g., trying to download an object called "mydata/" where the local
    # directory "mydata" exists).
    if IsCloudSubdirPlaceholder(exp_src_url):
      # We used to output the message 'Skipping cloud sub-directory placeholder
      # object...' but we no longer do so because it caused customer confusion.
      return

    # Nothing to do if the manifest already records this source as successful.
    if copy_helper_opts.use_manifest and self.manifest.WasSuccessful(
        exp_src_url.url_string):
      return

    if copy_helper_opts.perform_mv and copy_object_info.names_container:
      # Use recursion_requested when performing name expansion for the
      # directory mv case so we can determine if any of the source URLs are
      # directories (and then use cp -r and rm -r to perform the move, to
      # match the behavior of Linux mv (which when moving a directory moves
      # all the contained files).
      self.recursion_requested = True

    # With multiple sources and a local destination that does not exist yet,
    # create the destination as a directory before constructing dst_url.
    if (copy_object_info.exp_dst_url.IsFileUrl() and
        not os.path.exists(copy_object_info.exp_dst_url.object_name) and
        have_multiple_srcs):

      try:
        os.makedirs(copy_object_info.exp_dst_url.object_name)
      except OSError as e:
        # The directory may have appeared between the exists() check and
        # makedirs() (presumably created by a concurrent worker); any other
        # OS error is a real failure.
        if e.errno != errno.EEXIST:
          raise

    dst_url = copy_helper.ConstructDstUrl(
        src_url,
        exp_src_url,
        src_url_names_container,
        have_multiple_srcs,
        copy_object_info.is_multi_top_level_source_request,
        copy_object_info.exp_dst_url,
        copy_object_info.have_existing_dst_container,
        self.recursion_requested,
        preserve_posix=preserve_posix)
    dst_url = copy_helper.FixWindowsNaming(src_url, dst_url)

    copy_helper.CheckForDirFileConflict(exp_src_url, dst_url)
    if copy_helper.SrcDstSame(exp_src_url, dst_url):
      raise CommandException('%s: "%s" and "%s" are the same file - '
                             'abort.' % (cmd_name, exp_src_url, dst_url))

    if dst_url.IsCloudUrl() and dst_url.HasGeneration():
      raise CommandException('%s: a version-specific URL\n(%s)\ncannot be '
                             'the destination for gsutil cp - abort.' %
                             (cmd_name, dst_url))

    if not dst_url.IsCloudUrl() and copy_helper_opts.dest_storage_class:
      raise CommandException('Cannot specify storage class for a non-cloud '
                             'destination: %s' % dst_url)

    # Rebuild the source object's metadata message from the JSON carried by
    # the expansion result, when there is one.
    src_obj_metadata = None
    if copy_object_info.expanded_result:
      src_obj_metadata = encoding.JsonToMessage(
          apitools_messages.Object, copy_object_info.expanded_result)

    if src_url.IsFileUrl() and preserve_posix:
      if not src_obj_metadata:
        src_obj_metadata = apitools_messages.Object()
      # os.stat() results unpack as a 10-tuple: (mode, ino, dev, nlink, uid,
      # gid, size, atime, mtime, ctime).
      mode, _, _, _, uid, gid, _, atime, mtime, _ = os.stat(
          exp_src_url.object_name)
      mode = ConvertModeToBase8(mode)
      posix_attrs = POSIXAttributes(atime=atime,
                                    mtime=mtime,
                                    uid=uid,
                                    gid=gid,
                                    mode=mode)
      custom_metadata = apitools_messages.Object.MetadataValue(
          additionalProperties=[])
      SerializeFileAttributesToObjectMetadata(posix_attrs,
                                              custom_metadata,
                                              preserve_posix=preserve_posix)
      # Note: this replaces any metadata already present on src_obj_metadata.
      src_obj_metadata.metadata = custom_metadata

    if src_obj_metadata and dst_url.IsFileUrl():
      posix_attrs = DeserializeFileAttributesFromObjectMetadata(
          src_obj_metadata, src_url.url_string)
      mode = posix_attrs.mode.permissions
      valid, err = ValidateFilePermissionAccess(src_url.url_string,
                                                uid=posix_attrs.uid,
                                                gid=posix_attrs.gid,
                                                mode=mode)
      # The validation result only blocks the copy when POSIX attributes are
      # being preserved.
      if preserve_posix and not valid:
        logging.getLogger().critical(err)
        raise CommandException('This sync will orphan file(s), please fix their'
                               ' permissions before trying again.')

    bytes_transferred = 0
    try:
      if copy_helper_opts.use_manifest:
        self.manifest.Initialize(exp_src_url.url_string, dst_url.url_string)

      if (self.recursion_requested and
          copy_object_info.exp_dst_url.object_name and dst_url.IsFileUrl()):

        # exp_dst_url is the wildcard-expanded path passed by the user:
        #   exp_dst_url => ~/dir
        #   container => /usr/name/dir
        container = os.path.abspath(copy_object_info.exp_dst_url.object_name)

        # dst_url holds the complete path of the object's destination:
        #   dst_url => /usr/name/dir/../file.txt
        #   abspath => /usr/name/file.txt
        #
        # Taking the common path of this and container yields: /usr/name,
        # which does not start with container when the inclusion of '..' strings
        # results in a copy outside of the container.
        if not os.path.commonpath([
            container, os.path.abspath(dst_url.object_name)
        ]).startswith(container):
          # Logger.warn is a deprecated alias; warning() logs at the same level.
          self.logger.warning(
              'Skipping copy of source URL %s because it would be copied '
              'outside the expected destination directory: %s.' %
              (exp_src_url, container))
          if copy_helper_opts.use_manifest:
            self.manifest.SetResult(
                exp_src_url.url_string, 0, 'skip',
                'Would have copied outside the destination directory.')
          # Returning from inside the try skips both the mv removal in the
          # else clause and the transferred-bytes update below.
          return

      _, bytes_transferred, result_url, md5 = copy_helper.PerformCopy(
          self.logger,
          exp_src_url,
          dst_url,
          gsutil_api,
          self,
          _CopyExceptionHandler,
          src_obj_metadata=src_obj_metadata,
          allow_splitting=True,
          headers=self.headers,
          manifest=self.manifest,
          gzip_encoded=self.gzip_encoded,
          gzip_exts=self.gzip_exts,
          preserve_posix=preserve_posix,
          use_stet=self.use_stet)
      if copy_helper_opts.use_manifest:
        if md5:
          self.manifest.Set(exp_src_url.url_string, 'md5', md5)
        self.manifest.SetResult(exp_src_url.url_string, bytes_transferred, 'OK')
      if copy_helper_opts.print_ver:
        # Some cases don't return a version-specific URL (e.g., if destination
        # is a file).
        self.logger.info('Created: %s', result_url)
    except ItemExistsError:
      message = 'Skipping existing item: %s' % dst_url
      self.logger.info(message)
      if copy_helper_opts.use_manifest:
        self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message)
    except SkipUnsupportedObjectError as e:
      message = ('Skipping item %s with unsupported object type %s' %
                 (exp_src_url.url_string, e.unsupported_type))
      self.logger.info(message)
      if copy_helper_opts.use_manifest:
        self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message)
    except copy_helper.FileConcurrencySkipError:
      self.logger.warning(
          'Skipping copy of source URL %s because destination URL '
          '%s is already being copied by another gsutil process '
          'or thread (did you specify the same source URL twice?) ' %
          (src_url, dst_url))
    except Exception as e:  # pylint: disable=broad-except
      if (copy_helper_opts.no_clobber and
          copy_helper.IsNoClobberServerException(e)):
        # A no-clobber rejection is recorded as a skip, not a failure.
        message = 'Rejected (noclobber): %s' % dst_url
        self.logger.info(message)
        if copy_helper_opts.use_manifest:
          self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message)
      elif self.continue_on_error:
        # Count and log the failure, then let the command keep going.
        message = 'Error copying %s: %s' % (src_url, str(e))
        self.op_failure_count += 1
        self.logger.error(message)
        if copy_helper_opts.use_manifest:
          self.manifest.SetResult(exp_src_url.url_string, 0, 'error',
                                  RemoveCRLFFromString(message))
      else:
        if copy_helper_opts.use_manifest:
          self.manifest.SetResult(exp_src_url.url_string, 0, 'error', str(e))
        raise
    else:
      # Reached only when the try body completed without raising, so for mv
      # the source is removed only after the copy succeeded.
      if copy_helper_opts.perform_mv:
        self.logger.info('Removing %s...', exp_src_url)
        if exp_src_url.IsCloudUrl():
          gsutil_api.DeleteObject(exp_src_url.bucket_name,
                                  exp_src_url.object_name,
                                  generation=exp_src_url.generation,
                                  provider=exp_src_url.scheme)
        else:
          os.unlink(exp_src_url.object_name)

    with self.stats_lock:
      # TODO: Remove stats_lock; we should be able to calculate bytes
      # transferred from StatusMessages posted by operations within PerformCopy.
      self.total_bytes_transferred += bytes_transferred