Skip to content

Commit

Permalink
gh-109162: libregrtest: move code around
Browse files Browse the repository at this point in the history
* Move Regrtest.display_header() to utils.py.
* Move cleanup_temp_dir() to utils.py.
* Move list_cases() to findtests.py.
  • Loading branch information
vstinner committed Sep 11, 2023
1 parent 3b2ecbc commit b374559
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 93 deletions.
42 changes: 41 additions & 1 deletion Lib/test/libregrtest/findtests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import os
import sys
import unittest

from .utils import StrPath, TestName, TestList
from test import support

from .utils import (
StrPath, TestName, TestTuple, TestList, FilterTuple,
abs_module_name, count, printlist)


# If these test directories are encountered recurse into them and treat each
Expand Down Expand Up @@ -56,3 +62,37 @@ def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
else:
splitted.append(name)
return splitted


def _list_cases(suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())

def list_cases(tests: TestTuple, *,
match_tests: FilterTuple | None = None,
ignore_tests: FilterTuple | None = None,
test_dir: StrPath | None = None):
support.verbose = False
support.set_match_tests(match_tests, ignore_tests)

skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
_list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)

if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
101 changes: 9 additions & 92 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import locale
import os
import platform
import random
import re
import sys
import time
import unittest

from test import support
from test.support import os_helper

from .cmdline import _parse_args, Namespace
from .findtests import findtests, split_test_packages
from .findtests import findtests, split_test_packages, list_cases
from .logger import Logger
from .result import State
from .runtests import RunTests, HuntRefleak
Expand All @@ -22,8 +19,8 @@
from .utils import (
StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple,
strip_py_suffix, count, format_duration,
printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout,
abs_module_name)
printlist, get_temp_dir, get_work_dir, exit_timeout,
display_header, cleanup_temp_dir)


class Regrtest:
Expand Down Expand Up @@ -214,36 +211,6 @@ def list_tests(tests: TestTuple):
for name in tests:
print(name)

def _list_cases(self, suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
self._list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())

def list_cases(self, tests: TestTuple):
support.verbose = False
support.set_match_tests(self.match_tests, self.ignore_tests)

skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, self.test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)

if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)

def _rerun_failed_tests(self, runtests: RunTests):
# Configure the runner to re-run tests
if self.num_workers == 0:
Expand Down Expand Up @@ -363,45 +330,6 @@ def run_tests_sequentially(self, runtests):

return tracer

@staticmethod
def display_header():
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))

# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if not sanitizers:
return

print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")

def get_state(self):
state = self.results.get_state(self.fail_env_changed)
if self.first_state:
Expand Down Expand Up @@ -445,20 +373,6 @@ def display_summary(self):
state = self.get_state()
print(f"Result: {state}")

@staticmethod
def cleanup_temp_dir(tmp_dir: StrPath):
import glob

path = os.path.join(glob.escape(tmp_dir), 'test_python_*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)

def create_run_tests(self, tests: TestTuple):
return RunTests(
tests,
Expand Down Expand Up @@ -496,7 +410,7 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
if (self.want_header
or not(self.pgo or self.quiet or self.single_test_run
or tests or self.cmdline_args)):
self.display_header()
display_header()

if self.randomize:
print("Using random seed", self.random_seed)
Expand Down Expand Up @@ -554,7 +468,7 @@ def main(self, tests: TestList | None = None):
self.tmp_dir = get_temp_dir(self.tmp_dir)

if self.want_cleanup:
self.cleanup_temp_dir(self.tmp_dir)
cleanup_temp_dir(self.tmp_dir)
sys.exit(0)

if self.want_wait:
Expand All @@ -567,7 +481,10 @@ def main(self, tests: TestList | None = None):
if self.want_list_tests:
self.list_tests(selected)
elif self.want_list_cases:
self.list_cases(selected)
list_cases(selected,
match_tests=self.match_tests,
ignore_tests=self.ignore_tests,
test_dir=self.test_dir)
else:
exitcode = self.run_tests(selected, tests)

Expand Down
55 changes: 55 additions & 0 deletions Lib/test/libregrtest/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import atexit
import contextlib
import faulthandler
import locale
import math
import os.path
import platform
import random
import sys
import sysconfig
Expand Down Expand Up @@ -524,3 +526,56 @@ def adjust_rlimit_nofile():
except (ValueError, OSError) as err:
print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
f"{new_fd_limit}: {err}.")


def display_header():
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))

# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if not sanitizers:
return

print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")


def cleanup_temp_dir(tmp_dir: StrPath):
import glob

path = os.path.join(glob.escape(tmp_dir), 'test_python_*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)

0 comments on commit b374559

Please sign in to comment.