in eden/scm/edenscm/mercurial/cmdutil.py [0:0]
def copy(ui, repo, pats, opts, rename=False):
    """Copy (or, with ``rename=True``, move) working-copy files to a destination.

    ``pats`` is expanded with ``scmutil.expandpats``; the last resulting entry
    is taken as the destination and every preceding entry as a source pattern.

    Args:
        ui: used for ``warn``/``status`` output and the ``verbose`` flag.
        repo: repository whose working copy and dirstate are read and updated.
        pats: source patterns followed by the destination.
        opts: option mapping. ``"after"`` and ``"dry_run"`` are read with
            ``.get``; ``"force"`` is read with ``opts["force"]`` and so must be
            present whenever an overwrite check is reached. The mapping is
            also handed to ``scmutil.match``.
        rename: when true, user-facing messages talk about moving/renaming,
            and (unless ``dry_run``) the source is forgotten and, when the
            copy was actually performed here, removed from disk.

    Returns:
        ``True`` if at least one per-file copy reported a failure, else
        ``False``.

    Raises:
        error.Abort: no patterns, no destination, several sources (or a
            pattern source) with a destination that is not an existing
            directory, a destination ending in a path separator that is not
            a directory, or no source file left to copy.
    """
    # called with the repo lock held
    #
    # hgsep => pathname that uses "/" to separate directories
    # ossep => pathname that uses os.sep to separate directories
    cwd = repo.getcwd()
    # abstarget -> abssrc for every copy recorded so far; used by copyfile()
    # to detect two sources landing on the same target.
    targets = {}
    after = opts.get("after")
    dryrun = opts.get("dry_run")
    wctx = repo[None]

    def walkpat(pat):
        """Return the copyable sources matched by ``pat``.

        Each entry is ``(abs, rel, exact)`` where ``abs`` is hgsep, ``rel`` is
        ossep and ``exact`` is the matcher's ``exact(abs)`` result. Files whose
        dirstate state is in ``badstates`` are skipped; a warning is printed
        only for those that were matched exactly.
        """
        srcs = []
        # With --after, files in state "r" are still accepted as sources;
        # only state "?" is rejected.
        if after:
            badstates = "?"
        else:
            badstates = "?r"
        m = scmutil.match(wctx, [pat], opts, globbed=True)
        for abs in wctx.walk(m):
            state = repo.dirstate[abs]
            rel = m.rel(abs)
            exact = m.exact(abs)
            if state in badstates:
                if exact and state == "?":
                    ui.warn(_("%s: not copying - file is not managed\n") % rel)
                if exact and state == "r":
                    ui.warn(
                        _("%s: not copying - file has been marked for" " remove\n")
                        % rel
                    )
                continue
            # abs: hgsep
            # rel: ossep
            srcs.append((abs, rel, exact))
        return srcs

    # abssrc: hgsep
    # relsrc: ossep
    # otarget: ossep
    def copyfile(abssrc, relsrc, otarget, exact):
        """Copy/move one file and record it in the dirstate.

        Returns ``True`` only when the on-disk copy fails with an ``IOError``
        other than ``ENOENT``. Every other path, including the cases where
        the file is skipped with a warning, returns ``None``.
        """
        abstarget = pathutil.canonpath(repo.root, cwd, otarget)
        if "/" in abstarget:
            # We cannot normalize abstarget itself, this would prevent
            # case only renames, like a => A.
            abspath, absname = abstarget.rsplit("/", 1)
            abstarget = repo.dirstate.normalize(abspath) + "/" + absname
        reltarget = repo.pathto(abstarget, cwd)
        target = repo.wjoin(abstarget)
        src = repo.wjoin(abssrc)
        state = repo.dirstate[abstarget]

        scmutil.checkportable(ui, abstarget)

        # check for collisions
        prevsrc = targets.get(abstarget)
        if prevsrc is not None:
            ui.warn(
                _("%s: not overwriting - %s collides with %s\n")
                % (reltarget, repo.pathto(abssrc, cwd), repo.pathto(prevsrc, cwd))
            )
            return

        # check for overwrites
        exists = os.path.lexists(target)
        samefile = False
        if exists and abssrc != abstarget:
            # The names differ as strings but normalize to the same dirstate
            # entry -- presumably a case-only rename (see the note above).
            if repo.dirstate.normalize(abssrc) == repo.dirstate.normalize(abstarget):
                if not rename:
                    ui.warn(_("%s: can't copy - same file\n") % reltarget)
                    return
                # Treat the target as absent so the overwrite check below
                # does not fire; the rename goes through a temporary name.
                exists = False
                samefile = True

        # Parsed as: (not after and exists) or (after and state in "mn").
        if not after and exists or after and state in "mn":
            if not opts["force"]:
                if state in "mn":
                    msg = _("%s: not overwriting - file already committed\n")
                    if after:
                        flags = "--after --force"
                    else:
                        flags = "--force"
                    if rename:
                        hint = (
                            _(
                                "(hg rename %s to replace the file by "
                                "recording a rename)\n"
                            )
                            % flags
                        )
                    else:
                        hint = (
                            _(
                                "(hg copy %s to replace the file by "
                                "recording a copy)\n"
                            )
                            % flags
                        )
                else:
                    msg = _("%s: not overwriting - file exists\n")
                    if rename:
                        hint = _("(hg rename --after to record the rename)\n")
                    else:
                        hint = _("(hg copy --after to record the copy)\n")
                ui.warn(msg % reltarget)
                ui.warn(hint)
                return

        if after:
            # --after only records the copy; the target must already be on
            # disk (note that ``exists`` was cleared above for samefile).
            if not exists:
                if rename:
                    ui.warn(
                        _("%s: not recording move - %s does not exist\n")
                        % (relsrc, reltarget)
                    )
                else:
                    ui.warn(
                        _("%s: not recording copy - %s does not exist\n")
                        % (relsrc, reltarget)
                    )
                return
        elif not dryrun:
            try:
                if exists:
                    os.unlink(target)
                targetdir = os.path.dirname(target) or "."
                if not os.path.isdir(targetdir):
                    os.makedirs(targetdir)
                if samefile:
                    # Two-step rename through a temporary name.
                    tmp = target + "~hgrename"
                    os.rename(src, tmp)
                    os.rename(tmp, target)
                else:
                    util.copyfile(src, target)
                srcexists = True
            except IOError as inst:
                if inst.errno == errno.ENOENT:
                    # Not treated as a failure: the copy is still recorded
                    # below, but the source is not unlinked afterwards.
                    ui.warn(_("%s: deleted in working directory\n") % relsrc)
                    srcexists = False
                else:
                    ui.warn(
                        _("%s: cannot copy - %s\n")
                        % (relsrc, encoding.strtolocal(inst.strerror))
                    )
                    return True  # report a failure

        if ui.verbose or not exact:
            if rename:
                ui.status(_("moving %s to %s\n") % (relsrc, reltarget))
            else:
                ui.status(_("copying %s to %s\n") % (relsrc, reltarget))

        targets[abstarget] = abssrc

        # fix up dirstate
        scmutil.dirstatecopy(ui, repo, wctx, abssrc, abstarget, dryrun=dryrun, cwd=cwd)
        if rename and not dryrun:
            # ``srcexists`` is only assigned in the "not after, not dryrun"
            # branch above; ``not after`` is tested first, so it is never
            # read unassigned.
            if not after and srcexists and not samefile:
                repo.wvfs.unlinkpath(abssrc)
            wctx.forget([abssrc])

    # pat: ossep
    # dest ossep
    # srcs: list of (hgsep, ossep, bool) as built by walkpat (unused here)
    # return: function that takes hgsep and returns ossep
    def targetpathfn(pat, dest, srcs):
        """Build the source-path -> target-path mapping for a normal copy.

        Reads ``destdirexists`` from the enclosing scope, which is assigned
        before this function is first called.
        """
        if os.path.isdir(pat):
            abspfx = pathutil.canonpath(repo.root, cwd, pat)
            abspfx = util.localpath(abspfx)
            # If dest is an existing directory, strip only the parent of the
            # source directory so its last component is kept under dest;
            # otherwise strip the whole source directory prefix.
            if destdirexists:
                striplen = len(os.path.split(abspfx)[0])
            else:
                striplen = len(abspfx)
            if striplen:
                # also drop the separator that follows the stripped prefix
                striplen += len(pycompat.ossep)
            res = lambda p: os.path.join(dest, util.localpath(p)[striplen:])
        elif destdirexists:
            res = lambda p: os.path.join(dest, os.path.basename(util.localpath(p)))
        else:
            res = lambda p: dest
        return res

    # pat: ossep
    # dest ossep
    # srcs: list of (hgsep, ossep, bool) as built by walkpat
    # return: function that takes hgsep and returns ossep
    def targetpathafterfn(pat, dest, srcs):
        """Build the source-path -> target-path mapping for ``--after``.

        For a directory source, the layout under ``dest`` is chosen by
        checking which candidate prefix length yields more target paths that
        already exist on disk.
        """
        if matchmod.patkind(pat):
            # a mercurial pattern
            res = lambda p: os.path.join(dest, os.path.basename(util.localpath(p)))
        else:
            abspfx = pathutil.canonpath(repo.root, cwd, pat)
            # The pattern is shorter than the first matched file's path, so
            # it is taken to name a directory containing that file.
            if len(abspfx) < len(srcs[0][0]):
                # A directory. Either the target path contains the last
                # component of the source path or it does not.
                def evalpath(striplen):
                    # Count the sources whose would-be target already exists
                    # when ``striplen`` leading characters are stripped.
                    score = 0
                    for s in srcs:
                        t = os.path.join(dest, util.localpath(s[0])[striplen:])
                        if os.path.lexists(t):
                            score += 1
                    return score

                abspfx = util.localpath(abspfx)
                striplen = len(abspfx)
                if striplen:
                    striplen += len(pycompat.ossep)
                if os.path.isdir(os.path.join(dest, os.path.split(abspfx)[1])):
                    score = evalpath(striplen)
                    striplen1 = len(os.path.split(abspfx)[0])
                    if striplen1:
                        striplen1 += len(pycompat.ossep)
                    # Keep the source directory's last component only when
                    # that strictly matches more existing targets.
                    if evalpath(striplen1) > score:
                        striplen = striplen1
                res = lambda p: os.path.join(dest, util.localpath(p)[striplen:])
            else:
                # a file
                if destdirexists:
                    res = lambda p: os.path.join(
                        dest, os.path.basename(util.localpath(p))
                    )
                else:
                    res = lambda p: dest
        return res

    pats = scmutil.expandpats(pats)
    if not pats:
        raise error.Abort(_("no source or destination specified"))
    if len(pats) == 1:
        raise error.Abort(_("no destination specified"))
    # The last pattern is the destination; the rest are sources.
    dest = pats.pop()
    # A symlink to a directory is deliberately not counted as a directory.
    destdirexists = os.path.isdir(dest) and not os.path.islink(dest)
    if not destdirexists:
        if len(pats) > 1 or matchmod.patkind(pats[0]):
            raise error.Abort(
                _("with multiple sources, destination must be an " "existing directory")
            )
        if util.endswithsep(dest):
            raise error.Abort(_("destination %s is not a directory") % dest)

    tfn = targetpathfn
    if after:
        tfn = targetpathafterfn
    # list of (target-path function, sources) pairs, one per source pattern
    # that matched at least one copyable file
    copylist = []
    for pat in pats:
        srcs = walkpat(pat)
        if not srcs:
            continue
        copylist.append((tfn(pat, dest, srcs), srcs))
    if not copylist:
        raise error.Abort(_("no files to copy"))

    errors = 0
    for targetpath, srcs in copylist:
        for abssrc, relsrc, exact in srcs:
            # copyfile returns a true value only for a failed on-disk copy
            if copyfile(abssrc, relsrc, targetpath(abssrc), exact):
                errors += 1

    if errors:
        ui.warn(_("(consider using --after)\n"))

    return errors != 0