3 from typing
import TextIO
, Generator
, Any
8 from pathlib
import Path
11 if os
.name
!= 'posix':
13 f
"{sys.platform} is not supported by pytest-output-to-files")
15 _DEFAULT_LINE_LIMIT
= 1000
19 def __init__(self
, target_attr
, line_limit
,
21 # type: (str, int, int) -> None
22 self
.__target
_attr
= target_attr
23 self
.__old
_target
= self
.__target
25 self
.__target
_fd
= self
.__target
.fileno()
26 except io
.UnsupportedOperation
:
27 self
.__target
_fd
= None
28 if self
.__target
_fd
is not None:
29 self
.__old
_target
_fd
= os
.dup(self
.__target
_fd
)
30 self
.__file
_dup
= None # type: None | TextIO
31 self
.__file
_path
= None # type: None | Path
32 self
.__file
= None # type: None | io.FileIO
33 self
.__line
_limit
= line_limit
34 self
.__buf
= memoryview(bytearray(chunk_size
))
40 return getattr(sys
, self
.__target
_attr
)
def __target(self, v):
    # type: (TextIO) -> None
    """Install ``v`` as the ``sys`` attribute named by ``self.__target_attr``.

    Nothing is returned; the only effect is rebinding that attribute on the
    ``sys`` module.
    """
    attr_name = self.__target_attr
    setattr(sys, attr_name, v)
48 assert self
.__file
is not None, \
49 "resume called without calling start and pause"
50 assert not self
.active
, "resume called without calling pause"
52 if self
.__target
_fd
is None:
53 assert self
.__file
_dup
is not None, "inconsistent state"
54 self
.__target
= self
.__file
_dup
56 os
.dup2(self
.__file
.fileno(), self
.__target
_fd
)
67 return self
.__file
is not None
70 assert self
.started
, "pause called without calling start"
71 assert self
.active
, "pause called without calling resume"
73 if self
.__target
_fd
is None:
74 self
.__target
= self
.__old
_target
76 os
.dup2(self
.__old
_target
_fd
, self
.__target
.fileno())
79 def start(self
, file_path
):
80 # type: (Path) -> None
81 assert not self
.started
, "start called without calling stop"
82 self
.__file
_path
= file_path
83 self
.__file
= file_path
.open("wb+", buffering
=0)
84 if self
.__target
_fd
is None:
85 self
.__file
_dup
= os
.fdopen(
86 os
.dup(self
.__file
.fileno()), "w", encoding
="utf-8")
89 def __read_chunk_at(self
, pos
, required_len
):
90 # type: (int, int) -> memoryview
91 assert self
.__file
is not None, "can't be called without file"
94 while filled
< len(self
.__buf
):
95 amount
= self
.__file
.readinto(self
.__buf
[filled
:])
97 raise BlockingIOError(errno
.EAGAIN
)
101 if filled
< required_len
:
102 raise ValueError(f
"failed to read full {required_len:#x} byte "
103 f
"chunk starting at offset {pos:#x}")
104 return self
.__buf
[:filled
]
106 def __read_lines_at(self
, line_limit
, pos
, backwards
):
107 # type: (int, int, bool) -> tuple[bytes, bool]
108 chunks
= [] # type: list[bytes]
111 while lines
< line_limit
:
117 required_len
= min(pos
, len(self
.__buf
))
119 chunk
= bytes(self
.__read
_chunk
_at
(pos
, required_len
))
126 lines
+= chunk
.count(b
"\n")
127 extra_lines
= lines
- line_limit
129 retval
= b
"".join(reversed(chunks
))
131 retval
= self
.__remove
_lines
_at
_start
(retval
, extra_lines
)
132 return retval
, hit_eof
133 retval
= b
"".join(chunks
)
135 retval
= self
.__remove
_lines
_at
_end
(retval
, extra_lines
)
136 return retval
, hit_eof
138 def __remove_lines_at_end(self
, b
, count
):
139 # type: (bytes, int) -> bytes
141 for _
in range(count
):
142 trim_end
= b
.rindex(b
"\n", None, trim_end
)
145 def __lines_from_start(self
, b
, count
):
146 # type: (bytes, int) -> int
148 for _
in range(count
):
149 trim_start
= b
.index(b
"\n", trim_start
) + 1
def __remove_lines_at_start(self, b, count):
    # type: (bytes, int) -> bytes
    """Return ``b`` sliced from the offset ``__lines_from_start`` reports.

    The helper is asked for an index into ``b`` for ``count`` lines (it
    presumably yields the position just past the ``count``-th newline --
    confirm against its body), and everything before that index is dropped.
    """
    cut = self.__lines_from_start(b, count)
    return b[cut:]
156 def __read_output_str(self
):
158 assert self
.__file
is not None, "can't be called without file"
159 start_lines
, start_hit_eof
= self
.__read
_lines
_at
(
160 line_limit
=self
.__line
_limit
, pos
=0, backwards
=False)
162 return start_lines
.decode("utf-8", errors
="replace")
163 p
= self
.__lines
_from
_start
(start_lines
, (self
.__line
_limit
+ 1) // 2)
164 start_lines
= start_lines
[:p
]
165 file_length
= self
.__file
.seek(0, os
.SEEK_END
)
166 end_lines
, _
= self
.__read
_lines
_at
(
167 line_limit
=self
.__line
_limit
// 2, pos
=file_length
, backwards
=True)
168 if start_lines
.endswith(b
"\n"):
169 start_lines
= start_lines
[:-1]
170 if end_lines
.endswith(b
"\n"):
171 end_lines
= end_lines
[:-1]
173 trimmed_msg
= f
"Output Trimmed, Full output in: {self.__file_path}"
177 start_lines
.decode("utf-8", errors
="replace"),
181 end_lines
.decode("utf-8", errors
="replace"),
185 return "\n".join(retval
)
188 if self
.__file
is None:
192 if self
.__file
_dup
is not None:
193 self
.__file
_dup
.close()
195 self
.__file
_path
= None
197 self
.__file
_dup
= None
201 assert self
.__file
is not None, "stop called without calling start"
205 retval
= self
.__read
_output
_str
()
211 class _OutputToFilesPlugin
:
212 def __init__(self
, output_dir
, line_limit
):
213 # type: (str, int) -> None
214 self
.output_dir
= Path(output_dir
)
216 "stdout": _Capture("stdout", line_limit
=line_limit
),
217 "stderr": _Capture("stderr", line_limit
=line_limit
),
222 return f
"<OutputToFilesPlugin output_dir={str(self.output_dir)!r}>"
def __start(self, item, when):
    # type: (pytest.Item, str) -> None
    """Begin capturing every stream for ``item`` into per-phase files.

    The target directory is ``self.output_dir`` extended by one path segment
    per ``::``-separated piece of ``item.nodeid``, with ``.`` replaced by
    ``_`` in each piece.  It is created (mode ``0o775``, parents included)
    if missing, then each capture is started on ``<when>-<name>.txt`` in it.
    """
    base = self.output_dir
    segments = [part.replace(".", "_") for part in item.nodeid.split('::')]
    # joinpath() with several segments is the same as chaining ``/``.
    directory = base.joinpath(*segments)
    directory.mkdir(mode=0o775, parents=True, exist_ok=True)
    for name, capture in self.__captures.items():
        target_file = directory / f"{when}-{name}.txt"
        capture.start(target_file)
def __stop(self, item, when):
    # type: (pytest.Item, str) -> None
    """Stop every capture and attach what it returned to ``item``.

    For each ``(name, capture)`` pair, the value returned by
    ``capture.stop()`` is handed to ``item.add_report_section`` under the
    phase ``when`` and the key ``name``.
    """
    for name, capture in self.__captures.items():
        output = capture.stop()
        item.add_report_section(when, name, output)
239 for capture
in self
.__captures
.values():
242 @contextlib.contextmanager
243 def __capture_item(self
, item
, when
):
244 # type: (pytest.Item, str) -> Generator[Any, Any, Any]
245 builtin_capman
= item
.config
.pluginmanager
.getplugin("capturemanager")
246 if builtin_capman
is not None:
247 builtin_capman
.suspend_global_capture()
249 self
.__start
(item
, when
)
251 self
.__stop
(item
, when
)
254 if builtin_capman
is not None:
255 builtin_capman
.resume_global_capture()
257 @pytest.hookimpl(tryfirst
=True)
258 def pytest_keyboard_interrupt(self
, excinfo
):
261 @pytest.hookimpl(tryfirst
=True)
262 def pytest_internalerror(self
, excinfo
):
265 @pytest.hookimpl(hookwrapper
=True, trylast
=True)
266 def pytest_runtest_setup(self
, item
):
267 # type: (pytest.Item) -> Generator[Any, Any, Any]
268 with self
.__capture
_item
(item
, "setup"):
271 @pytest.hookimpl(hookwrapper
=True, trylast
=True)
272 def pytest_runtest_call(self
, item
):
273 # type: (pytest.Item) -> Generator[Any, Any, Any]
274 with self
.__capture
_item
(item
, "call"):
277 @pytest.hookimpl(hookwrapper
=True, trylast
=True)
278 def pytest_runtest_teardown(self
, item
):
279 # type: (pytest.Item) -> Generator[Any, Any, Any]
280 with self
.__capture
_item
(item
, "teardown"):
284 def pytest_addoption(parser
):
285 # type: (pytest.Parser) -> None
286 group
= parser
.getgroup("output_to_files", "shortening output")
288 '--shorten-output-dir',
291 help=('shorten test outputs by storing them in files in DIR and '
292 'returning just the first/last few lines. disable by '
293 'using --shorten-output-dir=""'))
296 'shorten-output-dir',
298 help=('shorten test outputs by storing them in files in DIR and '
299 'returning just the first/last few lines'))
302 '--shorten-output-lines',
305 help=('change the number of lines shown by the\n'
306 '--shorten-output-dir option'))
309 'shorten-output-lines',
310 default
=str(_DEFAULT_LINE_LIMIT
),
311 help=('change the number of lines shown by the\n'
312 '--shorten-output-dir option'))
315 def pytest_configure(config
):
316 # type: (pytest.Config) -> None
317 if config
.option
.capture
== "no":
319 output_dir
= config
.getoption('--shorten-output-dir')
320 if output_dir
is None:
321 output_dir
= config
.getini('shorten-output-dir')
322 line_limit
= config
.getoption('--shorten-output-lines')
323 if line_limit
is None:
324 line_limit
= config
.getini('shorten-output-lines')
325 assert isinstance(line_limit
, str), "invalid shorten-output-lines"
327 line_limit
= int(line_limit
)
328 except ValueError as e
:
329 raise ValueError("invalid shorten-output-lines") from e
331 assert isinstance(output_dir
, str), "invalid shorten-output-dir"
332 config
.pluginmanager
.register(_OutputToFilesPlugin(
333 output_dir
, line_limit
=line_limit
))