add --shorten-output-lines option so users can change how much is shown
[pytest-output-to-files.git] / pytest_output_to_files.py
1 import pytest
2 import os
3 from typing import TextIO, Generator, Any
4 import io
5 import sys
6 import errno
7 import contextlib
8 from pathlib import Path
9
10
11 if os.name != 'posix':
12 raise ValueError(
13 f"{sys.platform} is not supported by pytest-output-to-files")
14
15 _DEFAULT_LINE_LIMIT = 1000
16
17
18 class _Capture:
19 def __init__(self, target_attr, line_limit,
20 chunk_size=1 << 16):
21 # type: (str, int, int) -> None
22 self.__target_attr = target_attr
23 self.__old_target = self.__target
24 try:
25 self.__target_fd = self.__target.fileno()
26 except io.UnsupportedOperation:
27 self.__target_fd = None
28 if self.__target_fd is not None:
29 self.__old_target_fd = os.dup(self.__target_fd)
30 self.__file_dup = None # type: None | TextIO
31 self.__file_path = None # type: None | Path
32 self.__file = None # type: None | io.FileIO
33 self.__line_limit = line_limit
34 self.__buf = memoryview(bytearray(chunk_size))
35 self.__active = False
36
37 @property
38 def __target(self):
39 # type: () -> TextIO
40 return getattr(sys, self.__target_attr)
41
42 @__target.setter
43 def __target(self, v):
44 # type: (TextIO) -> None
45 setattr(sys, self.__target_attr, v)
46
47 def resume(self):
48 assert self.__file is not None, \
49 "resume called without calling start and pause"
50 assert not self.active, "resume called without calling pause"
51 self.__target.flush()
52 if self.__target_fd is None:
53 assert self.__file_dup is not None, "inconsistent state"
54 self.__target = self.__file_dup
55 else:
56 os.dup2(self.__file.fileno(), self.__target_fd)
57 self.__active = True
58
59 @property
60 def active(self):
61 # type: () -> bool
62 return self.__active
63
64 @property
65 def started(self):
66 # type: () -> bool
67 return self.__file is not None
68
69 def pause(self):
70 assert self.started, "pause called without calling start"
71 assert self.active, "pause called without calling resume"
72 self.__target.flush()
73 if self.__target_fd is None:
74 self.__target = self.__old_target
75 else:
76 os.dup2(self.__old_target_fd, self.__target.fileno())
77 self.__active = False
78
79 def start(self, file_path):
80 # type: (Path) -> None
81 assert not self.started, "start called without calling stop"
82 self.__file_path = file_path
83 self.__file = file_path.open("wb+", buffering=0)
84 if self.__target_fd is None:
85 self.__file_dup = os.fdopen(
86 os.dup(self.__file.fileno()), "w", encoding="utf-8")
87 self.resume()
88
89 def __read_chunk_at(self, pos, required_len):
90 # type: (int, int) -> memoryview
91 assert self.__file is not None, "can't be called without file"
92 self.__file.seek(pos)
93 filled = 0
94 while filled < len(self.__buf):
95 amount = self.__file.readinto(self.__buf[filled:])
96 if amount is None:
97 raise BlockingIOError(errno.EAGAIN)
98 if amount == 0:
99 break
100 filled += amount
101 if filled < required_len:
102 raise ValueError(f"failed to read full {required_len:#x} byte "
103 f"chunk starting at offset {pos:#x}")
104 return self.__buf[:filled]
105
106 def __read_lines_at(self, line_limit, pos, backwards):
107 # type: (int, int, bool) -> tuple[bytes, bool]
108 chunks = [] # type: list[bytes]
109 lines = 0
110 hit_eof = False
111 while lines < line_limit:
112 required_len = 0
113 if backwards:
114 if pos <= 0:
115 hit_eof = True
116 break
117 required_len = min(pos, len(self.__buf))
118 pos -= required_len
119 chunk = bytes(self.__read_chunk_at(pos, required_len))
120 if chunk == b"":
121 hit_eof = True
122 break
123 chunks.append(chunk)
124 if not backwards:
125 pos += len(chunk)
126 lines += chunk.count(b"\n")
127 extra_lines = lines - line_limit
128 if backwards:
129 retval = b"".join(reversed(chunks))
130 if extra_lines > 0:
131 retval = self.__remove_lines_at_start(retval, extra_lines)
132 return retval, hit_eof
133 retval = b"".join(chunks)
134 if extra_lines > 0:
135 retval = self.__remove_lines_at_end(retval, extra_lines)
136 return retval, hit_eof
137
138 def __remove_lines_at_end(self, b, count):
139 # type: (bytes, int) -> bytes
140 trim_end = len(b)
141 for _ in range(count):
142 trim_end = b.rindex(b"\n", None, trim_end)
143 return b[:trim_end]
144
145 def __lines_from_start(self, b, count):
146 # type: (bytes, int) -> int
147 trim_start = 0
148 for _ in range(count):
149 trim_start = b.index(b"\n", trim_start) + 1
150 return trim_start
151
152 def __remove_lines_at_start(self, b, count):
153 # type: (bytes, int) -> bytes
154 return b[self.__lines_from_start(b, count):]
155
156 def __read_output_str(self):
157 # type: () -> str
158 assert self.__file is not None, "can't be called without file"
159 start_lines, start_hit_eof = self.__read_lines_at(
160 line_limit=self.__line_limit, pos=0, backwards=False)
161 if start_hit_eof:
162 return start_lines.decode("utf-8", errors="replace")
163 p = self.__lines_from_start(start_lines, (self.__line_limit + 1) // 2)
164 start_lines = start_lines[:p]
165 file_length = self.__file.seek(0, os.SEEK_END)
166 end_lines, _ = self.__read_lines_at(
167 line_limit=self.__line_limit // 2, pos=file_length, backwards=True)
168 if start_lines.endswith(b"\n"):
169 start_lines = start_lines[:-1]
170 if end_lines.endswith(b"\n"):
171 end_lines = end_lines[:-1]
172 hr = '-' * 50
173 trimmed_msg = f"Output Trimmed, Full output in: {self.__file_path}"
174 retval = [
175 trimmed_msg,
176 hr,
177 start_lines.decode("utf-8", errors="replace"),
178 hr,
179 trimmed_msg,
180 hr,
181 end_lines.decode("utf-8", errors="replace"),
182 hr,
183 trimmed_msg,
184 ]
185 return "\n".join(retval)
186
187 def abort(self):
188 if self.__file is None:
189 return
190 if self.active:
191 self.pause()
192 if self.__file_dup is not None:
193 self.__file_dup.close()
194 self.__file.close()
195 self.__file_path = None
196 self.__file = None
197 self.__file_dup = None
198
199 def stop(self):
200 # type: () -> str
201 assert self.__file is not None, "stop called without calling start"
202 if self.active:
203 self.pause()
204 try:
205 retval = self.__read_output_str()
206 finally:
207 self.abort()
208 return retval
209
210
211 class _OutputToFilesPlugin:
212 def __init__(self, output_dir, line_limit):
213 # type: (str) -> None
214 self.output_dir = Path(output_dir)
215 self.__captures = {
216 "stdout": _Capture("stdout", line_limit=line_limit),
217 "stderr": _Capture("stderr", line_limit=line_limit),
218 }
219
220 def __repr__(self):
221 # type: () -> str
222 return f"<OutputToFilesPlugin output_dir={str(self.output_dir)!r}>"
223
224 def __start(self, item, when):
225 # type: (pytest.Item, str) -> None
226 path = self.output_dir
227 for part in item.nodeid.split('::'):
228 path /= part.replace(".", "_")
229 path.mkdir(0o775, parents=True, exist_ok=True)
230 for name, capture in self.__captures.items():
231 capture.start(path / f"{when}-{name}.txt")
232
233 def __stop(self, item, when):
234 # type: (pytest.Item, str) -> None
235 for name, capture in self.__captures.items():
236 item.add_report_section(when, name, capture.stop())
237
238 def __abort(self):
239 for capture in self.__captures.values():
240 capture.abort()
241
242 @contextlib.contextmanager
243 def __capture_item(self, item, when):
244 # type: (pytest.Item, str) -> Generator[Any, Any, Any]
245 builtin_capman = item.config.pluginmanager.getplugin("capturemanager")
246 if builtin_capman is not None:
247 builtin_capman.suspend_global_capture()
248 try:
249 self.__start(item, when)
250 yield
251 self.__stop(item, when)
252 finally:
253 self.__abort()
254 if builtin_capman is not None:
255 builtin_capman.resume_global_capture()
256
257 @pytest.hookimpl(tryfirst=True)
258 def pytest_keyboard_interrupt(self, excinfo):
259 self.__abort()
260
261 @pytest.hookimpl(tryfirst=True)
262 def pytest_internalerror(self, excinfo):
263 self.__abort()
264
265 @pytest.hookimpl(hookwrapper=True, trylast=True)
266 def pytest_runtest_setup(self, item):
267 # type: (pytest.Item) -> Generator[Any, Any, Any]
268 with self.__capture_item(item, "setup"):
269 yield
270
271 @pytest.hookimpl(hookwrapper=True, trylast=True)
272 def pytest_runtest_call(self, item):
273 # type: (pytest.Item) -> Generator[Any, Any, Any]
274 with self.__capture_item(item, "call"):
275 yield
276
277 @pytest.hookimpl(hookwrapper=True, trylast=True)
278 def pytest_runtest_teardown(self, item):
279 # type: (pytest.Item) -> Generator[Any, Any, Any]
280 with self.__capture_item(item, "teardown"):
281 yield
282
283
284 def pytest_addoption(parser):
285 # type: (pytest.Parser) -> None
286 group = parser.getgroup("output_to_files", "shortening output")
287 group.addoption(
288 '--shorten-output-dir',
289 action='store',
290 metavar="DIR",
291 help=('shorten test outputs by storing them in files in DIR and '
292 'returning just the first/last few lines. disable by '
293 'using --shorten-output-dir=""'))
294
295 parser.addini(
296 'shorten-output-dir',
297 default="",
298 help=('shorten test outputs by storing them in files in DIR and '
299 'returning just the first/last few lines'))
300
301 group.addoption(
302 '--shorten-output-lines',
303 action='store',
304 metavar="LINES",
305 help=('change the number of lines shown by the\n'
306 '--shorten-output-dir option'))
307
308 parser.addini(
309 'shorten-output-lines',
310 default=str(_DEFAULT_LINE_LIMIT),
311 help=('change the number of lines shown by the\n'
312 '--shorten-output-dir option'))
313
314
315 def pytest_configure(config):
316 # type: (pytest.Config) -> None
317 if config.option.capture == "no":
318 return
319 output_dir = config.getoption('--shorten-output-dir')
320 if output_dir is None:
321 output_dir = config.getini('shorten-output-dir')
322 line_limit = config.getoption('--shorten-output-lines')
323 if line_limit is None:
324 line_limit = config.getini('shorten-output-lines')
325 assert isinstance(line_limit, str), "invalid shorten-output-lines"
326 try:
327 line_limit = int(line_limit)
328 except ValueError as e:
329 raise ValueError("invalid shorten-output-lines") from e
330 if output_dir != "":
331 assert isinstance(output_dir, str), "invalid shorten-output-dir"
332 config.pluginmanager.register(_OutputToFilesPlugin(
333 output_dir, line_limit=line_limit))