def _parsetest()

in pylib/mercurial-support/run-tests.py [0:0]


    def _parsetest(self, lines):
        """Translate the lines of a unified test into a shell script.

        ``lines`` is an iterable of bytes lines from the test source. Each
        line is classified by its prefix:

        - ``#require``, ``#if``, ``#else``, ``#endif``: directives
        - ``  >>> `` / ``  ... ``: inline Python and its continuations
        - ``  $ `` / ``  > ``: shell commands and their continuations
        - any other line starting with two spaces: expected output
        - everything else: free text

        Returns a 4-tuple ``(salt, script, after, expected)``:

        - ``salt``: the bytes marker (``SALT`` followed by the current
          timestamp) that prefixes every synchronization line.
        - ``script``: list of bytes fragments forming the generated script.
        - ``after``: dict mapping a line position to the list of source
          lines (plus ``  !!! ...`` diagnostics) that are not expected
          output. The position is ``-1`` before the first command,
          otherwise the index of a command/Python line.
        - ``expected``: dict mapping the index of a command/Python line to
          the list of expected output lines, two-space indent removed.

        If a ``#require`` outside a skipped section is not satisfied
        according to ``self._hghave``, ``script`` is replaced with one that
        echoes the returned message and exits with status 80, and parsing
        stops at that line.
        """
        # We generate a shell script which outputs unique markers to line
        # up script results with our source. These markers include input
        # line number and the last return code.
        salt = b"SALT%d" % time.time()

        def addsalt(line, inpython):
            # Append a synchronization marker for source line ``line``.
            # Inside a Python heredoc the marker is emitted as a raw line
            # with a literal 0 in the return-code slot; in shell context
            # it is echoed together with ``$?``.
            if inpython:
                script.append(b'%s %d 0\n' % (salt, line))
            else:
                script.append(b'echo %s %d $?\n' % (salt, line))

        # Holds at most one element: the quoted command whose trace span
        # is currently open. A list is used so toggletrace() can update it
        # in place.
        activetrace = []
        session = str(uuid.uuid4())
        if PYTHON3:
            session = session.encode('ascii')
        # The test-specific variable takes precedence over the generic one.
        hgcatapult = os.getenv('HGTESTCATAPULTSERVERPIPE') or os.getenv(
            'HGCATAPULTSERVERPIPE'
        )

        def toggletrace(cmd=None):
            # Close the currently open per-command trace span (if any) and,
            # when ``cmd`` is given, open a new one for it. Does nothing
            # when no catapult pipe is configured or it is os.devnull.
            if not hgcatapult or hgcatapult == os.devnull:
                return

            if activetrace:
                script.append(
                    b'echo END %s %s >> "$HGTESTCATAPULTSERVERPIPE"\n'
                    % (session, activetrace[0])
                )
            if cmd is None:
                return

            if isinstance(cmd, str):
                quoted = shellquote(cmd.strip())
            else:
                quoted = shellquote(cmd.strip().decode('utf8')).encode('utf8')
            # Double every backslash in the quoted command.
            # NOTE(review): presumably so that ``echo`` does not interpret
            # them as escapes -- confirm.
            quoted = quoted.replace(b'\\', b'\\\\')
            script.append(
                b'echo START %s %s >> "$HGTESTCATAPULTSERVERPIPE"\n'
                % (session, quoted)
            )
            # Remember the new span so the next call can emit its END.
            activetrace[0:] = [quoted]

        # Fragments of the generated script. addsalt() and toggletrace()
        # look this name up at call time, so they also see the replacement
        # list assigned when a #require fails.
        script = []

        # After we run the shell script, we re-unify the script output
        # with non-active parts of the source, with synchronization by our
        # SALT line number markers. The after table contains the non-active
        # components, ordered by line number.
        after = {}

        # Expected shell script output.
        expected = {}

        # pos is the index of the most recent command/Python line (-1
        # before the first one); prepos is the value pos had before that.
        pos = prepos = -1

        # True or False when in a true or false conditional section
        skipping = None

        # We keep track of whether or not we're in a Python block so we
        # can generate the surrounding doctest magic.
        inpython = False

        # Script preamble.
        if self._debug:
            script.append(b'set -x\n')
        if self._hgcommand != b'hg':
            script.append(b'alias hg="%s"\n' % self._hgcommand)
        # NOTE(review): MSYSTEM is presumably set under MSYS/MinGW shells
        # on Windows -- confirm.
        if os.getenv('MSYSTEM'):
            script.append(b'alias pwd="pwd -W"\n')

        if hgcatapult and hgcatapult != os.devnull:
            # The script is assembled from bytes, so encode the str values
            # on Python 3.
            if PYTHON3:
                hgcatapult = hgcatapult.encode('utf8')
                cataname = self.name.encode('utf8')
            else:
                cataname = self.name

            # Kludge: use a while loop to keep the pipe from getting
            # closed by our echo commands. The still-running file gets
            # reaped at the end of the script, which causes the while
            # loop to exit and closes the pipe. Sigh.
            script.append(
                b'rtendtracing() {\n'
                b'  echo END %(session)s %(name)s >> %(catapult)s\n'
                b'  rm -f "$TESTTMP/.still-running"\n'
                b'}\n'
                b'trap "rtendtracing" 0\n'
                b'touch "$TESTTMP/.still-running"\n'
                b'while [ -f "$TESTTMP/.still-running" ]; do sleep 1; done '
                b'> %(catapult)s &\n'
                b'HGCATAPULTSESSION=%(session)s ; export HGCATAPULTSESSION\n'
                b'echo START %(session)s %(name)s >> %(catapult)s\n'
                % {
                    b'name': cataname,
                    b'session': session,
                    b'catapult': hgcatapult,
                }
            )

        # Export the active test case (its parts joined with '#') to the
        # script as TESTCASE.
        if self._case:
            casestr = b'#'.join(self._case)
            if isinstance(casestr, str):
                quoted = shellquote(casestr)
            else:
                quoted = shellquote(casestr.decode('utf8')).encode('utf8')
            script.append(b'TESTCASE=%s\n' % quoted)
            script.append(b'export TESTCASE\n')

        # Pre-set n so that the final addsalt(n + 1, ...) below works even
        # when ``lines`` is empty.
        n = 0
        for n, l in enumerate(lines):
            if not l.endswith(b'\n'):
                l += b'\n'
            if l.startswith(b'#require'):
                lsplit = l.split()
                # A malformed directive is reported, but the line is still
                # processed below.
                if len(lsplit) < 2 or lsplit[0] != b'#require':
                    after.setdefault(pos, []).append(
                        b'  !!! invalid #require\n'
                    )
                if not skipping:
                    haveresult, message = self._hghave(lsplit[1:])
                    if not haveresult:
                        # Discard everything generated so far; the script
                        # only prints the message and exits with 80.
                        # NOTE(review): 80 is presumably the runner's
                        # "skipped" status -- confirm against the caller.
                        script = [b'echo "%s"\nexit 80\n' % message]
                        break
                after.setdefault(pos, []).append(l)
            elif l.startswith(b'#if'):
                lsplit = l.split()
                if len(lsplit) < 2 or lsplit[0] != b'#if':
                    after.setdefault(pos, []).append(b'  !!! invalid #if\n')
                if skipping is not None:
                    after.setdefault(pos, []).append(b'  !!! nested #if\n')
                # Skip the section when the condition does not hold.
                skipping = not self._iftest(lsplit[1:])
                after.setdefault(pos, []).append(l)
            elif l.startswith(b'#else'):
                if skipping is None:
                    after.setdefault(pos, []).append(b'  !!! missing #if\n')
                # Invert the current section state. With no open #if
                # (None) this yields True, i.e. skipping starts.
                skipping = not skipping
                after.setdefault(pos, []).append(l)
            elif l.startswith(b'#endif'):
                if skipping is None:
                    after.setdefault(pos, []).append(b'  !!! missing #if\n')
                skipping = None
                after.setdefault(pos, []).append(l)
            elif skipping:
                # Inside a skipped section: keep the line for the merged
                # output but generate no script for it.
                after.setdefault(pos, []).append(l)
            elif l.startswith(b'  >>> '):  # python inlines
                # The source line is filed under the previous position;
                # then this line becomes the current position.
                after.setdefault(pos, []).append(l)
                prepos = pos
                pos = n
                if not inpython:
                    # We've just entered a Python block. Add the header.
                    inpython = True
                    addsalt(prepos, False)  # Make sure we report the exit code.
                    script.append(b'"%s" -m heredoctest <<EOF\n' % PYTHON)
                addsalt(n, True)
                # Drop the two-space indent; the '>>> ' prompt is kept.
                script.append(l[2:])
            elif l.startswith(b'  ... '):  # python inlines
                # Continuation lines are filed under the same key as the
                # '>>> ' line that started the statement.
                after.setdefault(prepos, []).append(l)
                script.append(l[2:])
            elif l.startswith(b'  $ '):  # commands
                if inpython:
                    # A shell command terminates any open Python heredoc.
                    script.append(b'EOF\n')
                    inpython = False
                after.setdefault(pos, []).append(l)
                prepos = pos
                pos = n
                addsalt(n, False)
                # Strip the '  $ ' prefix.
                rawcmd = l[4:]
                cmd = rawcmd.split()
                toggletrace(rawcmd)
                # A plain two-word ``cd`` gets ``|| exit 1`` appended, so
                # the script exits with status 1 if the cd fails.
                if len(cmd) == 2 and cmd[0] == b'cd':
                    rawcmd = b'cd %s || exit 1\n' % cmd[1]
                script.append(rawcmd)
            elif l.startswith(b'  > '):  # continuations
                # Filed under the same key as the command line it
                # continues; the '  > ' prefix is stripped for the script.
                after.setdefault(prepos, []).append(l)
                script.append(l[4:])
            elif l.startswith(b'  '):  # results
                # Queue up a list of expected results.
                expected.setdefault(pos, []).append(l[2:])
            else:
                if inpython:
                    script.append(b'EOF\n')
                    inpython = False
                # Non-command/result. Queue up for merged output.
                after.setdefault(pos, []).append(l)

        # Close a Python heredoc left open at the end of the input.
        if inpython:
            script.append(b'EOF\n')
        if skipping is not None:
            after.setdefault(pos, []).append(b'  !!! missing #endif\n')
        # Trailing marker, numbered one past the last line index seen (or
        # 1 for empty input), carrying the last command's exit code.
        addsalt(n + 1, False)
        # Need to end any current per-command trace
        if activetrace:
            toggletrace()
        return salt, script, after, expected