# HG changeset patch # User Pierre-Yves David # Date 1457966320 0 # Node ID 4907f2aed9240b7e101a24ef4a84dd5c758eda4d # Parent bd5c2922a8ad5db217f013b694654a94106b023c test: drop custom run-tests.py We should use the one provided by Mercurial core directly. diff -r bd5c2922a8ad -r 4907f2aed924 tests/run-tests.py --- a/tests/run-tests.py Mon Mar 14 20:11:47 2016 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,2212 +0,0 @@ -#!/usr/bin/env python -# -# run-tests.py - Run a set of tests on Mercurial -# -# Copyright 2006 Matt Mackall -# -# This software may be used and distributed according to the terms of the -# GNU General Public License version 2 or any later version. - -# Modifying this script is tricky because it has many modes: -# - serial (default) vs parallel (-jN, N > 1) -# - no coverage (default) vs coverage (-c, -C, -s) -# - temp install (default) vs specific hg script (--with-hg, --local) -# - tests are a mix of shell scripts and Python scripts -# -# If you change this script, it is recommended that you ensure you -# haven't broken it by running it in various modes with a representative -# sample of test scripts. For example: -# -# 1) serial, no coverage, temp install: -# ./run-tests.py test-s* -# 2) serial, no coverage, local hg: -# ./run-tests.py --local test-s* -# 3) serial, coverage, temp install: -# ./run-tests.py -c test-s* -# 4) serial, coverage, local hg: -# ./run-tests.py -c --local test-s* # unsupported -# 5) parallel, no coverage, temp install: -# ./run-tests.py -j2 test-s* -# 6) parallel, no coverage, local hg: -# ./run-tests.py -j2 --local test-s* -# 7) parallel, coverage, temp install: -# ./run-tests.py -j2 -c test-s* # currently broken -# 8) parallel, coverage, local install: -# ./run-tests.py -j2 -c --local test-s* # unsupported (and broken) -# 9) parallel, custom tmp dir: -# ./run-tests.py -j2 --tmpdir /tmp/myhgtests -# -# (You could use any subset of the tests: test-s* happens to match -# enough that it's worth doing parallel runs, few enough that it -# completes fairly quickly, includes both shell and Python scripts, and -# includes some scripts that run daemon processes.) - -from __future__ import print_function - -from distutils import version -import difflib -import errno -import optparse -import os -import shutil -import subprocess -import signal -import socket -import sys -import tempfile -import time -import random -import re -import threading -import killdaemons as killmod -try: - import Queue as queue -except ImportError: - import queue -from xml.dom import minidom -import unittest - -osenvironb = getattr(os, 'environb', os.environ) - -try: - import json -except ImportError: - try: - import simplejson as json - except ImportError: - json = None - -processlock = threading.Lock() - -if sys.version_info > (3, 5, 0): - PYTHON3 = True - xrange = range # we use xrange in one place, and we'd rather not use range - def _bytespath(p): - return p.encode('utf-8') - - def _strpath(p): - return p.decode('utf-8') - -elif sys.version_info >= (3, 0, 0): - print('%s is only supported on Python 3.5+ and 2.6-2.7, not %s' % - (sys.argv[0], '.'.join(str(v) for v in sys.version_info[:3]))) - sys.exit(70) # EX_SOFTWARE from `man 3 sysexit` -else: - PYTHON3 = False - - # In python 2.x, path operations are generally done using - # bytestrings by default, so we don't have to do any extra - # fiddling there. We define the wrapper functions anyway just to - # help keep code consistent between platforms. - def _bytespath(p): - return p - - _strpath = _bytespath - -# For Windows support -wifexited = getattr(os, "WIFEXITED", lambda x: False) - -def checkportisavailable(port): - """return true if a port seems free to bind on localhost""" - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('localhost', port)) - s.close() - return True - except socket.error as exc: - if not exc.errno == errno.EADDRINUSE: - raise - return False - -closefds = os.name == 'posix' -def Popen4(cmd, wd, timeout, env=None): - processlock.acquire() - p = subprocess.Popen(cmd, shell=True, bufsize=-1, cwd=wd, env=env, - close_fds=closefds, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - processlock.release() - - p.fromchild = p.stdout - p.tochild = p.stdin - p.childerr = p.stderr - - p.timeout = False - if timeout: - def t(): - start = time.time() - while time.time() - start < timeout and p.returncode is None: - time.sleep(.1) - p.timeout = True - if p.returncode is None: - terminate(p) - threading.Thread(target=t).start() - - return p - -PYTHON = _bytespath(sys.executable.replace('\\', '/')) -IMPL_PATH = b'PYTHONPATH' -if 'java' in sys.platform: - IMPL_PATH = b'JYTHONPATH' - -defaults = { - 'jobs': ('HGTEST_JOBS', 1), - 'timeout': ('HGTEST_TIMEOUT', 180), - 'port': ('HGTEST_PORT', 20059), - 'shell': ('HGTEST_SHELL', 'sh'), -} - -def parselistfiles(files, listtype, warn=True): - entries = dict() - for filename in files: - try: - path = os.path.expanduser(os.path.expandvars(filename)) - f = open(path, "rb") - except IOError as err: - if err.errno != errno.ENOENT: - raise - if warn: - print("warning: no such %s file: %s" % (listtype, filename)) - continue - - for line in f.readlines(): - line = line.split(b'#', 1)[0].strip() - if line: - entries[line] = filename - - f.close() - return entries - -def getparser(): - """Obtain the OptionParser used by the CLI.""" - parser = optparse.OptionParser("%prog [options] [tests]") - - # keep these sorted - parser.add_option("--blacklist", action="append", - help="skip tests listed in the specified blacklist file") - parser.add_option("--whitelist", action="append", - help="always run tests listed in the specified whitelist file") - parser.add_option("--changed", type="string", - help="run tests that are changed in parent rev or working directory") - parser.add_option("-C", "--annotate", action="store_true", - help="output files annotated with coverage") - parser.add_option("-c", "--cover", action="store_true", - help="print a test coverage report") - parser.add_option("-d", "--debug", action="store_true", - help="debug mode: write output of test scripts to console" - " rather than capturing and diffing it (disables timeout)") - parser.add_option("-f", "--first", action="store_true", - help="exit on the first test failure") - parser.add_option("-H", "--htmlcov", action="store_true", - help="create an HTML report of the coverage of the files") - parser.add_option("-i", "--interactive", action="store_true", - help="prompt to accept changed output") - parser.add_option("-j", "--jobs", type="int", - help="number of jobs to run in parallel" - " (default: $%s or %d)" % defaults['jobs']) - parser.add_option("--keep-tmpdir", action="store_true", - help="keep temporary directory after running tests") - parser.add_option("-k", "--keywords", - help="run tests matching keywords") - parser.add_option("-l", "--local", action="store_true", - help="shortcut for --with-hg=/../hg") - parser.add_option("--loop", action="store_true", - help="loop tests repeatedly") - parser.add_option("--runs-per-test", type="int", dest="runs_per_test", - help="run each test N times (default=1)", default=1) - parser.add_option("-n", "--nodiff", action="store_true", - help="skip showing test changes") - parser.add_option("-p", "--port", type="int", - help="port on which servers should listen" - " (default: $%s or %d)" % defaults['port']) - parser.add_option("--compiler", type="string", - help="compiler to build with") - parser.add_option("--pure", action="store_true", - help="use pure Python code instead of C extensions") - parser.add_option("-R", "--restart", action="store_true", - help="restart at last error") - parser.add_option("-r", "--retest", action="store_true", - help="retest failed tests") - parser.add_option("-S", "--noskips", action="store_true", - help="don't report skip tests verbosely") - parser.add_option("--shell", type="string", - help="shell to use (default: $%s or %s)" % defaults['shell']) - parser.add_option("-t", "--timeout", type="int", - help="kill errant tests after TIMEOUT seconds" - " (default: $%s or %d)" % defaults['timeout']) - parser.add_option("--time", action="store_true", - help="time how long each test takes") - parser.add_option("--json", action="store_true", - help="store test result data in 'report.json' file") - parser.add_option("--tmpdir", type="string", - help="run tests in the given temporary directory" - " (implies --keep-tmpdir)") - parser.add_option("-v", "--verbose", action="store_true", - help="output verbose messages") - parser.add_option("--xunit", type="string", - help="record xunit results at specified path") - parser.add_option("--view", type="string", - help="external diff viewer") - parser.add_option("--with-hg", type="string", - metavar="HG", - help="test using specified hg script rather than a " - "temporary installation") - parser.add_option("-3", "--py3k-warnings", action="store_true", - help="enable Py3k warnings on Python 2.6+") - parser.add_option('--extra-config-opt', action="append", - help='set the given config opt in the test hgrc') - parser.add_option('--random', action="store_true", - help='run tests in random order') - parser.add_option('--profile-runner', action='store_true', - help='run statprof on run-tests') - - for option, (envvar, default) in defaults.items(): - defaults[option] = type(default)(os.environ.get(envvar, default)) - parser.set_defaults(**defaults) - - return parser - -def parseargs(args, parser): - """Parse arguments with our OptionParser and validate results.""" - (options, args) = parser.parse_args(args) - - # jython is always pure - if 'java' in sys.platform or '__pypy__' in sys.modules: - options.pure = True - - if options.with_hg: - options.with_hg = os.path.expanduser(options.with_hg) - if not (os.path.isfile(options.with_hg) and - os.access(options.with_hg, os.X_OK)): - parser.error('--with-hg must specify an executable hg script') - if not os.path.basename(options.with_hg) == 'hg': - sys.stderr.write('warning: --with-hg should specify an hg script\n') - if options.local: - testdir = os.path.dirname(_bytespath(os.path.realpath(sys.argv[0]))) - hgbin = os.path.join(os.path.dirname(testdir), b'hg') - if os.name != 'nt' and not os.access(hgbin, os.X_OK): - parser.error('--local specified, but %r not found or not executable' - % hgbin) - options.with_hg = hgbin - - options.anycoverage = options.cover or options.annotate or options.htmlcov - if options.anycoverage: - try: - import coverage - covver = version.StrictVersion(coverage.__version__).version - if covver < (3, 3): - parser.error('coverage options require coverage 3.3 or later') - except ImportError: - parser.error('coverage options now require the coverage package') - - if options.anycoverage and options.local: - # this needs some path mangling somewhere, I guess - parser.error("sorry, coverage options do not work when --local " - "is specified") - - if options.anycoverage and options.with_hg: - parser.error("sorry, coverage options do not work when --with-hg " - "is specified") - - global verbose - if options.verbose: - verbose = '' - - if options.tmpdir: - options.tmpdir = os.path.expanduser(options.tmpdir) - - if options.jobs < 1: - parser.error('--jobs must be positive') - if options.interactive and options.debug: - parser.error("-i/--interactive and -d/--debug are incompatible") - if options.debug: - if options.timeout != defaults['timeout']: - sys.stderr.write( - 'warning: --timeout option ignored with --debug\n') - options.timeout = 0 - if options.py3k_warnings: - if PYTHON3: - parser.error( - '--py3k-warnings can only be used on Python 2.6 and 2.7') - if options.blacklist: - options.blacklist = parselistfiles(options.blacklist, 'blacklist') - if options.whitelist: - options.whitelisted = parselistfiles(options.whitelist, 'whitelist') - else: - options.whitelisted = {} - - return (options, args) - -def rename(src, dst): - """Like os.rename(), trade atomicity and opened files friendliness - for existing destination support. - """ - shutil.copy(src, dst) - os.remove(src) - -_unified_diff = difflib.unified_diff -if PYTHON3: - import functools - _unified_diff = functools.partial(difflib.diff_bytes, difflib.unified_diff) - -def getdiff(expected, output, ref, err): - servefail = False - lines = [] - for line in _unified_diff(expected, output, ref, err): - if line.startswith(b'+++') or line.startswith(b'---'): - line = line.replace(b'\\', b'/') - if line.endswith(b' \n'): - line = line[:-2] + b'\n' - lines.append(line) - if not servefail and line.startswith( - b'+ abort: child process failed to start'): - servefail = True - - return servefail, lines - -verbose = False -def vlog(*msg): - """Log only when in verbose mode.""" - if verbose is False: - return - - return log(*msg) - -# Bytes that break XML even in a CDATA block: control characters 0-31 -# sans \t, \n and \r -CDATA_EVIL = re.compile(br"[\000-\010\013\014\016-\037]") - -def cdatasafe(data): - """Make a string safe to include in a CDATA block. - - Certain control characters are illegal in a CDATA block, and - there's no way to include a ]]> in a CDATA either. This function - replaces illegal bytes with ? and adds a space between the ]] so - that it won't break the CDATA block. - """ - return CDATA_EVIL.sub(b'?', data).replace(b']]>', b'] ]>') - -def log(*msg): - """Log something to stdout. - - Arguments are strings to print. - """ - with iolock: - if verbose: - print(verbose, end=' ') - for m in msg: - print(m, end=' ') - print() - sys.stdout.flush() - -def terminate(proc): - """Terminate subprocess (with fallback for Python versions < 2.6)""" - vlog('# Terminating process %d' % proc.pid) - try: - getattr(proc, 'terminate', lambda : os.kill(proc.pid, signal.SIGTERM))() - except OSError: - pass - -def killdaemons(pidfile): - return killmod.killdaemons(pidfile, tryhard=False, remove=True, - logfn=vlog) - -class Test(unittest.TestCase): - """Encapsulates a single, runnable test. - - While this class conforms to the unittest.TestCase API, it differs in that - instances need to be instantiated manually. (Typically, unittest.TestCase - classes are instantiated automatically by scanning modules.) - """ - - # Status code reserved for skipped tests (used by hghave). - SKIPPED_STATUS = 80 - - def __init__(self, path, tmpdir, keeptmpdir=False, - debug=False, - timeout=defaults['timeout'], - startport=defaults['port'], extraconfigopts=None, - py3kwarnings=False, shell=None): - """Create a test from parameters. - - path is the full path to the file defining the test. - - tmpdir is the main temporary directory to use for this test. - - keeptmpdir determines whether to keep the test's temporary directory - after execution. It defaults to removal (False). - - debug mode will make the test execute verbosely, with unfiltered - output. - - timeout controls the maximum run time of the test. It is ignored when - debug is True. - - startport controls the starting port number to use for this test. Each - test will reserve 3 port numbers for execution. It is the caller's - responsibility to allocate a non-overlapping port range to Test - instances. - - extraconfigopts is an iterable of extra hgrc config options. Values - must have the form "key=value" (something understood by hgrc). Values - of the form "foo.key=value" will result in "[foo] key=value". - - py3kwarnings enables Py3k warnings. - - shell is the shell to execute tests in. - """ - self.path = path - self.bname = os.path.basename(path) - self.name = _strpath(self.bname) - self._testdir = os.path.dirname(path) - self.errpath = os.path.join(self._testdir, b'%s.err' % self.bname) - - self._threadtmp = tmpdir - self._keeptmpdir = keeptmpdir - self._debug = debug - self._timeout = timeout - self._startport = startport - self._extraconfigopts = extraconfigopts or [] - self._py3kwarnings = py3kwarnings - self._shell = _bytespath(shell) - - self._aborted = False - self._daemonpids = [] - self._finished = None - self._ret = None - self._out = None - self._skipped = None - self._testtmp = None - - # If we're not in --debug mode and reference output file exists, - # check test output against it. - if debug: - self._refout = None # to match "out is None" - elif os.path.exists(self.refpath): - f = open(self.refpath, 'rb') - self._refout = f.read().splitlines(True) - f.close() - else: - self._refout = [] - - # needed to get base class __repr__ running - @property - def _testMethodName(self): - return self.name - - def __str__(self): - return self.name - - def shortDescription(self): - return self.name - - def setUp(self): - """Tasks to perform before run().""" - self._finished = False - self._ret = None - self._out = None - self._skipped = None - - try: - os.mkdir(self._threadtmp) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - self._testtmp = os.path.join(self._threadtmp, - os.path.basename(self.path)) - os.mkdir(self._testtmp) - - # Remove any previous output files. - if os.path.exists(self.errpath): - try: - os.remove(self.errpath) - except OSError as e: - # We might have raced another test to clean up a .err - # file, so ignore ENOENT when removing a previous .err - # file. - if e.errno != errno.ENOENT: - raise - - def run(self, result): - """Run this test and report results against a TestResult instance.""" - # This function is extremely similar to unittest.TestCase.run(). Once - # we require Python 2.7 (or at least its version of unittest), this - # function can largely go away. - self._result = result - result.startTest(self) - try: - try: - self.setUp() - except (KeyboardInterrupt, SystemExit): - self._aborted = True - raise - except Exception: - result.addError(self, sys.exc_info()) - return - - success = False - try: - self.runTest() - except KeyboardInterrupt: - self._aborted = True - raise - except SkipTest as e: - result.addSkip(self, str(e)) - # The base class will have already counted this as a - # test we "ran", but we want to exclude skipped tests - # from those we count towards those run. - result.testsRun -= 1 - except IgnoreTest as e: - result.addIgnore(self, str(e)) - # As with skips, ignores also should be excluded from - # the number of tests executed. - result.testsRun -= 1 - except WarnTest as e: - result.addWarn(self, str(e)) - except self.failureException as e: - # This differs from unittest in that we don't capture - # the stack trace. This is for historical reasons and - # this decision could be revisited in the future, - # especially for PythonTest instances. - if result.addFailure(self, str(e)): - success = True - except Exception: - result.addError(self, sys.exc_info()) - else: - success = True - - try: - self.tearDown() - except (KeyboardInterrupt, SystemExit): - self._aborted = True - raise - except Exception: - result.addError(self, sys.exc_info()) - success = False - - if success: - result.addSuccess(self) - finally: - result.stopTest(self, interrupted=self._aborted) - - def runTest(self): - """Run this test instance. - - This will return a tuple describing the result of the test. - """ - env = self._getenv() - self._daemonpids.append(env['DAEMON_PIDS']) - self._createhgrc(env['HGRCPATH']) - - vlog('# Test', self.name) - - ret, out = self._run(env) - self._finished = True - self._ret = ret - self._out = out - - def describe(ret): - if ret < 0: - return 'killed by signal: %d' % -ret - return 'returned error code %d' % ret - - self._skipped = False - - if ret == self.SKIPPED_STATUS: - if out is None: # Debug mode, nothing to parse. - missing = ['unknown'] - failed = None - else: - missing, failed = TTest.parsehghaveoutput(out) - - if not missing: - missing = ['skipped'] - - if failed: - self.fail('hg have failed checking for %s' % failed[-1]) - else: - self._skipped = True - raise SkipTest(missing[-1]) - elif ret == 'timeout': - self.fail('timed out') - elif ret is False: - raise WarnTest('no result code from test') - elif out != self._refout: - # Diff generation may rely on written .err file. - if (ret != 0 or out != self._refout) and not self._skipped \ - and not self._debug: - f = open(self.errpath, 'wb') - for line in out: - f.write(line) - f.close() - - # The result object handles diff calculation for us. - if self._result.addOutputMismatch(self, ret, out, self._refout): - # change was accepted, skip failing - return - - if ret: - msg = 'output changed and ' + describe(ret) - else: - msg = 'output changed' - - self.fail(msg) - elif ret: - self.fail(describe(ret)) - - def tearDown(self): - """Tasks to perform after run().""" - for entry in self._daemonpids: - killdaemons(entry) - self._daemonpids = [] - - if not self._keeptmpdir: - shutil.rmtree(self._testtmp, True) - shutil.rmtree(self._threadtmp, True) - - if (self._ret != 0 or self._out != self._refout) and not self._skipped \ - and not self._debug and self._out: - f = open(self.errpath, 'wb') - for line in self._out: - f.write(line) - f.close() - - vlog("# Ret was:", self._ret, '(%s)' % self.name) - - def _run(self, env): - # This should be implemented in child classes to run tests. - raise SkipTest('unknown test type') - - def abort(self): - """Terminate execution of this test.""" - self._aborted = True - - def _getreplacements(self): - """Obtain a mapping of text replacements to apply to test output. - - Test output needs to be normalized so it can be compared to expected - output. This function defines how some of that normalization will - occur. - """ - r = [ - (br':%d\b' % self._startport, b':$HGPORT'), - (br':%d\b' % (self._startport + 1), b':$HGPORT1'), - (br':%d\b' % (self._startport + 2), b':$HGPORT2'), - (br'(?m)^(saved backup bundle to .*\.hg)( \(glob\))?$', - br'\1 (glob)'), - ] - - if os.name == 'nt': - r.append( - (b''.join(c.isalpha() and b'[%s%s]' % (c.lower(), c.upper()) or - c in b'/\\' and br'[/\\]' or c.isdigit() and c or b'\\' + c - for c in self._testtmp), b'$TESTTMP')) - else: - r.append((re.escape(self._testtmp), b'$TESTTMP')) - - return r - - def _getenv(self): - """Obtain environment variables to use during test execution.""" - env = os.environ.copy() - env['TESTTMP'] = self._testtmp - env['HOME'] = self._testtmp - env["HGPORT"] = str(self._startport) - env["HGPORT1"] = str(self._startport + 1) - env["HGPORT2"] = str(self._startport + 2) - env["HGRCPATH"] = os.path.join(self._threadtmp, b'.hgrc') - env["DAEMON_PIDS"] = os.path.join(self._threadtmp, b'daemon.pids') - env["HGEDITOR"] = ('"' + sys.executable + '"' - + ' -c "import sys; sys.exit(0)"') - env["HGMERGE"] = "internal:merge" - env["HGUSER"] = "test" - env["HGENCODING"] = "ascii" - env["HGENCODINGMODE"] = "strict" - - # Reset some environment variables to well-known values so that - # the tests produce repeatable output. - env['LANG'] = env['LC_ALL'] = env['LANGUAGE'] = 'C' - env['TZ'] = 'GMT' - env["EMAIL"] = "Foo Bar " - env['COLUMNS'] = '80' - env['TERM'] = 'xterm' - - for k in ('HG HGPROF CDPATH GREP_OPTIONS http_proxy no_proxy ' + - 'NO_PROXY').split(): - if k in env: - del env[k] - - # unset env related to hooks - for k in env.keys(): - if k.startswith('HG_'): - del env[k] - - return env - - def _createhgrc(self, path): - """Create an hgrc file for this test.""" - hgrc = open(path, 'wb') - hgrc.write(b'[ui]\n') - hgrc.write(b'slash = True\n') - hgrc.write(b'interactive = False\n') - hgrc.write(b'mergemarkers = detailed\n') - hgrc.write(b'promptecho = True\n') - hgrc.write(b'[defaults]\n') - hgrc.write(b'backout = -d "0 0"\n') - hgrc.write(b'commit = -d "0 0"\n') - hgrc.write(b'shelve = --date "0 0"\n') - hgrc.write(b'tag = -d "0 0"\n') - hgrc.write(b'[devel]\n') - hgrc.write(b'all = true\n') - hgrc.write(b'[largefiles]\n') - hgrc.write(b'usercache = %s\n' % - (os.path.join(self._testtmp, b'.cache/largefiles'))) - - for opt in self._extraconfigopts: - section, key = opt.split('.', 1) - assert '=' in key, ('extra config opt %s must ' - 'have an = for assignment' % opt) - hgrc.write(b'[%s]\n%s\n' % (section, key)) - hgrc.close() - - def fail(self, msg): - # unittest differentiates between errored and failed. - # Failed is denoted by AssertionError (by default at least). - raise AssertionError(msg) - - def _runcommand(self, cmd, env, normalizenewlines=False): - """Run command in a sub-process, capturing the output (stdout and - stderr). - - Return a tuple (exitcode, output). output is None in debug mode. - """ - if self._debug: - proc = subprocess.Popen(cmd, shell=True, cwd=self._testtmp, - env=env) - ret = proc.wait() - return (ret, None) - - proc = Popen4(cmd, self._testtmp, self._timeout, env) - def cleanup(): - terminate(proc) - ret = proc.wait() - if ret == 0: - ret = signal.SIGTERM << 8 - killdaemons(env['DAEMON_PIDS']) - return ret - - output = '' - proc.tochild.close() - - try: - output = proc.fromchild.read() - except KeyboardInterrupt: - vlog('# Handling keyboard interrupt') - cleanup() - raise - - ret = proc.wait() - if wifexited(ret): - ret = os.WEXITSTATUS(ret) - - if proc.timeout: - ret = 'timeout' - - if ret: - killdaemons(env['DAEMON_PIDS']) - - for s, r in self._getreplacements(): - output = re.sub(s, r, output) - - if normalizenewlines: - output = output.replace('\r\n', '\n') - - return ret, output.splitlines(True) - -class PythonTest(Test): - """A Python-based test.""" - - @property - def refpath(self): - return os.path.join(self._testdir, b'%s.out' % self.bname) - - def _run(self, env): - py3kswitch = self._py3kwarnings and b' -3' or b'' - cmd = b'%s%s "%s"' % (PYTHON, py3kswitch, self.path) - vlog("# Running", cmd) - normalizenewlines = os.name == 'nt' - result = self._runcommand(cmd, env, - normalizenewlines=normalizenewlines) - if self._aborted: - raise KeyboardInterrupt() - - return result - -# This script may want to drop globs from lines matching these patterns on -# Windows, but check-code.py wants a glob on these lines unconditionally. Don't -# warn if that is the case for anything matching these lines. -checkcodeglobpats = [ - re.compile(br'^pushing to \$TESTTMP/.*[^)]$'), - re.compile(br'^moving \S+/.*[^)]$'), - re.compile(br'^pulling from \$TESTTMP/.*[^)]$') -] - -bchr = chr -if PYTHON3: - bchr = lambda x: bytes([x]) - -class TTest(Test): - """A "t test" is a test backed by a .t file.""" - - SKIPPED_PREFIX = 'skipped: ' - FAILED_PREFIX = 'hghave check failed: ' - NEEDESCAPE = re.compile(br'[\x00-\x08\x0b-\x1f\x7f-\xff]').search - - ESCAPESUB = re.compile(br'[\x00-\x08\x0b-\x1f\\\x7f-\xff]').sub - ESCAPEMAP = dict((bchr(i), br'\x%02x' % i) for i in range(256)) - ESCAPEMAP.update({b'\\': b'\\\\', b'\r': br'\r'}) - - @property - def refpath(self): - return os.path.join(self._testdir, self.bname) - - def _run(self, env): - f = open(self.path, 'rb') - lines = f.readlines() - f.close() - - salt, script, after, expected = self._parsetest(lines) - - # Write out the generated script. - fname = b'%s.sh' % self._testtmp - f = open(fname, 'wb') - for l in script: - f.write(l) - f.close() - - cmd = b'%s "%s"' % (self._shell, fname) - vlog("# Running", cmd) - - exitcode, output = self._runcommand(cmd, env) - - if self._aborted: - raise KeyboardInterrupt() - - # Do not merge output if skipped. Return hghave message instead. - # Similarly, with --debug, output is None. - if exitcode == self.SKIPPED_STATUS or output is None: - return exitcode, output - - return self._processoutput(exitcode, output, salt, after, expected) - - def _hghave(self, reqs): - # TODO do something smarter when all other uses of hghave are gone. - tdir = self._testdir.replace(b'\\', b'/') - proc = Popen4(b'%s -c "%s/hghave %s"' % - (self._shell, tdir, b' '.join(reqs)), - self._testtmp, 0, self._getenv()) - stdout, stderr = proc.communicate() - ret = proc.wait() - if wifexited(ret): - ret = os.WEXITSTATUS(ret) - if ret == 2: - print(stdout) - sys.exit(1) - - return ret == 0 - - def _parsetest(self, lines): - # We generate a shell script which outputs unique markers to line - # up script results with our source. These markers include input - # line number and the last return code. - salt = b"SALT%d" % time.time() - def addsalt(line, inpython): - if inpython: - script.append(b'%s %d 0\n' % (salt, line)) - else: - script.append(b'echo %s %d $?\n' % (salt, line)) - - script = [] - - # After we run the shell script, we re-unify the script output - # with non-active parts of the source, with synchronization by our - # SALT line number markers. The after table contains the non-active - # components, ordered by line number. - after = {} - - # Expected shell script output. - expected = {} - - pos = prepos = -1 - - # True or False when in a true or false conditional section - skipping = None - - # We keep track of whether or not we're in a Python block so we - # can generate the surrounding doctest magic. - inpython = False - - if self._debug: - script.append(b'set -x\n') - if os.getenv('MSYSTEM'): - script.append(b'alias pwd="pwd -W"\n') - - for n, l in enumerate(lines): - if not l.endswith(b'\n'): - l += b'\n' - if l.startswith(b'#require'): - lsplit = l.split() - if len(lsplit) < 2 or lsplit[0] != b'#require': - after.setdefault(pos, []).append(' !!! invalid #require\n') - if not self._hghave(lsplit[1:]): - script = [b"exit 80\n"] - break - after.setdefault(pos, []).append(l) - elif l.startswith(b'#if'): - lsplit = l.split() - if len(lsplit) < 2 or lsplit[0] != b'#if': - after.setdefault(pos, []).append(' !!! invalid #if\n') - if skipping is not None: - after.setdefault(pos, []).append(' !!! nested #if\n') - skipping = not self._hghave(lsplit[1:]) - after.setdefault(pos, []).append(l) - elif l.startswith(b'#else'): - if skipping is None: - after.setdefault(pos, []).append(' !!! missing #if\n') - skipping = not skipping - after.setdefault(pos, []).append(l) - elif l.startswith(b'#endif'): - if skipping is None: - after.setdefault(pos, []).append(' !!! missing #if\n') - skipping = None - after.setdefault(pos, []).append(l) - elif skipping: - after.setdefault(pos, []).append(l) - elif l.startswith(b' >>> '): # python inlines - after.setdefault(pos, []).append(l) - prepos = pos - pos = n - if not inpython: - # We've just entered a Python block. Add the header. - inpython = True - addsalt(prepos, False) # Make sure we report the exit code. - script.append(b'%s -m heredoctest < '): # continuations - after.setdefault(prepos, []).append(l) - script.append(l[4:]) - elif l.startswith(b' '): # results - # Queue up a list of expected results. - expected.setdefault(pos, []).append(l[2:]) - else: - if inpython: - script.append(b'EOF\n') - inpython = False - # Non-command/result. Queue up for merged output. - after.setdefault(pos, []).append(l) - - if inpython: - script.append(b'EOF\n') - if skipping is not None: - after.setdefault(pos, []).append(' !!! missing #endif\n') - addsalt(n + 1, False) - - return salt, script, after, expected - - def _processoutput(self, exitcode, output, salt, after, expected): - # Merge the script output back into a unified test. - warnonly = 1 # 1: not yet; 2: yes; 3: for sure not - if exitcode != 0: - warnonly = 3 - - pos = -1 - postout = [] - for l in output: - lout, lcmd = l, None - if salt in l: - lout, lcmd = l.split(salt, 1) - - if lout: - if not lout.endswith(b'\n'): - lout += b' (no-eol)\n' - - # Find the expected output at the current position. - el = None - if expected.get(pos, None): - el = expected[pos].pop(0) - - r = TTest.linematch(el, lout) - if isinstance(r, str): - if r == '+glob': - lout = el[:-1] + ' (glob)\n' - r = '' # Warn only this line. - elif r == '-glob': - lout = ''.join(el.rsplit(' (glob)', 1)) - r = '' # Warn only this line. - else: - log('\ninfo, unknown linematch result: %r\n' % r) - r = False - if r: - postout.append(b' ' + el) - else: - if self.NEEDESCAPE(lout): - lout = TTest._stringescape(b'%s (esc)\n' % - lout.rstrip(b'\n')) - postout.append(b' ' + lout) # Let diff deal with it. - if r != '': # If line failed. - warnonly = 3 # for sure not - elif warnonly == 1: # Is "not yet" and line is warn only. - warnonly = 2 # Yes do warn. - - if lcmd: - # Add on last return code. - ret = int(lcmd.split()[1]) - if ret != 0: - postout.append(b' [%d]\n' % ret) - if pos in after: - # Merge in non-active test bits. - postout += after.pop(pos) - pos = int(lcmd.split()[0]) - - if pos in after: - postout += after.pop(pos) - - if warnonly == 2: - exitcode = False # Set exitcode to warned. - - return exitcode, postout - - @staticmethod - def rematch(el, l): - try: - # use \Z to ensure that the regex matches to the end of the string - if os.name == 'nt': - return re.match(el + br'\r?\n\Z', l) - return re.match(el + br'\n\Z', l) - except re.error: - # el is an invalid regex - return False - - @staticmethod - def globmatch(el, l): - # The only supported special characters are * and ? plus / which also - # matches \ on windows. Escaping of these characters is supported. - if el + b'\n' == l: - if os.altsep: - # matching on "/" is not needed for this line - for pat in checkcodeglobpats: - if pat.match(el): - return True - return b'-glob' - return True - i, n = 0, len(el) - res = b'' - while i < n: - c = el[i:i + 1] - i += 1 - if c == b'\\' and i < n and el[i:i + 1] in b'*?\\/': - res += el[i - 1:i + 1] - i += 1 - elif c == b'*': - res += b'.*' - elif c == b'?': - res += b'.' - elif c == b'/' and os.altsep: - res += b'[/\\\\]' - else: - res += re.escape(c) - return TTest.rematch(res, l) - - @staticmethod - def linematch(el, l): - if el == l: # perfect match (fast) - return True - if el: - if el.endswith(b" (esc)\n"): - if PYTHON3: - el = el[:-7].decode('unicode_escape') + '\n' - el = el.encode('utf-8') - else: - el = el[:-7].decode('string-escape') + '\n' - if el == l or os.name == 'nt' and el[:-1] + b'\r\n' == l: - return True - if el.endswith(b" (re)\n"): - return TTest.rematch(el[:-6], l) - if el.endswith(b" (glob)\n"): - # ignore '(glob)' added to l by 'replacements' - if l.endswith(b" (glob)\n"): - l = l[:-8] + b"\n" - return TTest.globmatch(el[:-8], l) - if os.altsep and l.replace(b'\\', b'/') == el: - return b'+glob' - return False - - @staticmethod - def parsehghaveoutput(lines): - '''Parse hghave log lines. - - Return tuple of lists (missing, failed): - * the missing/unknown features - * the features for which existence check failed''' - missing = [] - failed = [] - for line in lines: - if line.startswith(TTest.SKIPPED_PREFIX): - line = line.splitlines()[0] - missing.append(line[len(TTest.SKIPPED_PREFIX):]) - elif line.startswith(TTest.FAILED_PREFIX): - line = line.splitlines()[0] - failed.append(line[len(TTest.FAILED_PREFIX):]) - - return missing, failed - - @staticmethod - def _escapef(m): - return TTest.ESCAPEMAP[m.group(0)] - - @staticmethod - def _stringescape(s): - return TTest.ESCAPESUB(TTest._escapef, s) - -iolock = threading.RLock() - -class SkipTest(Exception): - """Raised to indicate that a test is to be skipped.""" - -class IgnoreTest(Exception): - """Raised to indicate that a test is to be ignored.""" - -class WarnTest(Exception): - """Raised to indicate that a test warned.""" - -class TestResult(unittest._TextTestResult): - """Holds results when executing via unittest.""" - # Don't worry too much about accessing the non-public _TextTestResult. - # It is relatively common in Python testing tools. - def __init__(self, options, *args, **kwargs): - super(TestResult, self).__init__(*args, **kwargs) - - self._options = options - - # unittest.TestResult didn't have skipped until 2.7. We need to - # polyfill it. - self.skipped = [] - - # We have a custom "ignored" result that isn't present in any Python - # unittest implementation. It is very similar to skipped. It may make - # sense to map it into skip some day. - self.ignored = [] - - # We have a custom "warned" result that isn't present in any Python - # unittest implementation. It is very similar to failed. It may make - # sense to map it into fail some day. - self.warned = [] - - self.times = [] - self._firststarttime = None - # Data stored for the benefit of generating xunit reports. - self.successes = [] - self.faildata = {} - - def addFailure(self, test, reason): - self.failures.append((test, reason)) - - if self._options.first: - self.stop() - else: - with iolock: - if not self._options.nodiff: - self.stream.write('\nERROR: %s output changed\n' % test) - - self.stream.write('!') - self.stream.flush() - - def addSuccess(self, test): - with iolock: - super(TestResult, self).addSuccess(test) - self.successes.append(test) - - def addError(self, test, err): - super(TestResult, self).addError(test, err) - if self._options.first: - self.stop() - - # Polyfill. - def addSkip(self, test, reason): - self.skipped.append((test, reason)) - with iolock: - if self.showAll: - self.stream.writeln('skipped %s' % reason) - else: - self.stream.write('s') - self.stream.flush() - - def addIgnore(self, test, reason): - self.ignored.append((test, reason)) - with iolock: - if self.showAll: - self.stream.writeln('ignored %s' % reason) - else: - if reason not in ('not retesting', "doesn't match keyword"): - self.stream.write('i') - else: - self.testsRun += 1 - self.stream.flush() - - def addWarn(self, test, reason): - self.warned.append((test, reason)) - - if self._options.first: - self.stop() - - with iolock: - if self.showAll: - self.stream.writeln('warned %s' % reason) - else: - self.stream.write('~') - self.stream.flush() - - def addOutputMismatch(self, test, ret, got, expected): - """Record a mismatch in test output for a particular test.""" - if self.shouldStop: - # don't print, some other test case already failed and - # printed, we're just stale and probably failed due to our - # temp dir getting cleaned up. - return - - accepted = False - failed = False - lines = [] - - with iolock: - if self._options.nodiff: - pass - elif self._options.view: - v = self._options.view - if PYTHON3: - v = _bytespath(v) - os.system(b"%s %s %s" % - (v, test.refpath, test.errpath)) - else: - servefail, lines = getdiff(expected, got, - test.refpath, test.errpath) - if servefail: - self.addFailure( - test, - 'server failed to start (HGPORT=%s)' % test._startport) - else: - self.stream.write('\n') - for line in lines: - if PYTHON3: - self.stream.flush() - self.stream.buffer.write(line) - self.stream.buffer.flush() - else: - self.stream.write(line) - self.stream.flush() - - # handle interactive prompt without releasing iolock - if self._options.interactive: - self.stream.write('Accept this change? [n] ') - answer = sys.stdin.readline().strip() - if answer.lower() in ('y', 'yes'): - if test.name.endswith('.t'): - rename(test.errpath, test.path) - else: - rename(test.errpath, '%s.out' % test.path) - accepted = True - if not accepted and not failed: - self.faildata[test.name] = b''.join(lines) - - return accepted - - def startTest(self, test): - super(TestResult, self).startTest(test) - - # os.times module computes the user time and system time spent by - # child's processes along with real elapsed time taken by a process. - # This module has one limitation. It can only work for Linux user - # and not for Windows. - test.started = os.times() - if self._firststarttime is None: # thread racy but irrelevant - self._firststarttime = test.started[4] - - def stopTest(self, test, interrupted=False): - super(TestResult, self).stopTest(test) - - test.stopped = os.times() - - starttime = test.started - endtime = test.stopped - origin = self._firststarttime - self.times.append((test.name, - endtime[2] - starttime[2], # user space CPU time - endtime[3] - starttime[3], # sys space CPU time - endtime[4] - starttime[4], # real time - starttime[4] - origin, # start date in run context - endtime[4] - origin, # end date in run context - )) - - if interrupted: - with iolock: - self.stream.writeln('INTERRUPTED: %s (after %d seconds)' % ( - test.name, self.times[-1][3])) - -class TestSuite(unittest.TestSuite): - """Custom unittest TestSuite that knows how to execute Mercurial tests.""" - - def __init__(self, testdir, jobs=1, whitelist=None, blacklist=None, - retest=False, keywords=None, loop=False, runs_per_test=1, - loadtest=None, - *args, **kwargs): - """Create a new instance that can run tests with a configuration. - - testdir specifies the directory where tests are executed from. This - is typically the ``tests`` directory from Mercurial's source - repository. - - jobs specifies the number of jobs to run concurrently. Each test - executes on its own thread. Tests actually spawn new processes, so - state mutation should not be an issue. - - whitelist and blacklist denote tests that have been whitelisted and - blacklisted, respectively. These arguments don't belong in TestSuite. - Instead, whitelist and blacklist should be handled by the thing that - populates the TestSuite with tests. They are present to preserve - backwards compatible behavior which reports skipped tests as part - of the results. - - retest denotes whether to retest failed tests. This arguably belongs - outside of TestSuite. - - keywords denotes key words that will be used to filter which tests - to execute. This arguably belongs outside of TestSuite. - - loop denotes whether to loop over tests forever. - """ - super(TestSuite, self).__init__(*args, **kwargs) - - self._jobs = jobs - self._whitelist = whitelist - self._blacklist = blacklist - self._retest = retest - self._keywords = keywords - self._loop = loop - self._runs_per_test = runs_per_test - self._loadtest = loadtest - - def run(self, result): - # We have a number of filters that need to be applied. We do this - # here instead of inside Test because it makes the running logic for - # Test simpler. - tests = [] - num_tests = [0] - for test in self._tests: - def get(): - num_tests[0] += 1 - if getattr(test, 'should_reload', False): - return self._loadtest(test.bname, num_tests[0]) - return test - if not os.path.exists(test.path): - result.addSkip(test, "Doesn't exist") - continue - - if not (self._whitelist and test.name in self._whitelist): - if self._blacklist and test.bname in self._blacklist: - result.addSkip(test, 'blacklisted') - continue - - if self._retest and not os.path.exists(test.errpath): - result.addIgnore(test, 'not retesting') - continue - - if self._keywords: - f = open(test.path, 'rb') - t = f.read().lower() + test.bname.lower() - f.close() - ignored = False - for k in self._keywords.lower().split(): - if k not in t: - result.addIgnore(test, "doesn't match keyword") - ignored = True - break - - if ignored: - continue - for _ in xrange(self._runs_per_test): - tests.append(get()) - - runtests = list(tests) - done = queue.Queue() - running = 0 - - def job(test, result): - try: - test(result) - done.put(None) - except KeyboardInterrupt: - pass - except: # re-raises - done.put(('!', test, 'run-test raised an error, see traceback')) - raise - - stoppedearly = False - - try: - while tests or running: - if not done.empty() or running == self._jobs or not tests: - try: - done.get(True, 1) - running -= 1 - if result and result.shouldStop: - stoppedearly = True - break - except queue.Empty: - continue - if tests and not running == self._jobs: - test = tests.pop(0) - if self._loop: - if getattr(test, 'should_reload', False): - num_tests[0] += 1 - tests.append( - self._loadtest(test.name, num_tests[0])) - else: - tests.append(test) - t = threading.Thread(target=job, name=test.name, - args=(test, result)) - t.start() - running += 1 - - # If we stop early we still need to wait on started tests to - # finish. Otherwise, there is a race between the test completing - # and the test's cleanup code running. This could result in the - # test reporting incorrect. - if stoppedearly: - while running: - try: - done.get(True, 1) - running -= 1 - except queue.Empty: - continue - except KeyboardInterrupt: - for test in runtests: - test.abort() - - return result - -class TextTestRunner(unittest.TextTestRunner): - """Custom unittest test runner that uses appropriate settings.""" - - def __init__(self, runner, *args, **kwargs): - super(TextTestRunner, self).__init__(*args, **kwargs) - - self._runner = runner - - def run(self, test): - result = TestResult(self._runner.options, self.stream, - self.descriptions, self.verbosity) - - test(result) - - failed = len(result.failures) - warned = len(result.warned) - skipped = len(result.skipped) - ignored = len(result.ignored) - - with iolock: - self.stream.writeln('') - - if not self._runner.options.noskips: - for test, msg in result.skipped: - self.stream.writeln('Skipped %s: %s' % (test.name, msg)) - for test, msg in result.warned: - self.stream.writeln('Warned %s: %s' % (test.name, msg)) - for test, msg in result.failures: - self.stream.writeln('Failed %s: %s' % (test.name, msg)) - for test, msg in result.errors: - self.stream.writeln('Errored %s: %s' % (test.name, msg)) - - if self._runner.options.xunit: - xuf = open(self._runner.options.xunit, 'wb') - try: - timesd = dict((t[0], t[3]) for t in result.times) - doc = minidom.Document() - s = doc.createElement('testsuite') - s.setAttribute('name', 'run-tests') - s.setAttribute('tests', str(result.testsRun)) - s.setAttribute('errors', "0") # TODO - s.setAttribute('failures', str(failed)) - s.setAttribute('skipped', str(skipped + ignored)) - doc.appendChild(s) - for tc in result.successes: - t = doc.createElement('testcase') - t.setAttribute('name', tc.name) - t.setAttribute('time', '%.3f' % timesd[tc.name]) - s.appendChild(t) - for tc, err in sorted(result.faildata.items()): - t = doc.createElement('testcase') - t.setAttribute('name', tc) - t.setAttribute('time', '%.3f' % timesd[tc]) - # createCDATASection expects a unicode or it will - # convert using default conversion rules, which will - # fail if string isn't ASCII. - err = cdatasafe(err).decode('utf-8', 'replace') - cd = doc.createCDATASection(err) - t.appendChild(cd) - s.appendChild(t) - xuf.write(doc.toprettyxml(indent=' ', encoding='utf-8')) - finally: - xuf.close() - - if self._runner.options.json: - if json is None: - raise ImportError("json module not installed") - jsonpath = os.path.join(self._runner._testdir, 'report.json') - fp = open(jsonpath, 'w') - try: - timesd = {} - for tdata in result.times: - test = tdata[0] - timesd[test] = tdata[1:] - - outcome = {} - groups = [('success', ((tc, None) - for tc in result.successes)), - ('failure', result.failures), - ('skip', result.skipped)] - for res, testcases in groups: - for tc, __ in testcases: - tres = {'result': res, - 'time': ('%0.3f' % timesd[tc.name][2]), - 'cuser': ('%0.3f' % timesd[tc.name][0]), - 'csys': ('%0.3f' % timesd[tc.name][1]), - 'start': ('%0.3f' % timesd[tc.name][3]), - 'end': ('%0.3f' % timesd[tc.name][4])} - outcome[tc.name] = tres - jsonout = json.dumps(outcome, sort_keys=True, indent=4) - fp.writelines(("testreport =", jsonout)) - finally: - fp.close() - - self._runner._checkhglib('Tested') - - self.stream.writeln( - '# Ran %d tests, %d skipped, %d warned, %d failed.' - % (result.testsRun, - skipped + ignored, warned, failed)) - if failed: - self.stream.writeln('python hash seed: %s' % - os.environ['PYTHONHASHSEED']) - if self._runner.options.time: - self.printtimes(result.times) - - return result - - def printtimes(self, times): - # iolock held by run - self.stream.writeln('# Producing time report') - times.sort(key=lambda t: (t[3])) - cols = '%7.3f %7.3f %7.3f %7.3f %7.3f %s' - self.stream.writeln('%-7s %-7s %-7s %-7s %-7s %s' % - ('start', 'end', 'cuser', 'csys', 'real', 'Test')) - for tdata in times: - test = tdata[0] - cuser, csys, real, start, end = tdata[1:6] - self.stream.writeln(cols % (start, end, cuser, csys, real, test)) - -class TestRunner(object): - """Holds context for executing tests. - - Tests rely on a lot of state. This object holds it for them. - """ - - # Programs required to run tests. - REQUIREDTOOLS = [ - os.path.basename(_bytespath(sys.executable)), - b'diff', - b'grep', - b'unzip', - b'gunzip', - b'bunzip2', - b'sed', - ] - - # Maps file extensions to test class. - TESTTYPES = [ - (b'.py', PythonTest), - (b'.t', TTest), - ] - - def __init__(self): - self.options = None - self._hgroot = None - self._testdir = None - self._hgtmp = None - self._installdir = None - self._bindir = None - self._tmpbinddir = None - self._pythondir = None - self._coveragefile = None - self._createdfiles = [] - self._hgpath = None - self._portoffset = 0 - self._ports = {} - - def run(self, args, parser=None): - """Run the test suite.""" - oldmask = os.umask(0o22) - try: - parser = parser or getparser() - options, args = parseargs(args, parser) - # positional arguments are paths to test files to run, so - # we make sure they're all bytestrings - args = [_bytespath(a) for a in args] - self.options = options - - self._checktools() - tests = self.findtests(args) - if options.profile_runner: - import statprof - statprof.start() - result = self._run(tests) - if options.profile_runner: - statprof.stop() - statprof.display() - return result - - finally: - os.umask(oldmask) - - def _run(self, tests): - if self.options.random: - random.shuffle(tests) - else: - # keywords for slow tests - slow = {b'svn': 10, - b'gendoc': 10, - b'check-code-hg': 100, - } - def sortkey(f): - # run largest tests first, as they tend to take the longest - try: - val = -os.stat(f).st_size - except OSError as e: - if e.errno != errno.ENOENT: - raise - return -1e9 # file does not exist, tell early - for kw, mul in slow.iteritems(): - if kw in f: - val *= mul - return val - tests.sort(key=sortkey) - - self._testdir = osenvironb[b'TESTDIR'] = getattr( - os, 'getcwdb', os.getcwd)() - - if 'PYTHONHASHSEED' not in os.environ: - # use a random python hash seed all the time - # we do the randomness ourself to know what seed is used - os.environ['PYTHONHASHSEED'] = str(random.getrandbits(32)) - - if self.options.tmpdir: - self.options.keep_tmpdir = True - tmpdir = _bytespath(self.options.tmpdir) - if os.path.exists(tmpdir): - # Meaning of tmpdir has changed since 1.3: we used to create - # HGTMP inside tmpdir; now HGTMP is tmpdir. So fail if - # tmpdir already exists. - print("error: temp dir %r already exists" % tmpdir) - return 1 - - # Automatically removing tmpdir sounds convenient, but could - # really annoy anyone in the habit of using "--tmpdir=/tmp" - # or "--tmpdir=$HOME". - #vlog("# Removing temp dir", tmpdir) - #shutil.rmtree(tmpdir) - os.makedirs(tmpdir) - else: - d = None - if os.name == 'nt': - # without this, we get the default temp dir location, but - # in all lowercase, which causes troubles with paths (issue3490) - d = osenvironb.get(b'TMP', None) - # FILE BUG: mkdtemp works only on unicode in Python 3 - tmpdir = tempfile.mkdtemp('', 'hgtests.', d and _strpath(d)) - tmpdir = _bytespath(tmpdir) - - self._hgtmp = osenvironb[b'HGTMP'] = ( - os.path.realpath(tmpdir)) - - if self.options.with_hg: - self._installdir = None - whg = self.options.with_hg - # If --with-hg is not specified, we have bytes already, - # but if it was specified in python3 we get a str, so we - # have to encode it back into a bytes. - if PYTHON3: - if not isinstance(whg, bytes): - whg = _bytespath(whg) - self._bindir = os.path.dirname(os.path.realpath(whg)) - assert isinstance(self._bindir, bytes) - self._tmpbindir = os.path.join(self._hgtmp, b'install', b'bin') - os.makedirs(self._tmpbindir) - - # This looks redundant with how Python initializes sys.path from - # the location of the script being executed. Needed because the - # "hg" specified by --with-hg is not the only Python script - # executed in the test suite that needs to import 'mercurial' - # ... which means it's not really redundant at all. - self._pythondir = self._bindir - else: - self._installdir = os.path.join(self._hgtmp, b"install") - self._bindir = osenvironb[b"BINDIR"] = \ - os.path.join(self._installdir, b"bin") - self._tmpbindir = self._bindir - self._pythondir = os.path.join(self._installdir, b"lib", b"python") - - osenvironb[b"BINDIR"] = self._bindir - osenvironb[b"PYTHON"] = PYTHON - - fileb = _bytespath(__file__) - runtestdir = os.path.abspath(os.path.dirname(fileb)) - if PYTHON3: - sepb = _bytespath(os.pathsep) - else: - sepb = os.pathsep - path = [self._bindir, runtestdir] + osenvironb[b"PATH"].split(sepb) - if os.path.islink(__file__): - # test helper will likely be at the end of the symlink - realfile = os.path.realpath(fileb) - realdir = os.path.abspath(os.path.dirname(realfile)) - path.insert(2, realdir) - if self._tmpbindir != self._bindir: - path = [self._tmpbindir] + path - osenvironb[b"PATH"] = sepb.join(path) - - # Include TESTDIR in PYTHONPATH so that out-of-tree extensions - # can run .../tests/run-tests.py test-foo where test-foo - # adds an extension to HGRC. Also include run-test.py directory to - # import modules like heredoctest. - pypath = [self._pythondir, self._testdir, runtestdir] - # We have to augment PYTHONPATH, rather than simply replacing - # it, in case external libraries are only available via current - # PYTHONPATH. (In particular, the Subversion bindings on OS X - # are in /opt/subversion.) - oldpypath = osenvironb.get(IMPL_PATH) - if oldpypath: - pypath.append(oldpypath) - osenvironb[IMPL_PATH] = sepb.join(pypath) - - if self.options.pure: - os.environ["HGTEST_RUN_TESTS_PURE"] = "--pure" - - self._coveragefile = os.path.join(self._testdir, b'.coverage') - - vlog("# Using TESTDIR", self._testdir) - vlog("# Using HGTMP", self._hgtmp) - vlog("# Using PATH", os.environ["PATH"]) - vlog("# Using", IMPL_PATH, osenvironb[IMPL_PATH]) - - try: - return self._runtests(tests) or 0 - finally: - time.sleep(.1) - self._cleanup() - - def findtests(self, args): - """Finds possible test files from arguments. - - If you wish to inject custom tests into the test harness, this would - be a good function to monkeypatch or override in a derived class. - """ - if not args: - if self.options.changed: - proc = Popen4('hg st --rev "%s" -man0 .' % - self.options.changed, None, 0) - stdout, stderr = proc.communicate() - args = stdout.strip(b'\0').split(b'\0') - else: - args = os.listdir(b'.') - - return [t for t in args - if os.path.basename(t).startswith(b'test-') - and (t.endswith(b'.py') or t.endswith(b'.t'))] - - def _runtests(self, tests): - try: - if self._installdir: - self._installhg() - self._checkhglib("Testing") - else: - self._usecorrectpython() - - if self.options.restart: - orig = list(tests) - while tests: - if os.path.exists(tests[0] + ".err"): - break - tests.pop(0) - if not tests: - print("running all tests") - tests = orig - - tests = [self._gettest(t, i) for i, t in enumerate(tests)] - - failed = False - warned = False - kws = self.options.keywords - if kws is not None and PYTHON3: - kws = kws.encode('utf-8') - - suite = TestSuite(self._testdir, - jobs=self.options.jobs, - whitelist=self.options.whitelisted, - blacklist=self.options.blacklist, - retest=self.options.retest, - keywords=kws, - loop=self.options.loop, - runs_per_test=self.options.runs_per_test, - tests=tests, loadtest=self._gettest) - verbosity = 1 - if self.options.verbose: - verbosity = 2 - runner = TextTestRunner(self, verbosity=verbosity) - result = runner.run(suite) - - if result.failures: - failed = True - if result.warned: - warned = True - - if self.options.anycoverage: - self._outputcoverage() - except KeyboardInterrupt: - failed = True - print("\ninterrupted!") - - if failed: - return 1 - if warned: - return 80 - - def _getport(self, count): - port = self._ports.get(count) # do we have a cached entry? - if port is None: - port = self.options.port + self._portoffset - portneeded = 3 - # above 100 tries we just give up and let test reports failure - for tries in xrange(100): - allfree = True - for idx in xrange(portneeded): - if not checkportisavailable(port + idx): - allfree = False - break - self._portoffset += portneeded - if allfree: - break - self._ports[count] = port - return port - - def _gettest(self, test, count): - """Obtain a Test by looking at its filename. - - Returns a Test instance. The Test may not be runnable if it doesn't - map to a known type. - """ - lctest = test.lower() - testcls = Test - - for ext, cls in self.TESTTYPES: - if lctest.endswith(ext): - testcls = cls - break - - refpath = os.path.join(self._testdir, test) - tmpdir = os.path.join(self._hgtmp, b'child%d' % count) - - t = testcls(refpath, tmpdir, - keeptmpdir=self.options.keep_tmpdir, - debug=self.options.debug, - timeout=self.options.timeout, - startport=self._getport(count), - extraconfigopts=self.options.extra_config_opt, - py3kwarnings=self.options.py3k_warnings, - shell=self.options.shell) - t.should_reload = True - return t - - def _cleanup(self): - """Clean up state from this test invocation.""" - - if self.options.keep_tmpdir: - return - - vlog("# Cleaning up HGTMP", self._hgtmp) - shutil.rmtree(self._hgtmp, True) - for f in self._createdfiles: - try: - os.remove(f) - except OSError: - pass - - def _usecorrectpython(self): - """Configure the environment to use the appropriate Python in tests.""" - # Tests must use the same interpreter as us or bad things will happen. - pyexename = sys.platform == 'win32' and b'python.exe' or b'python' - if getattr(os, 'symlink', None): - vlog("# Making python executable in test path a symlink to '%s'" % - sys.executable) - mypython = os.path.join(self._tmpbindir, pyexename) - try: - if os.readlink(mypython) == sys.executable: - return - os.unlink(mypython) - except OSError as err: - if err.errno != errno.ENOENT: - raise - if self._findprogram(pyexename) != sys.executable: - try: - os.symlink(sys.executable, mypython) - self._createdfiles.append(mypython) - except OSError as err: - # child processes may race, which is harmless - if err.errno != errno.EEXIST: - raise - else: - exedir, exename = os.path.split(sys.executable) - vlog("# Modifying search path to find %s as %s in '%s'" % - (exename, pyexename, exedir)) - path = os.environ['PATH'].split(os.pathsep) - while exedir in path: - path.remove(exedir) - os.environ['PATH'] = os.pathsep.join([exedir] + path) - if not self._findprogram(pyexename): - print("WARNING: Cannot find %s in search path" % pyexename) - - def _installhg(self): - """Install hg into the test environment. - - This will also configure hg with the appropriate testing settings. - """ - vlog("# Performing temporary installation of HG") - installerrs = os.path.join(b"tests", b"install.err") - compiler = '' - if self.options.compiler: - compiler = '--compiler ' + self.options.compiler - if self.options.pure: - pure = b"--pure" - else: - pure = b"" - py3 = '' - - # Run installer in hg root - script = os.path.realpath(sys.argv[0]) - exe = sys.executable - if PYTHON3: - py3 = b'--c2to3' - compiler = _bytespath(compiler) - script = _bytespath(script) - exe = _bytespath(exe) - hgroot = os.path.dirname(os.path.dirname(script)) - self._hgroot = hgroot - os.chdir(hgroot) - nohome = b'--home=""' - if os.name == 'nt': - # The --home="" trick works only on OS where os.sep == '/' - # because of a distutils convert_path() fast-path. Avoid it at - # least on Windows for now, deal with .pydistutils.cfg bugs - # when they happen. - nohome = b'' - cmd = (b'%(exe)s setup.py %(py3)s %(pure)s clean --all' - b' build %(compiler)s --build-base="%(base)s"' - b' install --force --prefix="%(prefix)s"' - b' --install-lib="%(libdir)s"' - b' --install-scripts="%(bindir)s" %(nohome)s >%(logfile)s 2>&1' - % {b'exe': exe, b'py3': py3, b'pure': pure, - b'compiler': compiler, - b'base': os.path.join(self._hgtmp, b"build"), - b'prefix': self._installdir, b'libdir': self._pythondir, - b'bindir': self._bindir, - b'nohome': nohome, b'logfile': installerrs}) - - # setuptools requires install directories to exist. - def makedirs(p): - try: - os.makedirs(p) - except OSError as e: - if e.errno != errno.EEXIST: - raise - makedirs(self._pythondir) - makedirs(self._bindir) - - vlog("# Running", cmd) - if os.system(cmd) == 0: - if not self.options.verbose: - os.remove(installerrs) - else: - f = open(installerrs, 'rb') - for line in f: - if PYTHON3: - sys.stdout.buffer.write(line) - else: - sys.stdout.write(line) - f.close() - sys.exit(1) - os.chdir(self._testdir) - - self._usecorrectpython() - - if self.options.py3k_warnings and not self.options.anycoverage: - vlog("# Updating hg command to enable Py3k Warnings switch") - f = open(os.path.join(self._bindir, 'hg'), 'rb') - lines = [line.rstrip() for line in f] - lines[0] += ' -3' - f.close() - f = open(os.path.join(self._bindir, 'hg'), 'wb') - for line in lines: - f.write(line + '\n') - f.close() - - hgbat = os.path.join(self._bindir, b'hg.bat') - if os.path.isfile(hgbat): - # hg.bat expects to be put in bin/scripts while run-tests.py - # installation layout put it in bin/ directly. Fix it - f = open(hgbat, 'rb') - data = f.read() - f.close() - if b'"%~dp0..\python" "%~dp0hg" %*' in data: - data = data.replace(b'"%~dp0..\python" "%~dp0hg" %*', - b'"%~dp0python" "%~dp0hg" %*') - f = open(hgbat, 'wb') - f.write(data) - f.close() - else: - print('WARNING: cannot fix hg.bat reference to python.exe') - - if self.options.anycoverage: - custom = os.path.join(self._testdir, 'sitecustomize.py') - target = os.path.join(self._pythondir, 'sitecustomize.py') - vlog('# Installing coverage trigger to %s' % target) - shutil.copyfile(custom, target) - rc = os.path.join(self._testdir, '.coveragerc') - vlog('# Installing coverage rc to %s' % rc) - os.environ['COVERAGE_PROCESS_START'] = rc - covdir = os.path.join(self._installdir, '..', 'coverage') - try: - os.mkdir(covdir) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - os.environ['COVERAGE_DIR'] = covdir - - def _checkhglib(self, verb): - """Ensure that the 'mercurial' package imported by python is - the one we expect it to be. If not, print a warning to stderr.""" - if ((self._bindir == self._pythondir) and - (self._bindir != self._tmpbindir)): - # The pythondir has been inferred from --with-hg flag. - # We cannot expect anything sensible here. - return - expecthg = os.path.join(self._pythondir, b'mercurial') - actualhg = self._gethgpath() - if os.path.abspath(actualhg) != os.path.abspath(expecthg): - sys.stderr.write('warning: %s with unexpected mercurial lib: %s\n' - ' (expected %s)\n' - % (verb, actualhg, expecthg)) - def _gethgpath(self): - """Return the path to the mercurial package that is actually found by - the current Python interpreter.""" - if self._hgpath is not None: - return self._hgpath - - cmd = b'%s -c "import mercurial; print (mercurial.__path__[0])"' - cmd = cmd % PYTHON - if PYTHON3: - cmd = _strpath(cmd) - pipe = os.popen(cmd) - try: - self._hgpath = _bytespath(pipe.read().strip()) - finally: - pipe.close() - - return self._hgpath - - def _outputcoverage(self): - """Produce code coverage output.""" - from coverage import coverage - - vlog('# Producing coverage report') - # chdir is the easiest way to get short, relative paths in the - # output. - os.chdir(self._hgroot) - covdir = os.path.join(self._installdir, '..', 'coverage') - cov = coverage(data_file=os.path.join(covdir, 'cov')) - - # Map install directory paths back to source directory. - cov.config.paths['srcdir'] = ['.', self._pythondir] - - cov.combine() - - omit = [os.path.join(x, '*') for x in [self._bindir, self._testdir]] - cov.report(ignore_errors=True, omit=omit) - - if self.options.htmlcov: - htmldir = os.path.join(self._testdir, 'htmlcov') - cov.html_report(directory=htmldir, omit=omit) - if self.options.annotate: - adir = os.path.join(self._testdir, 'annotated') - if not os.path.isdir(adir): - os.mkdir(adir) - cov.annotate(directory=adir, omit=omit) - - def _findprogram(self, program): - """Search PATH for a executable program""" - dpb = _bytespath(os.defpath) - sepb = _bytespath(os.pathsep) - for p in osenvironb.get(b'PATH', dpb).split(sepb): - name = os.path.join(p, program) - if os.name == 'nt' or os.access(name, os.X_OK): - return name - return None - - def _checktools(self): - """Ensure tools required to run tests are present.""" - for p in self.REQUIREDTOOLS: - if os.name == 'nt' and not p.endswith('.exe'): - p += '.exe' - found = self._findprogram(p) - if found: - vlog("# Found prerequisite", p, "at", found) - else: - print("WARNING: Did not find prerequisite tool: %s " % p) - -if __name__ == '__main__': - runner = TestRunner() - - try: - import msvcrt - msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) - msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) - msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) - except ImportError: - pass - - sys.exit(runner.run(sys.argv[1:]))