in pylib/mercurial-support/run-tests.py [0:0]
def _parsetest(self, lines):
# We generate a shell script which outputs unique markers to line
# up script results with our source. These markers include input
# line number and the last return code.
salt = b"SALT%d" % time.time()
def addsalt(line, inpython):
if inpython:
script.append(b'%s %d 0\n' % (salt, line))
else:
script.append(b'echo %s %d $?\n' % (salt, line))
activetrace = []
session = str(uuid.uuid4())
if PYTHON3:
session = session.encode('ascii')
hgcatapult = os.getenv('HGTESTCATAPULTSERVERPIPE') or os.getenv(
'HGCATAPULTSERVERPIPE'
)
def toggletrace(cmd=None):
if not hgcatapult or hgcatapult == os.devnull:
return
if activetrace:
script.append(
b'echo END %s %s >> "$HGTESTCATAPULTSERVERPIPE"\n'
% (session, activetrace[0])
)
if cmd is None:
return
if isinstance(cmd, str):
quoted = shellquote(cmd.strip())
else:
quoted = shellquote(cmd.strip().decode('utf8')).encode('utf8')
quoted = quoted.replace(b'\\', b'\\\\')
script.append(
b'echo START %s %s >> "$HGTESTCATAPULTSERVERPIPE"\n'
% (session, quoted)
)
activetrace[0:] = [quoted]
script = []
# After we run the shell script, we re-unify the script output
# with non-active parts of the source, with synchronization by our
# SALT line number markers. The after table contains the non-active
# components, ordered by line number.
after = {}
# Expected shell script output.
expected = {}
pos = prepos = -1
# True or False when in a true or false conditional section
skipping = None
# We keep track of whether or not we're in a Python block so we
# can generate the surrounding doctest magic.
inpython = False
if self._debug:
script.append(b'set -x\n')
if self._hgcommand != b'hg':
script.append(b'alias hg="%s"\n' % self._hgcommand)
if os.getenv('MSYSTEM'):
script.append(b'alias pwd="pwd -W"\n')
if hgcatapult and hgcatapult != os.devnull:
if PYTHON3:
hgcatapult = hgcatapult.encode('utf8')
cataname = self.name.encode('utf8')
else:
cataname = self.name
# Kludge: use a while loop to keep the pipe from getting
# closed by our echo commands. The still-running file gets
# reaped at the end of the script, which causes the while
# loop to exit and closes the pipe. Sigh.
script.append(
b'rtendtracing() {\n'
b' echo END %(session)s %(name)s >> %(catapult)s\n'
b' rm -f "$TESTTMP/.still-running"\n'
b'}\n'
b'trap "rtendtracing" 0\n'
b'touch "$TESTTMP/.still-running"\n'
b'while [ -f "$TESTTMP/.still-running" ]; do sleep 1; done '
b'> %(catapult)s &\n'
b'HGCATAPULTSESSION=%(session)s ; export HGCATAPULTSESSION\n'
b'echo START %(session)s %(name)s >> %(catapult)s\n'
% {
b'name': cataname,
b'session': session,
b'catapult': hgcatapult,
}
)
if self._case:
casestr = b'#'.join(self._case)
if isinstance(casestr, str):
quoted = shellquote(casestr)
else:
quoted = shellquote(casestr.decode('utf8')).encode('utf8')
script.append(b'TESTCASE=%s\n' % quoted)
script.append(b'export TESTCASE\n')
n = 0
for n, l in enumerate(lines):
if not l.endswith(b'\n'):
l += b'\n'
if l.startswith(b'#require'):
lsplit = l.split()
if len(lsplit) < 2 or lsplit[0] != b'#require':
after.setdefault(pos, []).append(
b' !!! invalid #require\n'
)
if not skipping:
haveresult, message = self._hghave(lsplit[1:])
if not haveresult:
script = [b'echo "%s"\nexit 80\n' % message]
break
after.setdefault(pos, []).append(l)
elif l.startswith(b'#if'):
lsplit = l.split()
if len(lsplit) < 2 or lsplit[0] != b'#if':
after.setdefault(pos, []).append(b' !!! invalid #if\n')
if skipping is not None:
after.setdefault(pos, []).append(b' !!! nested #if\n')
skipping = not self._iftest(lsplit[1:])
after.setdefault(pos, []).append(l)
elif l.startswith(b'#else'):
if skipping is None:
after.setdefault(pos, []).append(b' !!! missing #if\n')
skipping = not skipping
after.setdefault(pos, []).append(l)
elif l.startswith(b'#endif'):
if skipping is None:
after.setdefault(pos, []).append(b' !!! missing #if\n')
skipping = None
after.setdefault(pos, []).append(l)
elif skipping:
after.setdefault(pos, []).append(l)
elif l.startswith(b' >>> '): # python inlines
after.setdefault(pos, []).append(l)
prepos = pos
pos = n
if not inpython:
# We've just entered a Python block. Add the header.
inpython = True
addsalt(prepos, False) # Make sure we report the exit code.
script.append(b'"%s" -m heredoctest <<EOF\n' % PYTHON)
addsalt(n, True)
script.append(l[2:])
elif l.startswith(b' ... '): # python inlines
after.setdefault(prepos, []).append(l)
script.append(l[2:])
elif l.startswith(b' $ '): # commands
if inpython:
script.append(b'EOF\n')
inpython = False
after.setdefault(pos, []).append(l)
prepos = pos
pos = n
addsalt(n, False)
rawcmd = l[4:]
cmd = rawcmd.split()
toggletrace(rawcmd)
if len(cmd) == 2 and cmd[0] == b'cd':
rawcmd = b'cd %s || exit 1\n' % cmd[1]
script.append(rawcmd)
elif l.startswith(b' > '): # continuations
after.setdefault(prepos, []).append(l)
script.append(l[4:])
elif l.startswith(b' '): # results
# Queue up a list of expected results.
expected.setdefault(pos, []).append(l[2:])
else:
if inpython:
script.append(b'EOF\n')
inpython = False
# Non-command/result. Queue up for merged output.
after.setdefault(pos, []).append(l)
if inpython:
script.append(b'EOF\n')
if skipping is not None:
after.setdefault(pos, []).append(b' !!! missing #endif\n')
addsalt(n + 1, False)
# Need to end any current per-command trace
if activetrace:
toggletrace()
return salt, script, after, expected