@@ -540,23 +540,25 @@ def outputcoverage(options):
adir = os.path.join(TESTDIR, 'annotated')
if not os.path.isdir(adir):
os.mkdir(adir)
covrun('-i', '-a', '"--directory=%s"' % adir, '"--omit=%s"' % omit)
class Test(object):
"""Encapsulates a single, runnable test."""
- def __init__(self, path, options, threadtmp, count):
+ def __init__(self, path, options, count):
self._path = path
self._options = options
- self._threadtmp = threadtmp
- self.testtmp = os.path.join(threadtmp, os.path.basename(path))
- os.mkdir(self.testtmp)
+ self.threadtmp = os.path.join(HGTMP, 'child%d' % count)
+ os.mkdir(self.threadtmp)
+
+ self._testtmp = os.path.join(self.threadtmp, os.path.basename(path))
+ os.mkdir(self._testtmp)
self._setreplacements(count)
def run(self):
env = self._getenv()
createhgrc(env['HGRCPATH'], self._options)
try:
@@ -574,32 +576,32 @@ class Test(object):
(r':%s\b' % (port + 1), ':$HGPORT1'),
(r':%s\b' % (port + 2), ':$HGPORT2'),
]
if os.name == 'nt':
r.append(
(''.join(c.isalpha() and '[%s%s]' % (c.lower(), c.upper()) or
c in '/\\' and r'[/\\]' or c.isdigit() and c or '\\' + c
- for c in self.testtmp), '$TESTTMP'))
+ for c in self._testtmp), '$TESTTMP'))
else:
- r.append((re.escape(self.testtmp), '$TESTTMP'))
+ r.append((re.escape(self._testtmp), '$TESTTMP'))
self._replacements = r
self._port = port
def _getenv(self):
env = os.environ.copy()
- env['TESTTMP'] = self.testtmp
- env['HOME'] = self.testtmp
+ env['TESTTMP'] = self._testtmp
+ env['HOME'] = self._testtmp
env["HGPORT"] = str(self._port)
env["HGPORT1"] = str(self._port + 1)
env["HGPORT2"] = str(self._port + 2)
- env["HGRCPATH"] = os.path.join(self._threadtmp, '.hgrc')
- env["DAEMON_PIDS"] = os.path.join(self._threadtmp, 'daemon.pids')
+ env["HGRCPATH"] = os.path.join(self.threadtmp, '.hgrc')
+ env["DAEMON_PIDS"] = os.path.join(self.threadtmp, 'daemon.pids')
env["HGEDITOR"] = sys.executable + ' -c "import sys; sys.exit(0)"'
env["HGMERGE"] = "internal:merge"
env["HGUSER"] = "test"
env["HGENCODING"] = "ascii"
env["HGENCODINGMODE"] = "strict"
# Reset some environment variables to well-known values so that
# the tests produce repeatable output.
@@ -627,17 +629,17 @@ def pytest(test, wd, options, replacemen
vlog("# Running", cmd)
if os.name == 'nt':
replacements.append((r'\r\n', '\n'))
return run(cmd, wd, options, replacements, env)
class PythonTest(Test):
"""A Python-based test."""
def _run(self, replacements, env):
- return pytest(self._path, self.testtmp, self._options, replacements,
+ return pytest(self._path, self._testtmp, self._options, replacements,
env)
needescape = re.compile(r'[\x00-\x08\x0b-\x1f\x7f-\xff]').search
escapesub = re.compile(r'[\x00-\x08\x0b-\x1f\\\x7f-\xff]').sub
escapemap = dict((chr(i), r'\x%02x' % i) for i in range(256))
escapemap.update({'\\': '\\\\', '\r': r'\r'})
def escapef(m):
return escapemap[m.group(0)]
@@ -890,17 +892,17 @@ def tsttest(test, wd, options, replaceme
if warnonly == 2:
exitcode = False # set exitcode to warned
return exitcode, postout
class TTest(Test):
"""A "t test" is a test backed by a .t file."""
def _run(self, replacements, env):
- return tsttest(self._path, self.testtmp, self._options, replacements,
+ return tsttest(self._path, self._testtmp, self._options, replacements,
env)
wifexited = getattr(os, "WIFEXITED", lambda x: False)
def run(cmd, wd, options, replacements, env):
"""Run command in a sub-process, capturing the output (stdout and stderr).
Return a tuple (exitcode, output). output is None in debug mode."""
# TODO: Use subprocess.Popen if we're running on Python 2.4
if options.debug:
@@ -1015,21 +1017,17 @@ def runone(options, test, count):
else:
return skip("unknown test type")
vlog("# Test", test)
if os.path.exists(err):
os.remove(err) # Remove any previous output files
- # Make a tmp subdirectory to work in
- threadtmp = os.path.join(HGTMP, "child%d" % count)
- os.mkdir(threadtmp)
-
- t = runner(testpath, options, threadtmp, count)
+ t = runner(testpath, options, count)
starttime = time.time()
try:
ret, out = t.run()
except KeyboardInterrupt:
endtime = time.time()
log('INTERRUPTED: %s (after %d seconds)' % (test, endtime - starttime))
raise
@@ -1095,17 +1093,17 @@ def runone(options, test, count):
if not options.verbose:
iolock.acquire()
sys.stdout.write(result[0])
sys.stdout.flush()
iolock.release()
if not options.keep_tmpdir:
- shutil.rmtree(threadtmp, True)
+ shutil.rmtree(t.threadtmp, True)
return result
_hgpath = None
def _gethgpath():
"""Return the path to the mercurial package that is actually found by
the current Python interpreter."""
global _hgpath