all tests pass!
authorJacob Lifshay <programmerjake@gmail.com>
Sat, 3 Jun 2023 02:15:49 +0000 (19:15 -0700)
committerJacob Lifshay <programmerjake@gmail.com>
Sat, 3 Jun 2023 02:15:49 +0000 (19:15 -0700)
pytest_output_to_files.py
tests/conftest.py [new file with mode: 0644]
tests/test_output_to_files.py [new file with mode: 0644]

index b24ddae935324dfe07f2f7bda5e4431930d2c246..91a893749ea5dceeda82ccaba4e193b2e4861f5e 100644 (file)
@@ -4,6 +4,7 @@ from typing import TextIO, Generator, Any
 import io
 import sys
 import errno
+import contextlib
 from pathlib import Path
 
 
@@ -11,24 +12,48 @@ if os.name != 'posix':
     raise ValueError(
         f"{sys.platform} is not supported by pytest-output-to-files")
 
+_DEFAULT_LINE_LIMIT = 5000
 
-class __Capture:
-    def __init__(self, target, line_limit=5000, chunk_size=1 << 16):
-        # type: (TextIO, int, int) -> None
-        self.__target = target
-        self.__old_target_fd = os.dup(target.fileno())
+
+class _Capture:
+    def __init__(self, target_attr, line_limit=_DEFAULT_LINE_LIMIT,
+                 chunk_size=1 << 16):
+        # type: (str, int, int) -> None
+        self.__target_attr = target_attr
+        self.__old_target = self.__target
+        try:
+            self.__target_fd = self.__target.fileno()
+        except io.UnsupportedOperation:
+            self.__target_fd = None
+        if self.__target_fd is not None:
+            self.__old_target_fd = os.dup(self.__target_fd)
+        self.__file_dup = None  # type: None | TextIO
         self.__file_path = None  # type: None | Path
         self.__file = None  # type: None | io.FileIO
         self.__line_limit = line_limit
         self.__buf = memoryview(bytearray(chunk_size))
         self.__active = False
 
+    @property
+    def __target(self):
+        # type: () -> TextIO
+        return getattr(sys, self.__target_attr)
+
+    @__target.setter
+    def __target(self, v):
+        # type: (TextIO) -> None
+        setattr(sys, self.__target_attr, v)
+
     def resume(self):
         assert self.__file is not None, \
             "resume called without calling start and pause"
         assert not self.active, "resume called without calling pause"
         self.__target.flush()
-        os.dup2(self.__file.fileno(), self.__target.fileno())
+        if self.__target_fd is None:
+            assert self.__file_dup is not None, "inconsistent state"
+            self.__target = self.__file_dup
+        else:
+            os.dup2(self.__file.fileno(), self.__target_fd)
         self.__active = True
 
     @property
@@ -45,7 +70,10 @@ class __Capture:
         assert self.started, "pause called without calling start"
         assert self.active, "pause called without calling resume"
         self.__target.flush()
-        os.dup2(self.__old_target_fd, self.__target.fileno())
+        if self.__target_fd is None:
+            self.__target = self.__old_target
+        else:
+            os.dup2(self.__old_target_fd, self.__target.fileno())
         self.__active = False
 
     def start(self, file_path):
@@ -53,6 +81,9 @@ class __Capture:
         assert not self.started, "start called without calling stop"
         self.__file_path = file_path
         self.__file = file_path.open("wb+", buffering=0)
+        if self.__target_fd is None:
+            self.__file_dup = os.fdopen(
+                os.dup(self.__file.fileno()), "w", encoding="utf-8")
         self.resume()
 
     def __read_chunk_at(self, pos, required_len):
@@ -134,6 +165,10 @@ class __Capture:
         file_length = self.__file.seek(0, os.SEEK_END)
         end_lines, _ = self.__read_lines_at(
             line_limit=self.__line_limit, pos=file_length, backwards=True)
+        if start_lines.endswith(b"\n"):
+            start_lines = start_lines[:-1]
+        if end_lines.endswith(b"\n"):
+            end_lines = end_lines[:-1]
         hr = '-' * 50
         trimmed_msg = f"Output Trimmed, Full output in: {self.__file_path}"
         retval = [
@@ -154,77 +189,91 @@ class __Capture:
             return
         if self.active:
             self.pause()
+        if self.__file_dup is not None:
+            self.__file_dup.close()
         self.__file.close()
-        self.__file_path, self.__file = None, None
+        self.__file_path = None
+        self.__file = None
+        self.__file_dup = None
 
     def stop(self):
+        # type: () -> str
         assert self.__file is not None, "stop called without calling start"
         if self.active:
             self.pause()
         try:
-            print(self.__read_output_str(), file=self.__target)
+            retval = self.__read_output_str()
         finally:
             self.abort()
-        return
+        return retval
 
 
-class __OutputToFilesPlugin:
+class _OutputToFilesPlugin:
     def __init__(self, output_dir):
         # type: (str) -> None
         self.output_dir = Path(output_dir)
         self.__captures = {
-            "stdout.txt": __Capture(sys.stdout),
-            "stderr.txt": __Capture(sys.stderr),
+            "stdout": _Capture("stdout"),
+            "stderr": _Capture("stderr"),
         }
 
     def __repr__(self):
         # type: () -> str
         return f"<OutputToFilesPlugin output_dir={str(self.output_dir)!r}>"
 
-    def __start(self, item):
-        # type: (pytest.Item) -> None
+    def __start(self, item, when):
+        # type: (pytest.Item, str) -> None
         path = self.output_dir
         for part in item.nodeid.split('::'):
             path /= part.replace(".", "_")
         path.mkdir(0o775, parents=True, exist_ok=True)
         for name, capture in self.__captures.items():
-            capture.start(path / name)
+            capture.start(path / f"{when}-{name}.txt")
 
-    @pytest.hookimpl(tryfirst=True)
-    def pytest_keyboard_interrupt(self, excinfo):
+    def __stop(self, item, when):
+        # type: (pytest.Item, str) -> None
+        for name, capture in self.__captures.items():
+            item.add_report_section(when, name, capture.stop())
+
+    def __abort(self):
         for capture in self.__captures.values():
             capture.abort()
 
+    @contextlib.contextmanager
+    def __capture_item(self, item, when):
+        # type: (pytest.Item, str) -> Generator[Any, Any, Any]
+        try:
+            self.__start(item, when)
+            yield
+            self.__stop(item, when)
+        finally:
+            self.__abort()
+
+    @pytest.hookimpl(tryfirst=True)
+    def pytest_keyboard_interrupt(self, excinfo):
+        self.__abort()
+
     @pytest.hookimpl(tryfirst=True)
     def pytest_internalerror(self, excinfo):
-        for capture in self.__captures.values():
-            capture.abort()
+        self.__abort()
 
     @pytest.hookimpl(hookwrapper=True, trylast=True)
     def pytest_runtest_setup(self, item):
         # type: (pytest.Item) -> Generator[Any, Any, Any]
-        self.__start(item)
-        yield
-        for capture in self.__captures.values():
-            capture.pause()
+        with self.__capture_item(item, "setup"):
+            yield
 
     @pytest.hookimpl(hookwrapper=True, trylast=True)
     def pytest_runtest_call(self, item):
         # type: (pytest.Item) -> Generator[Any, Any, Any]
-        for capture in self.__captures.values():
-            capture.resume()
-        yield
-        for capture in self.__captures.values():
-            capture.pause()
+        with self.__capture_item(item, "call"):
+            yield
 
     @pytest.hookimpl(hookwrapper=True, trylast=True)
     def pytest_runtest_teardown(self, item):
         # type: (pytest.Item) -> Generator[Any, Any, Any]
-        for capture in self.__captures.values():
-            capture.resume()
-        yield
-        for capture in self.__captures.values():
-            capture.stop()
+        with self.__capture_item(item, "teardown"):
+            yield
 
 
 def pytest_addoption(parser):
@@ -234,9 +283,9 @@ def pytest_addoption(parser):
         '--shorten-output-dir',
         action='store',
         metavar="DIR",
-        default="",
         help=('shorten test outputs by storing them in files in DIR and '
-              'returning just the first/last few lines'))
+              'returning just the first/last few lines. disable by '
+              'using --shorten-output-dir=""'))
 
     parser.addini(
         'shorten-output-dir',
@@ -247,9 +296,11 @@ def pytest_addoption(parser):
 
 def pytest_configure(config):
     # type: (pytest.Config) -> None
+    if config.option.capture == "no":
+        return
     output_dir = config.getoption('--shorten-output-dir')
-    if output_dir == "":
+    if output_dir is None:
         output_dir = config.getini('shorten-output-dir')
     if output_dir != "":
         assert isinstance(output_dir, str), "invalid shorten-output-dir"
-        config.pluginmanager.register(__OutputToFilesPlugin(output_dir))
+        config.pluginmanager.register(_OutputToFilesPlugin(output_dir))
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644 (file)
index 0000000..bc711e5
--- /dev/null
@@ -0,0 +1 @@
+pytest_plugins = 'pytester'
diff --git a/tests/test_output_to_files.py b/tests/test_output_to_files.py
new file mode 100644 (file)
index 0000000..13cf6e7
--- /dev/null
@@ -0,0 +1,249 @@
+import pytest
+import sys
+from pathlib import Path
+from pytest_output_to_files import _DEFAULT_LINE_LIMIT
+
+
+def test_help_message(testdir):
+    # type: (pytest.Testdir) -> None
+    result = testdir.runpytest(
+        '--help',
+    )
+    # fnmatch_lines does an assertion internally
+    result.stdout.fnmatch_lines([
+        'shortening output:',
+        '*--shorten-output-dir=DIR*',
+        '*shorten test outputs by storing them in files in DIR and*',
+        '*returning just the first/last few lines. disable by*',
+        '*using --shorten-output-dir=""*',
+    ])
+
+
+def do_stdout_stderr_check(testdir, additional_args, stdout_lines,
+                           stderr_lines, enabled):
+    # type: (pytest.Testdir, list[str], int, int, bool) -> pytest.RunResult
+    testdir.makepyfile(test_print=f"""
+        import sys
+
+        def test_print():
+            for i in range({stdout_lines}):
+                print(f'in stdout {{i}}')
+            for i in range({stderr_lines}):
+                print(f'in stderr {{i}}', file=sys.stderr)
+            assert False
+    """)
+
+    full_stdout = ''.join(f'in stdout {i}\n' for i in range(stdout_lines))
+    full_stderr = ''.join(f'in stderr {i}\n' for i in range(stderr_lines))
+
+    result = testdir.runpytest('-v', *additional_args)
+
+    test_out_path = Path(testdir.tmpdir)
+    test_out_path /= "test-out"
+    test_print_path = test_out_path / "test_print_py"
+    test_print_path /= "test_print"
+    call_stdout_path = test_print_path / "call-stdout.txt"
+    call_stderr_path = test_print_path / "call-stderr.txt"
+
+    lines = ['*--- Captured stdout call ---*']
+    hr = '-' * 50
+    if enabled and stdout_lines >= 2 * _DEFAULT_LINE_LIMIT:
+        trimmed_msg = ("Output Trimmed, Full output in: "
+                       "test-out/test_print_py/test_print/call-stdout.txt")
+        lines.append(trimmed_msg)
+        lines.append(hr)
+        for i in range(_DEFAULT_LINE_LIMIT):
+            lines.append(f'in stdout {i}')
+        lines.append(hr)
+        lines.append(trimmed_msg)
+        lines.append(hr)
+        for i in range(stdout_lines - _DEFAULT_LINE_LIMIT, stdout_lines):
+            lines.append(f'in stdout {i}')
+        lines.append(hr)
+        lines.append(trimmed_msg)
+    else:
+        for i in range(stdout_lines):
+            lines.append(f'in stdout {i}')
+    lines.append('*--- Captured stderr call ---*')
+    if enabled and stderr_lines >= 2 * _DEFAULT_LINE_LIMIT:
+        trimmed_msg = ("Output Trimmed, Full output in: "
+                       "test-out/test_print_py/test_print/call-stderr.txt")
+        lines.append(trimmed_msg)
+        lines.append(hr)
+        for i in range(_DEFAULT_LINE_LIMIT):
+            lines.append(f'in stderr {i}')
+        lines.append(hr)
+        lines.append(trimmed_msg)
+        lines.append(hr)
+        for i in range(stderr_lines - _DEFAULT_LINE_LIMIT, stderr_lines):
+            lines.append(f'in stderr {i}')
+        lines.append(hr)
+        lines.append(trimmed_msg)
+    else:
+        for i in range(stderr_lines):
+            lines.append(f'in stderr {i}')
+    lines.append("*====*")
+
+    result.stdout.fnmatch_lines(lines, consecutive=True)
+
+    result.stdout.fnmatch_lines([
+        'FAILED test_print.py::test_print *',
+    ])
+
+    if enabled:
+        for empty_file in ("setup-stdout.txt", "setup-stderr.txt",
+                           "teardown-stdout.txt", "teardown-stderr.txt"):
+            assert (test_print_path / empty_file).read_text("utf-8") == ""
+        assert call_stdout_path.read_text("utf-8") == full_stdout
+        assert call_stderr_path.read_text("utf-8") == full_stderr
+        call_stdout_path.unlink()  # remove big files
+        call_stderr_path.unlink()  # remove big files
+    else:
+        assert not test_out_path.exists()
+    assert result.ret != 0
+
+    return result
+
+
+def test_ini_setting(testdir):
+    # type: (pytest.Testdir) -> None
+    testdir.makeini("""
+        [pytest]
+        shorten-output-dir = test-out
+    """)
+
+    do_stdout_stderr_check(testdir, [], 1, 1, True)
+
+
+def test_nothing(testdir):
+    # type: (pytest.Testdir) -> None
+    do_stdout_stderr_check(testdir, [], 1, 1, False)
+
+
+def test_arg(testdir):
+    # type: (pytest.Testdir) -> None
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], 1, 1, True)
+
+
+def test_arg_override_ini(testdir):
+    # type: (pytest.Testdir) -> None
+    testdir.makeini("""
+        [pytest]
+        shorten-output-dir = test-out
+    """)
+
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir="], 1, 1, False)
+
+
+def test_disable_capture(testdir):
+    # type: (pytest.Testdir) -> None
+    testdir.makeini("""
+        [pytest]
+        shorten-output-dir = test-out
+    """)
+
+    testdir.makepyfile(test_print=f"""
+        import sys
+
+        def test_print():
+            print(f'in stdout')
+            print(f'in stderr', file=sys.stderr)
+            assert False
+    """)
+
+    result = testdir.runpytest('-v', '-s')
+
+    test_out_path = Path(testdir.tmpdir)
+    test_out_path /= "test-out"
+
+    assert not test_out_path.exists()
+
+    result.stdout.fnmatch_lines(['test_print.py::test_print*in stdout'])
+    result.stderr.fnmatch_lines(['in stderr'])
+
+    assert result.ret != 0
+
+
+def test_20k_disabled(testdir):
+    # type: (pytest.Testdir) -> None
+    do_stdout_stderr_check(testdir, [], 20000, 20000, False)
+
+
+def test_20k(testdir):
+    # type: (pytest.Testdir) -> None
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], 20000, 20000, True)
+
+
+def test_21k(testdir):
+    # type: (pytest.Testdir) -> None
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], 21000, 21000, True)
+
+
+def test_22k(testdir):
+    # type: (pytest.Testdir) -> None
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], 22000, 22000, True)
+
+
+def test_1x(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_50_percent_more(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT + _DEFAULT_LINE_LIMIT // 2
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_2x_minus_two(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT * 2 - 2
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_2x_minus_one(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT * 2 - 1
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_2x(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT * 2
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_2x_plus_one(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT * 2 + 1
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_2x_plus_two(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = _DEFAULT_LINE_LIMIT * 2 + 2
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+def test_1M(testdir):
+    # type: (pytest.Testdir) -> None
+    lines = 1_000_000
+    do_stdout_stderr_check(
+        testdir, ["--shorten-output-dir=test-out"], lines, lines, True)
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main())