def copy()

in eden/scm/edenscm/mercurial/cmdutil.py [0:0]


def copy(ui, repo, pats, opts, rename=False):
    """Copy (or rename) working-copy files and record the copies.

    ``pats`` is expanded with ``scmutil.expandpats``; the last resulting entry
    is the destination and every preceding entry is a source pattern.

    ``opts`` is read for:

    - ``after``: do not create the target file, only record a copy whose
      target already exists on disk;
    - ``dry_run``: skip the file copy and the rename clean-up (the flag is
      also forwarded to ``scmutil.dirstatecopy``);
    - ``force``: allow replacing an existing target. This key is indexed
      directly (``opts["force"]``), so it must be present whenever the
      overwrite check triggers.

    ``opts`` is also passed through to ``scmutil.match``.

    When ``rename`` is true the source is additionally forgotten and, unless
    ``after`` is set, removed from the working directory.

    Returns True if at least one file could not be copied, False otherwise.

    Raises ``error.Abort`` when no patterns are given, when only one pattern
    is given (no destination), when several sources (or a pattern with an
    explicit kind) target something that is not an existing directory, when
    the destination ends with a path separator but is not a directory, or
    when no source file is eligible for copying.
    """
    # called with the repo lock held
    #
    # hgsep => pathname that uses "/" to separate directories
    # ossep => pathname that uses os.sep to separate directories
    cwd = repo.getcwd()
    # Maps each target already handled in this call (hgsep) to the source it
    # was copied from; used by copyfile() to detect two sources colliding on
    # the same target.
    targets = {}
    after = opts.get("after")
    dryrun = opts.get("dry_run")
    wctx = repo[None]

    def walkpat(pat):
        """Return the copyable files matched by ``pat``.

        The result is a list of ``(abs, rel, exact)`` tuples, where ``abs`` is
        the hgsep path yielded by ``wctx.walk``, ``rel`` is ``m.rel(abs)`` and
        ``exact`` is ``m.exact(abs)``. Files whose dirstate state is in
        ``badstates`` are skipped; a warning is printed only for exact
        matches.
        """
        srcs = []
        # Per the warnings below, "?" is an unmanaged file and "r" is a file
        # marked for removal. With --after, files marked for removal are
        # still accepted as sources.
        if after:
            badstates = "?"
        else:
            badstates = "?r"
        m = scmutil.match(wctx, [pat], opts, globbed=True)
        for abs in wctx.walk(m):
            state = repo.dirstate[abs]
            rel = m.rel(abs)
            exact = m.exact(abs)
            if state in badstates:
                if exact and state == "?":
                    ui.warn(_("%s: not copying - file is not managed\n") % rel)
                if exact and state == "r":
                    ui.warn(
                        _("%s: not copying - file has been marked for" " remove\n")
                        % rel
                    )
                continue
            # abs: hgsep
            # rel: ossep
            srcs.append((abs, rel, exact))
        return srcs

    # abssrc: hgsep
    # relsrc: ossep
    # otarget: ossep
    def copyfile(abssrc, relsrc, otarget, exact):
        """Copy one file to ``otarget`` and record the copy.

        Returns True only when the copy failed with an IOError other than
        ENOENT; every other path (including the ones that warn and skip the
        file) returns None.
        """
        abstarget = pathutil.canonpath(repo.root, cwd, otarget)
        if "/" in abstarget:
            # We cannot normalize abstarget itself, this would prevent
            # case only renames, like a => A.
            abspath, absname = abstarget.rsplit("/", 1)
            abstarget = repo.dirstate.normalize(abspath) + "/" + absname
        reltarget = repo.pathto(abstarget, cwd)
        target = repo.wjoin(abstarget)
        src = repo.wjoin(abssrc)
        state = repo.dirstate[abstarget]

        scmutil.checkportable(ui, abstarget)

        # check for collisions
        # (another source handled earlier in this call already went to the
        # same target)
        prevsrc = targets.get(abstarget)
        if prevsrc is not None:
            ui.warn(
                _("%s: not overwriting - %s collides with %s\n")
                % (reltarget, repo.pathto(abssrc, cwd), repo.pathto(prevsrc, cwd))
            )
            return

        # check for overwrites
        exists = os.path.lexists(target)
        samefile = False
        if exists and abssrc != abstarget:
            # The two names differ textually but normalize to the same
            # dirstate path (presumably a case-only difference - confirm
            # against dirstate.normalize). A copy onto itself is refused; a
            # rename proceeds as if the target did not exist.
            if repo.dirstate.normalize(abssrc) == repo.dirstate.normalize(abstarget):
                if not rename:
                    ui.warn(_("%s: can't copy - same file\n") % reltarget)
                    return
                exists = False
                samefile = True

        # "and" binds tighter than "or": this is
        # (not after and exists) or (after and state in "mn").
        # "state in ..." is a substring test on the one-letter dirstate code;
        # the message below describes "m"/"n" as "already committed".
        if not after and exists or after and state in "mn":
            if not opts["force"]:
                if state in "mn":
                    msg = _("%s: not overwriting - file already committed\n")
                    if after:
                        flags = "--after --force"
                    else:
                        flags = "--force"
                    if rename:
                        hint = (
                            _(
                                "(hg rename %s to replace the file by "
                                "recording a rename)\n"
                            )
                            % flags
                        )
                    else:
                        hint = (
                            _(
                                "(hg copy %s to replace the file by "
                                "recording a copy)\n"
                            )
                            % flags
                        )
                else:
                    msg = _("%s: not overwriting - file exists\n")
                    if rename:
                        hint = _("(hg rename --after to record the rename)\n")
                    else:
                        hint = _("(hg copy --after to record the copy)\n")
                ui.warn(msg % reltarget)
                ui.warn(hint)
                return

        if after:
            # --after only records a copy, so the target must already be on
            # disk. Note that ``exists`` was reset to False above for the
            # same-file rename case.
            if not exists:
                if rename:
                    ui.warn(
                        _("%s: not recording move - %s does not exist\n")
                        % (relsrc, reltarget)
                    )
                else:
                    ui.warn(
                        _("%s: not recording copy - %s does not exist\n")
                        % (relsrc, reltarget)
                    )
                return
        elif not dryrun:
            # Only this branch assigns ``srcexists``; the read further down is
            # guarded by "not dryrun" and "not after", which is exactly when
            # this branch has run.
            try:
                if exists:
                    os.unlink(target)
                targetdir = os.path.dirname(target) or "."
                if not os.path.isdir(targetdir):
                    os.makedirs(targetdir)
                if samefile:
                    # Go through a temporary name instead of renaming
                    # directly (presumably because a direct rename between
                    # two names of the same file may have no effect - TODO
                    # confirm).
                    tmp = target + "~hgrename"
                    os.rename(src, tmp)
                    os.rename(tmp, target)
                else:
                    util.copyfile(src, target)
                srcexists = True
            except IOError as inst:
                # Only IOError is handled here; other exception types
                # propagate to the caller.
                if inst.errno == errno.ENOENT:
                    # A missing source is not a failure: warn and still
                    # record the copy below.
                    ui.warn(_("%s: deleted in working directory\n") % relsrc)
                    srcexists = False
                else:
                    ui.warn(
                        _("%s: cannot copy - %s\n")
                        % (relsrc, encoding.strtolocal(inst.strerror))
                    )
                    return True  # report a failure

        if ui.verbose or not exact:
            if rename:
                ui.status(_("moving %s to %s\n") % (relsrc, reltarget))
            else:
                ui.status(_("copying %s to %s\n") % (relsrc, reltarget))

        # Remember the target so later sources mapping to it are reported as
        # collisions.
        targets[abstarget] = abssrc

        # fix up dirstate
        scmutil.dirstatecopy(ui, repo, wctx, abssrc, abstarget, dryrun=dryrun, cwd=cwd)
        if rename and not dryrun:
            # Remove the source file only if this call created the target
            # (not --after), the source was actually present, and it was not
            # already moved by the same-file rename above.
            if not after and srcexists and not samefile:
                repo.wvfs.unlinkpath(abssrc)
            wctx.forget([abssrc])

    # pat: ossep
    # dest ossep
    # srcs: list of (hgsep, ossep, bool) as built by walkpat (unused here)
    # return: function that takes hgsep and returns ossep
    def targetpathfn(pat, dest, srcs):
        """Build the source-to-target path mapping used without --after.

        Reads ``destdirexists`` from the enclosing scope; it is assigned
        further down in ``copy`` before this helper is called.
        """
        if os.path.isdir(pat):
            abspfx = pathutil.canonpath(repo.root, cwd, pat)
            abspfx = util.localpath(abspfx)
            # If dest is an existing directory, strip only the parent of the
            # source directory so its last component is recreated under dest;
            # otherwise strip the whole source directory prefix.
            if destdirexists:
                striplen = len(os.path.split(abspfx)[0])
            else:
                striplen = len(abspfx)
            if striplen:
                # also drop the separator that follows the stripped prefix
                striplen += len(pycompat.ossep)
            res = lambda p: os.path.join(dest, util.localpath(p)[striplen:])
        elif destdirexists:
            # file into an existing directory: keep only the file name
            res = lambda p: os.path.join(dest, os.path.basename(util.localpath(p)))
        else:
            # single file to a new name: dest is the target itself
            res = lambda p: dest
        return res

    # pat: ossep
    # dest ossep
    # srcs: list of (hgsep, ossep, bool) as built by walkpat
    # return: function that takes hgsep and returns ossep
    def targetpathafterfn(pat, dest, srcs):
        """Build the source-to-target path mapping used with --after.

        ``srcs`` must be non-empty (``srcs[0]`` is read); the caller skips
        patterns that matched nothing before calling this helper.
        """
        if matchmod.patkind(pat):
            # a mercurial pattern
            res = lambda p: os.path.join(dest, os.path.basename(util.localpath(p)))
        else:
            abspfx = pathutil.canonpath(repo.root, cwd, pat)
            # pat is taken to be a directory when its canonical path is
            # shorter than the first matched source path.
            if len(abspfx) < len(srcs[0][0]):
                # A directory. Either the target path contains the last
                # component of the source path or it does not.
                def evalpath(striplen):
                    """Count the sources whose target exists for ``striplen``."""
                    score = 0
                    for s in srcs:
                        t = os.path.join(dest, util.localpath(s[0])[striplen:])
                        if os.path.lexists(t):
                            score += 1
                    return score

                abspfx = util.localpath(abspfx)
                striplen = len(abspfx)
                if striplen:
                    striplen += len(pycompat.ossep)
                # If dest contains a directory named like the last component
                # of pat, try both layouts and keep the strip length under
                # which strictly more targets already exist on disk.
                if os.path.isdir(os.path.join(dest, os.path.split(abspfx)[1])):
                    score = evalpath(striplen)
                    striplen1 = len(os.path.split(abspfx)[0])
                    if striplen1:
                        striplen1 += len(pycompat.ossep)
                    if evalpath(striplen1) > score:
                        striplen = striplen1
                res = lambda p: os.path.join(dest, util.localpath(p)[striplen:])
            else:
                # a file
                if destdirexists:
                    res = lambda p: os.path.join(
                        dest, os.path.basename(util.localpath(p))
                    )
                else:
                    res = lambda p: dest
        return res

    pats = scmutil.expandpats(pats)
    if not pats:
        raise error.Abort(_("no source or destination specified"))
    if len(pats) == 1:
        raise error.Abort(_("no destination specified"))
    # The last pattern is the destination; the remaining ones are sources.
    dest = pats.pop()
    # A symlink pointing at a directory does not count as an existing
    # destination directory.
    destdirexists = os.path.isdir(dest) and not os.path.islink(dest)
    if not destdirexists:
        if len(pats) > 1 or matchmod.patkind(pats[0]):
            raise error.Abort(
                _("with multiple sources, destination must be an " "existing directory")
            )
        if util.endswithsep(dest):
            raise error.Abort(_("destination %s is not a directory") % dest)

    tfn = targetpathfn
    if after:
        tfn = targetpathafterfn
    # One (target path function, matched sources) pair per source pattern
    # that matched at least one copyable file.
    copylist = []
    for pat in pats:
        srcs = walkpat(pat)
        if not srcs:
            continue
        copylist.append((tfn(pat, dest, srcs), srcs))
    if not copylist:
        raise error.Abort(_("no files to copy"))

    # copyfile() returns a true value only for a hard copy failure.
    errors = 0
    for targetpath, srcs in copylist:
        for abssrc, relsrc, exact in srcs:
            if copyfile(abssrc, relsrc, targetpath(abssrc), exact):
                errors += 1

    if errors:
        ui.warn(_("(consider using --after)\n"))

    return errors != 0