From c18e34643ed6dfbc06a431fb38e3337962ee01b8 Mon Sep 17 00:00:00 2001 From: Jacob Lifshay Date: Fri, 2 Jun 2023 19:15:49 -0700 Subject: [PATCH] all tests pass! --- pytest_output_to_files.py | 127 +++++++++++------ tests/conftest.py | 1 + tests/test_output_to_files.py | 249 ++++++++++++++++++++++++++++++++++ 3 files changed, 339 insertions(+), 38 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_output_to_files.py diff --git a/pytest_output_to_files.py b/pytest_output_to_files.py index b24ddae..91a8937 100644 --- a/pytest_output_to_files.py +++ b/pytest_output_to_files.py @@ -4,6 +4,7 @@ from typing import TextIO, Generator, Any import io import sys import errno +import contextlib from pathlib import Path @@ -11,24 +12,48 @@ if os.name != 'posix': raise ValueError( f"{sys.platform} is not supported by pytest-output-to-files") +_DEFAULT_LINE_LIMIT = 5000 -class __Capture: - def __init__(self, target, line_limit=5000, chunk_size=1 << 16): - # type: (TextIO, int, int) -> None - self.__target = target - self.__old_target_fd = os.dup(target.fileno()) + +class _Capture: + def __init__(self, target_attr, line_limit=_DEFAULT_LINE_LIMIT, + chunk_size=1 << 16): + # type: (str, int, int) -> None + self.__target_attr = target_attr + self.__old_target = self.__target + try: + self.__target_fd = self.__target.fileno() + except io.UnsupportedOperation: + self.__target_fd = None + if self.__target_fd is not None: + self.__old_target_fd = os.dup(self.__target_fd) + self.__file_dup = None # type: None | TextIO self.__file_path = None # type: None | Path self.__file = None # type: None | io.FileIO self.__line_limit = line_limit self.__buf = memoryview(bytearray(chunk_size)) self.__active = False + @property + def __target(self): + # type: () -> TextIO + return getattr(sys, self.__target_attr) + + @__target.setter + def __target(self, v): + # type: (TextIO) -> None + setattr(sys, self.__target_attr, v) + def resume(self): assert self.__file is not None, \ "resume called without calling start and pause" assert not self.active, "resume called without calling pause" self.__target.flush() - os.dup2(self.__file.fileno(), self.__target.fileno()) + if self.__target_fd is None: + assert self.__file_dup is not None, "inconsistent state" + self.__target = self.__file_dup + else: + os.dup2(self.__file.fileno(), self.__target_fd) self.__active = True @property @@ -45,7 +70,10 @@ class __Capture: assert self.started, "pause called without calling start" assert self.active, "pause called without calling resume" self.__target.flush() - os.dup2(self.__old_target_fd, self.__target.fileno()) + if self.__target_fd is None: + self.__target = self.__old_target + else: + os.dup2(self.__old_target_fd, self.__target.fileno()) self.__active = False def start(self, file_path): @@ -53,6 +81,9 @@ class __Capture: assert not self.started, "start called without calling stop" self.__file_path = file_path self.__file = file_path.open("wb+", buffering=0) + if self.__target_fd is None: + self.__file_dup = os.fdopen( + os.dup(self.__file.fileno()), "w", encoding="utf-8") self.resume() def __read_chunk_at(self, pos, required_len): @@ -134,6 +165,10 @@ class __Capture: file_length = self.__file.seek(0, os.SEEK_END) end_lines, _ = self.__read_lines_at( line_limit=self.__line_limit, pos=file_length, backwards=True) + if start_lines.endswith(b"\n"): + start_lines = start_lines[:-1] + if end_lines.endswith(b"\n"): + end_lines = end_lines[:-1] hr = '-' * 50 trimmed_msg = f"Output Trimmed, Full output in: {self.__file_path}" retval = [ @@ -154,77 +189,91 @@ class __Capture: return if self.active: self.pause() + if self.__file_dup is not None: + self.__file_dup.close() self.__file.close() - self.__file_path, self.__file = None, None + self.__file_path = None + self.__file = None + self.__file_dup = None def stop(self): + # type: () -> str assert self.__file is not None, "stop called without calling start" if self.active: self.pause() try: - print(self.__read_output_str(), file=self.__target) + retval = self.__read_output_str() finally: self.abort() - return + return retval -class __OutputToFilesPlugin: +class _OutputToFilesPlugin: def __init__(self, output_dir): # type: (str) -> None self.output_dir = Path(output_dir) self.__captures = { - "stdout.txt": __Capture(sys.stdout), - "stderr.txt": __Capture(sys.stderr), + "stdout": _Capture("stdout"), + "stderr": _Capture("stderr"), } def __repr__(self): # type: () -> str return f"" - def __start(self, item): - # type: (pytest.Item) -> None + def __start(self, item, when): + # type: (pytest.Item, str) -> None path = self.output_dir for part in item.nodeid.split('::'): path /= part.replace(".", "_") path.mkdir(0o775, parents=True, exist_ok=True) for name, capture in self.__captures.items(): - capture.start(path / name) + capture.start(path / f"{when}-{name}.txt") - @pytest.hookimpl(tryfirst=True) - def pytest_keyboard_interrupt(self, excinfo): + def __stop(self, item, when): + # type: (pytest.Item, str) -> None + for name, capture in self.__captures.items(): + item.add_report_section(when, name, capture.stop()) + + def __abort(self): for capture in self.__captures.values(): capture.abort() + @contextlib.contextmanager + def __capture_item(self, item, when): + # type: (pytest.Item, str) -> Generator[Any, Any, Any] + try: + self.__start(item, when) + yield + self.__stop(item, when) + finally: + self.__abort() + + @pytest.hookimpl(tryfirst=True) + def pytest_keyboard_interrupt(self, excinfo): + self.__abort() + @pytest.hookimpl(tryfirst=True) def pytest_internalerror(self, excinfo): - for capture in self.__captures.values(): - capture.abort() + self.__abort() @pytest.hookimpl(hookwrapper=True, trylast=True) def pytest_runtest_setup(self, item): # type: (pytest.Item) -> Generator[Any, Any, Any] - self.__start(item) - yield - for capture in self.__captures.values(): - capture.pause() + with self.__capture_item(item, "setup"): + yield @pytest.hookimpl(hookwrapper=True, trylast=True) def pytest_runtest_call(self, item): # type: (pytest.Item) -> Generator[Any, Any, Any] - for capture in self.__captures.values(): - capture.resume() - yield - for capture in self.__captures.values(): - capture.pause() + with self.__capture_item(item, "call"): + yield @pytest.hookimpl(hookwrapper=True, trylast=True) def pytest_runtest_teardown(self, item): # type: (pytest.Item) -> Generator[Any, Any, Any] - for capture in self.__captures.values(): - capture.resume() - yield - for capture in self.__captures.values(): - capture.stop() + with self.__capture_item(item, "teardown"): + yield def pytest_addoption(parser): @@ -234,9 +283,9 @@ def pytest_addoption(parser): '--shorten-output-dir', action='store', metavar="DIR", - default="", help=('shorten test outputs by storing them in files in DIR and ' - 'returning just the first/last few lines')) + 'returning just the first/last few lines. disable by ' + 'using --shorten-output-dir=""')) parser.addini( 'shorten-output-dir', @@ -247,9 +296,11 @@ def pytest_addoption(parser): def pytest_configure(config): # type: (pytest.Config) -> None + if config.option.capture == "no": + return output_dir = config.getoption('--shorten-output-dir') - if output_dir == "": + if output_dir is None: output_dir = config.getini('shorten-output-dir') if output_dir != "": assert isinstance(output_dir, str), "invalid shorten-output-dir" - config.pluginmanager.register(__OutputToFilesPlugin(output_dir)) + config.pluginmanager.register(_OutputToFilesPlugin(output_dir)) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..bc711e5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1 @@ +pytest_plugins = 'pytester' diff --git a/tests/test_output_to_files.py b/tests/test_output_to_files.py new file mode 100644 index 0000000..13cf6e7 --- /dev/null +++ b/tests/test_output_to_files.py @@ -0,0 +1,249 @@ +import pytest +import sys +from pathlib import Path +from pytest_output_to_files import _DEFAULT_LINE_LIMIT + + +def test_help_message(testdir): + # type: (pytest.Testdir) -> None + result = testdir.runpytest( + '--help', + ) + # fnmatch_lines does an assertion internally + result.stdout.fnmatch_lines([ + 'shortening output:', + '*--shorten-output-dir=DIR*', + '*shorten test outputs by storing them in files in DIR and*', + '*returning just the first/last few lines. disable by*', + '*using --shorten-output-dir=""*', + ]) + + +def do_stdout_stderr_check(testdir, additional_args, stdout_lines, + stderr_lines, enabled): + # type: (pytest.Testdir, list[str], int, int, bool) -> pytest.RunResult + testdir.makepyfile(test_print=f""" + import sys + + def test_print(): + for i in range({stdout_lines}): + print(f'in stdout {{i}}') + for i in range({stderr_lines}): + print(f'in stderr {{i}}', file=sys.stderr) + assert False + """) + + full_stdout = ''.join(f'in stdout {i}\n' for i in range(stdout_lines)) + full_stderr = ''.join(f'in stderr {i}\n' for i in range(stderr_lines)) + + result = testdir.runpytest('-v', *additional_args) + + test_out_path = Path(testdir.tmpdir) + test_out_path /= "test-out" + test_print_path = test_out_path / "test_print_py" + test_print_path /= "test_print" + call_stdout_path = test_print_path / "call-stdout.txt" + call_stderr_path = test_print_path / "call-stderr.txt" + + lines = ['*--- Captured stdout call ---*'] + hr = '-' * 50 + if enabled and stdout_lines >= 2 * _DEFAULT_LINE_LIMIT: + trimmed_msg = ("Output Trimmed, Full output in: " + "test-out/test_print_py/test_print/call-stdout.txt") + lines.append(trimmed_msg) + lines.append(hr) + for i in range(_DEFAULT_LINE_LIMIT): + lines.append(f'in stdout {i}') + lines.append(hr) + lines.append(trimmed_msg) + lines.append(hr) + for i in range(stdout_lines - _DEFAULT_LINE_LIMIT, stdout_lines): + lines.append(f'in stdout {i}') + lines.append(hr) + lines.append(trimmed_msg) + else: + for i in range(stdout_lines): + lines.append(f'in stdout {i}') + lines.append('*--- Captured stderr call ---*') + if enabled and stderr_lines >= 2 * _DEFAULT_LINE_LIMIT: + trimmed_msg = ("Output Trimmed, Full output in: " + "test-out/test_print_py/test_print/call-stderr.txt") + lines.append(trimmed_msg) + lines.append(hr) + for i in range(_DEFAULT_LINE_LIMIT): + lines.append(f'in stderr {i}') + lines.append(hr) + lines.append(trimmed_msg) + lines.append(hr) + for i in range(stderr_lines - _DEFAULT_LINE_LIMIT, stderr_lines): + lines.append(f'in stderr {i}') + lines.append(hr) + lines.append(trimmed_msg) + else: + for i in range(stderr_lines): + lines.append(f'in stderr {i}') + lines.append("*====*") + + result.stdout.fnmatch_lines(lines, consecutive=True) + + result.stdout.fnmatch_lines([ + 'FAILED test_print.py::test_print *', + ]) + + if enabled: + for empty_file in ("setup-stdout.txt", "setup-stderr.txt", + "teardown-stdout.txt", "teardown-stderr.txt"): + assert (test_print_path / empty_file).read_text("utf-8") == "" + assert call_stdout_path.read_text("utf-8") == full_stdout + assert call_stderr_path.read_text("utf-8") == full_stderr + call_stdout_path.unlink() # remove big files + call_stderr_path.unlink() # remove big files + else: + assert not test_out_path.exists() + assert result.ret != 0 + + return result + + +def test_ini_setting(testdir): + # type: (pytest.Testdir) -> None + testdir.makeini(""" + [pytest] + shorten-output-dir = test-out + """) + + do_stdout_stderr_check(testdir, [], 1, 1, True) + + +def test_nothing(testdir): + # type: (pytest.Testdir) -> None + do_stdout_stderr_check(testdir, [], 1, 1, False) + + +def test_arg(testdir): + # type: (pytest.Testdir) -> None + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], 1, 1, True) + + +def test_arg_override_ini(testdir): + # type: (pytest.Testdir) -> None + testdir.makeini(""" + [pytest] + shorten-output-dir = test-out + """) + + do_stdout_stderr_check( + testdir, ["--shorten-output-dir="], 1, 1, False) + + +def test_disable_capture(testdir): + # type: (pytest.Testdir) -> None + testdir.makeini(""" + [pytest] + shorten-output-dir = test-out + """) + + testdir.makepyfile(test_print=f""" + import sys + + def test_print(): + print(f'in stdout') + print(f'in stderr', file=sys.stderr) + assert False + """) + + result = testdir.runpytest('-v', '-s') + + test_out_path = Path(testdir.tmpdir) + test_out_path /= "test-out" + + assert not test_out_path.exists() + + result.stdout.fnmatch_lines(['test_print.py::test_print*in stdout']) + result.stderr.fnmatch_lines(['in stderr']) + + assert result.ret != 0 + + +def test_20k_disabled(testdir): + # type: (pytest.Testdir) -> None + do_stdout_stderr_check(testdir, [], 20000, 20000, False) + + +def test_20k(testdir): + # type: (pytest.Testdir) -> None + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], 20000, 20000, True) + + +def test_21k(testdir): + # type: (pytest.Testdir) -> None + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], 21000, 21000, True) + + +def test_22k(testdir): + # type: (pytest.Testdir) -> None + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], 22000, 22000, True) + + +def test_1x(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_50_percent_more(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT + _DEFAULT_LINE_LIMIT // 2 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_2x_minus_two(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT * 2 - 2 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_2x_minus_one(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT * 2 - 1 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_2x(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT * 2 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_2x_plus_one(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT * 2 + 1 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_2x_plus_two(testdir): + # type: (pytest.Testdir) -> None + lines = _DEFAULT_LINE_LIMIT * 2 + 2 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +def test_1M(testdir): + # type: (pytest.Testdir) -> None + lines = 1_000_000 + do_stdout_stderr_check( + testdir, ["--shorten-output-dir=test-out"], lines, lines, True) + + +if __name__ == "__main__": + sys.exit(pytest.main()) -- 2.30.2