b24ddae935324dfe07f2f7bda5e4431930d2c246
[pytest-output-to-files.git] / pytest_output_to_files.py
1 import pytest
2 import os
3 from typing import TextIO, Generator, Any
4 import io
5 import sys
6 import errno
7 from pathlib import Path
8
9
# Refuse to load anywhere but POSIX.
# NOTE(review): presumably because the capture code below redirects output at
# the file-descriptor level via os.dup/os.dup2 -- confirm the exact reason.
_is_posix = os.name == 'posix'
if not _is_posix:
    raise ValueError(
        f"{sys.platform} is not supported by pytest-output-to-files")
13
14
class __Capture:
    """Redirect the file descriptor of a text stream into a file.

    While a capture is active, the descriptor behind ``target`` is pointed at
    an output file using ``os.dup2``.  When the capture is stopped, the
    contents of that file are printed back to ``target``; if the file has too
    many lines only the first and last ``line_limit`` lines are printed,
    together with a note giving the path of the full output.

    Life cycle: ``start`` -> (``pause`` <-> ``resume``)* -> ``stop``/``abort``.
    """

    def __init__(self, target, line_limit=5000, chunk_size=1 << 16):
        # type: (TextIO, int, int) -> None
        """Prepare a capture for ``target`` without redirecting anything yet.

        ``line_limit`` is the number of lines kept from each end of the
        output when it has to be trimmed; ``chunk_size`` is the size in bytes
        of the reusable buffer used to read the output file back.
        """
        self.__target = target
        # duplicate of the original descriptor, used by `pause` to undo the
        # redirection
        self.__old_target_fd = os.dup(target.fileno())
        self.__file_path = None  # type: None | Path
        self.__file = None  # type: None | io.FileIO
        self.__line_limit = line_limit
        self.__buf = memoryview(bytearray(chunk_size))
        self.__active = False

    def resume(self):
        """Point the target's descriptor at the output file (again)."""
        assert self.__file is not None, \
            "resume called without calling start and pause"
        assert not self.active, "resume called without calling pause"
        # flush first so already-buffered text goes to the old destination
        self.__target.flush()
        os.dup2(self.__file.fileno(), self.__target.fileno())
        self.__active = True

    @property
    def active(self):
        # type: () -> bool
        """True while the target's descriptor is redirected to the file."""
        return self.__active

    @property
    def started(self):
        # type: () -> bool
        """True between ``start`` and the matching ``stop``/``abort``."""
        return self.__file is not None

    def pause(self):
        """Restore the target's original descriptor, keeping the file open."""
        assert self.started, "pause called without calling start"
        assert self.active, "pause called without calling resume"
        # flush first so buffered text ends up in the output file
        self.__target.flush()
        os.dup2(self.__old_target_fd, self.__target.fileno())
        self.__active = False

    def start(self, file_path):
        # type: (Path) -> None
        """Create/truncate ``file_path`` and begin redirecting into it."""
        assert not self.started, "start called without calling stop"
        self.__file_path = file_path
        # unbuffered and readable, since the same file object is used to
        # read the output back in `stop`
        self.__file = file_path.open("wb+", buffering=0)
        self.resume()

    def __read_chunk_at(self, pos, required_len):
        # type: (int, int) -> memoryview
        """Read from offset ``pos`` until the buffer is full or EOF is hit.

        Returns a view of the filled part of the shared buffer; the view is
        overwritten by the next call.  May return more than ``required_len``
        bytes.

        Raises ``ValueError`` if fewer than ``required_len`` bytes could be
        read, and ``BlockingIOError`` if the file reports it would block.
        """
        assert self.__file is not None, "can't be called without file"
        self.__file.seek(pos)
        filled = 0
        while filled < len(self.__buf):
            amount = self.__file.readinto(self.__buf[filled:])
            if amount is None:
                # pass both errno and message so `.errno` is actually set
                raise BlockingIOError(errno.EAGAIN, os.strerror(errno.EAGAIN))
            if amount == 0:
                break
            filled += amount
        if filled < required_len:
            raise ValueError(f"failed to read full {required_len:#x} byte "
                             f"chunk starting at offset {pos:#x}")
        return self.__buf[:filled]

    def __read_lines_at(self, line_limit, pos, backwards):
        # type: (int, int, bool) -> tuple[bytes, bool]
        """Read about ``line_limit`` lines starting at (or ending at) ``pos``.

        With ``backwards`` false, data is read forwards from ``pos``; with
        ``backwards`` true, the data read is the data that ends at ``pos``.
        Whole chunks are read until at least ``line_limit`` newlines were
        seen, then surplus lines are trimmed from the far end.

        Returns ``(data, hit_eof)`` where ``hit_eof`` is true if the end of
        the file (or its start, when reading backwards) was reached before
        ``line_limit`` newlines were found.
        """
        chunks = []  # type: list[bytes]
        lines = 0
        hit_eof = False
        while lines < line_limit:
            required_len = 0
            if backwards:
                if pos <= 0:
                    hit_eof = True
                    break
                required_len = min(pos, len(self.__buf))
                pos -= required_len
            chunk_view = self.__read_chunk_at(pos, required_len)
            if backwards:
                # __read_chunk_at fills the whole buffer when it can, which
                # near the start of the file is more than the gap left
                # before the previously read chunk; keep only the gap so no
                # bytes end up in the result twice
                chunk_view = chunk_view[:required_len]
            chunk = bytes(chunk_view)
            if chunk == b"":
                hit_eof = True
                break
            chunks.append(chunk)
            if not backwards:
                pos += len(chunk)
            lines += chunk.count(b"\n")
        extra_lines = lines - line_limit
        if backwards:
            # chunks were collected from the end towards the start
            retval = b"".join(reversed(chunks))
            if extra_lines > 0:
                retval = self.__remove_lines_at_start(retval, extra_lines)
            return retval, hit_eof
        retval = b"".join(chunks)
        if extra_lines > 0:
            retval = self.__remove_lines_at_end(retval, extra_lines)
        return retval, hit_eof

    def __remove_lines_at_end(self, b, count):
        # type: (bytes, int) -> bytes
        """Cut ``b`` just before the ``count``-th newline from its end."""
        trim_end = len(b)
        for _ in range(count):
            trim_end = b.rindex(b"\n", None, trim_end)
        return b[:trim_end]

    def __lines_from_start(self, b, count):
        # type: (bytes, int) -> int
        """Return the offset just past the ``count``-th newline in ``b``.

        Raises ``ValueError`` if ``b`` has fewer than ``count`` newlines.
        """
        trim_start = 0
        for _ in range(count):
            trim_start = b.index(b"\n", trim_start) + 1
        return trim_start

    def __remove_lines_at_start(self, b, count):
        # type: (bytes, int) -> bytes
        """Return ``b`` without its first ``count`` lines."""
        return b[self.__lines_from_start(b, count):]

    def __read_output_str(self):
        # type: () -> str
        """Return the captured output, trimmed if it has too many lines.

        If the end of the file is reached before twice ``line_limit``
        newlines are found, the text read is returned as is.  Otherwise the
        result is the first and last ``line_limit`` lines, separated by
        rulers and a message pointing at the file with the full output.
        """
        assert self.__file is not None, "can't be called without file"
        start_lines, start_hit_eof = self.__read_lines_at(
            line_limit=self.__line_limit * 2, pos=0, backwards=False)
        if start_hit_eof:
            return start_lines.decode("utf-8", errors="replace")
        p = self.__lines_from_start(start_lines, self.__line_limit)
        start_lines = start_lines[:p]
        file_length = self.__file.seek(0, os.SEEK_END)
        end_lines, _ = self.__read_lines_at(
            line_limit=self.__line_limit, pos=file_length, backwards=True)
        hr = '-' * 50
        trimmed_msg = f"Output Trimmed, Full output in: {self.__file_path}"
        retval = [
            trimmed_msg,
            hr,
            start_lines.decode("utf-8", errors="replace"),
            hr,
            trimmed_msg,
            hr,
            end_lines.decode("utf-8", errors="replace"),
            hr,
            trimmed_msg,
        ]
        return "\n".join(retval)

    def abort(self):
        """End the capture without printing anything; no-op if not started."""
        if self.__file is None:
            return
        if self.active:
            self.pause()
        self.__file.close()
        self.__file_path, self.__file = None, None

    def stop(self):
        """End the capture and print the (possibly trimmed) output.

        The output is printed to the target after its original descriptor
        has been restored.  The file is closed even if printing fails.
        """
        assert self.__file is not None, "stop called without calling start"
        if self.active:
            self.pause()
        try:
            print(self.__read_output_str(), file=self.__target)
        finally:
            self.abort()
169
170
def _make_capture(target):
    """Create a ``__Capture`` for ``target``.

    This has to live at module level: inside a class body Python's private
    name mangling would rewrite ``__Capture`` to
    ``_OutputToFilesPlugin__Capture`` and the lookup would fail with
    ``NameError``.
    """
    return __Capture(target)


class __OutputToFilesPlugin:
    """pytest plugin that sends each test's stdout/stderr to files.

    For every test item a directory derived from the item's node id is
    created below ``output_dir``; stdout and stderr are captured into
    ``stdout.txt`` and ``stderr.txt`` in it during the setup, call and
    teardown phases.
    """

    def __init__(self, output_dir):
        # type: (str) -> None
        """Create the plugin, storing outputs below ``output_dir``."""
        self.output_dir = Path(output_dir)
        # maps output file name -> capture of the corresponding stream
        self.__captures = {
            "stdout.txt": _make_capture(sys.stdout),
            "stderr.txt": _make_capture(sys.stderr),
        }

    def __repr__(self):
        # type: () -> str
        return f"<OutputToFilesPlugin output_dir={str(self.output_dir)!r}>"

    def __start(self, item):
        # type: (pytest.Item) -> None
        """Create the output directory for ``item`` and start all captures."""
        path = self.output_dir
        # one directory level per `::`-separated node id part, with dots
        # replaced by underscores
        for part in item.nodeid.split('::'):
            path /= part.replace(".", "_")
        path.mkdir(0o775, parents=True, exist_ok=True)
        for name, capture in self.__captures.items():
            capture.start(path / name)

    @pytest.hookimpl(tryfirst=True)
    def pytest_keyboard_interrupt(self, excinfo):
        """Drop all running captures when the run is interrupted."""
        for capture in self.__captures.values():
            capture.abort()

    @pytest.hookimpl(tryfirst=True)
    def pytest_internalerror(self, excinfo):
        """Drop all running captures on a pytest internal error."""
        for capture in self.__captures.values():
            capture.abort()

    @pytest.hookimpl(hookwrapper=True, trylast=True)
    def pytest_runtest_setup(self, item):
        # type: (pytest.Item) -> Generator[Any, Any, Any]
        """Start capturing for the setup phase, pausing when it is done."""
        self.__start(item)
        yield
        for capture in self.__captures.values():
            capture.pause()

    @pytest.hookimpl(hookwrapper=True, trylast=True)
    def pytest_runtest_call(self, item):
        # type: (pytest.Item) -> Generator[Any, Any, Any]
        """Capture output for the duration of the test call."""
        for capture in self.__captures.values():
            capture.resume()
        yield
        for capture in self.__captures.values():
            capture.pause()

    @pytest.hookimpl(hookwrapper=True, trylast=True)
    def pytest_runtest_teardown(self, item):
        # type: (pytest.Item) -> Generator[Any, Any, Any]
        """Capture the teardown phase, then stop and print the output."""
        for capture in self.__captures.values():
            capture.resume()
        yield
        for capture in self.__captures.values():
            capture.stop()
228
229
def pytest_addoption(parser):
    # type: (pytest.Parser) -> None
    """Register ``--shorten-output-dir`` and the matching ini setting.

    Both default to the empty string and share the same help text.
    """
    help_text = ('shorten test outputs by storing them in files in DIR and '
                 'returning just the first/last few lines')

    option_group = parser.getgroup("output_to_files", "shortening output")
    option_group.addoption(
        '--shorten-output-dir',
        action='store',
        metavar="DIR",
        default="",
        help=help_text,
    )

    parser.addini('shorten-output-dir', default="", help=help_text)
246
247
def pytest_configure(config):
    # type: (pytest.Config) -> None
    """Register the output-to-files plugin if an output directory is set.

    The command-line option takes precedence; the ini setting is consulted
    only when the option is the empty string.  With neither set, nothing is
    registered.
    """
    output_dir = config.getoption('--shorten-output-dir')
    if output_dir == "":
        # fall back to the ini file
        output_dir = config.getini('shorten-output-dir')
    if output_dir == "":
        return
    assert isinstance(output_dir, str), "invalid shorten-output-dir"
    plugin = __OutputToFilesPlugin(output_dir)
    config.pluginmanager.register(plugin)