def run()

in pylib/mercurial-support/run-tests.py [0:0]


    def run(self, result):
        """Filter the suite's tests, then run them on up to ``self._jobs`` jobs.

        Filtering (applied per entry of ``self._tests``):

        * a test whose ``path`` does not exist is skipped;
        * a test whose ``bname`` is in ``self._whitelist`` bypasses the
          remaining filters;
        * otherwise a test is skipped if blacklisted, ignored if
          ``self._retest`` is set and its ``errpath`` does not exist, and
          ignored if any word of ``self._keywords`` is missing from the
          lowercased file content plus ``bname``.

        Every surviving test is queued ``self._runs_per_test`` times. Tests
        are then started until ``self._jobs`` are running; with a single job
        the test runs inline in the calling thread, otherwise each test gets
        its own thread. When ``self._loop`` is set, each started test is
        re-queued, so the loop only ends via ``result.shouldStop`` or an
        interrupt.

        ``result`` must provide ``addSkip``, ``addIgnore`` and ``shouldStop``;
        it is also passed to every test that is invoked. On
        ``KeyboardInterrupt`` every queued test is asked to ``abort()``.

        Returns ``result``.
        """
        # We have a number of filters that need to be applied. We do this
        # here instead of inside Test because it makes the running logic for
        # Test simpler.
        tests = []
        # One-element list so the nested get() can bump the counter.
        num_tests = [0]
        for test in self._tests:

            def get():
                # Only called below within this same iteration, so ``test``
                # is always the current loop item.
                num_tests[0] += 1
                if getattr(test, 'should_reload', False):
                    return self._loadtest(test, num_tests[0])
                return test

            if not os.path.exists(test.path):
                result.addSkip(test, "Doesn't exist")
                continue

            # Whitelisted tests bypass the blacklist/retest/keyword filters.
            if not (self._whitelist and test.bname in self._whitelist):
                if self._blacklist and test.bname in self._blacklist:
                    result.addSkip(test, 'blacklisted')
                    continue

                if self._retest and not os.path.exists(test.errpath):
                    result.addIgnore(test, 'not retesting')
                    continue

                if self._keywords:
                    with open(test.path, 'rb') as f:
                        t = f.read().lower() + test.bname.lower()
                    ignored = False
                    # Every keyword has to match for the test to be kept.
                    for k in self._keywords.lower().split():
                        if k not in t:
                            result.addIgnore(test, "doesn't match keyword")
                            ignored = True
                            break

                    if ignored:
                        continue
            for _ in xrange(self._runs_per_test):
                tests.append(get())

        # Snapshot of everything queued, used to abort on KeyboardInterrupt
        # (``tests`` itself is consumed by the scheduling loop).
        runtests = list(tests)
        # Each finished job posts one item here; the scheduler counts them.
        done = queue.Queue()
        running = 0

        # One progress-label slot per job; an empty string means "free".
        channels = [""] * self._jobs

        def job(test, result):
            # Claim the first free channel slot for this test.
            for n, v in enumerate(channels):
                if not v:
                    channel = n
                    break
            else:
                raise ValueError('Could not find output channel')
            # NOTE(review): [5:] presumably drops a "test-" prefix -- confirm.
            channels[channel] = "=" + test.name[5:].split(".")[0]
            try:
                test(result)
                done.put(None)
            except KeyboardInterrupt:
                pass
            except:  # re-raises
                done.put(('!', test, 'run-test raised an error, see traceback'))
                raise
            finally:
                try:
                    channels[channel] = ''
                except IndexError:
                    # run() has already replaced ``channels`` with an empty
                    # list; nothing left to release.
                    pass

        def stat():
            # Progress printer: runs until run() rebinds ``channels`` to an
            # empty list.
            count = 0
            while channels:
                # NOTE(review): '%03s' pads with spaces, not zeros -- confirm
                # this is the intended format.
                d = '\n%03s  ' % count
                for n, v in enumerate(channels):
                    if v:
                        # Emit the label one character per tick, then '.'
                        # once the label has been used up.
                        d += v[0]
                        channels[n] = v[1:] or '.'
                    else:
                        d += ' '
                    d += ' '
                with iolock:
                    sys.stdout.write(d + '  ')
                    sys.stdout.flush()
                # Sleep in short slices so shutdown is noticed promptly.
                for _ in xrange(10):
                    if channels:
                        time.sleep(0.1)
                count += 1

        stoppedearly = False

        if self._showchannels:
            statthread = threading.Thread(target=stat, name="stat")
            statthread.start()

        try:
            while tests or running:
                # Collect a completion when one is pending, when all job
                # slots are busy, or when nothing is left to start.
                if not done.empty() or running == self._jobs or not tests:
                    try:
                        done.get(True, 1)
                        running -= 1
                        if result and result.shouldStop:
                            stoppedearly = True
                            break
                    except queue.Empty:
                        continue
                if tests and not running == self._jobs:
                    test = tests.pop(0)
                    if self._loop:
                        # Loop mode: put the test back at the end of the
                        # queue, reloading it when it asks for that.
                        if getattr(test, 'should_reload', False):
                            num_tests[0] += 1
                            tests.append(self._loadtest(test, num_tests[0]))
                        else:
                            tests.append(test)
                    if self._jobs == 1:
                        job(test, result)
                    else:
                        t = threading.Thread(
                            target=job, name=test.name, args=(test, result)
                        )
                        t.start()
                    running += 1

            # If we stop early we still need to wait on started tests to
            # finish. Otherwise, there is a race between the test completing
            # and the test's cleanup code running. This could result in the
            # test reporting incorrectly.
            if stoppedearly:
                while running:
                    try:
                        done.get(True, 1)
                        running -= 1
                    except queue.Empty:
                        continue
        except KeyboardInterrupt:
            for test in runtests:
                test.abort()
        finally:
            # Emptying ``channels`` is what makes stat() leave its loop. Do it
            # on every exit path: if an exception other than
            # KeyboardInterrupt escapes (e.g. job() re-raising when running
            # inline), the stat thread would otherwise keep running forever.
            channels = []

        return result