b24ddae935324dfe07f2f7bda5e4431930d2c246
3 from typing
import TextIO
, Generator
, Any
7 from pathlib
import Path
10 if os
.name
!= 'posix':
12 f
"{sys.platform} is not supported by pytest-output-to-files")
16 def __init__(self
, target
, line_limit
=5000, chunk_size
=1 << 16):
17 # type: (TextIO, int, int) -> None
18 self
.__target
= target
19 self
.__old
_target
_fd
= os
.dup(target
.fileno())
20 self
.__file
_path
= None # type: None | Path
21 self
.__file
= None # type: None | io.FileIO
22 self
.__line
_limit
= line_limit
23 self
.__buf
= memoryview(bytearray(chunk_size
))
27 assert self
.__file
is not None, \
28 "resume called without calling start and pause"
29 assert not self
.active
, "resume called without calling pause"
31 os
.dup2(self
.__file
.fileno(), self
.__target
.fileno())
42 return self
.__file
is not None
45 assert self
.started
, "pause called without calling start"
46 assert self
.active
, "pause called without calling resume"
48 os
.dup2(self
.__old
_target
_fd
, self
.__target
.fileno())
51 def start(self
, file_path
):
52 # type: (Path) -> None
53 assert not self
.started
, "start called without calling stop"
54 self
.__file
_path
= file_path
55 self
.__file
= file_path
.open("wb+", buffering
=0)
58 def __read_chunk_at(self
, pos
, required_len
):
59 # type: (int, int) -> memoryview
60 assert self
.__file
is not None, "can't be called without file"
63 while filled
< len(self
.__buf
):
64 amount
= self
.__file
.readinto(self
.__buf
[filled
:])
66 raise BlockingIOError(errno
.EAGAIN
)
70 if filled
< required_len
:
71 raise ValueError(f
"failed to read full {required_len:#x} byte "
72 f
"chunk starting at offset {pos:#x}")
73 return self
.__buf
[:filled
]
75 def __read_lines_at(self
, line_limit
, pos
, backwards
):
76 # type: (int, int, bool) -> tuple[bytes, bool]
77 chunks
= [] # type: list[bytes]
80 while lines
< line_limit
:
86 required_len
= min(pos
, len(self
.__buf
))
88 chunk
= bytes(self
.__read
_chunk
_at
(pos
, required_len
))
95 lines
+= chunk
.count(b
"\n")
96 extra_lines
= lines
- line_limit
98 retval
= b
"".join(reversed(chunks
))
100 retval
= self
.__remove
_lines
_at
_start
(retval
, extra_lines
)
101 return retval
, hit_eof
102 retval
= b
"".join(chunks
)
104 retval
= self
.__remove
_lines
_at
_end
(retval
, extra_lines
)
105 return retval
, hit_eof
107 def __remove_lines_at_end(self
, b
, count
):
108 # type: (bytes, int) -> bytes
110 for _
in range(count
):
111 trim_end
= b
.rindex(b
"\n", None, trim_end
)
114 def __lines_from_start(self
, b
, count
):
115 # type: (bytes, int) -> int
117 for _
in range(count
):
118 trim_start
= b
.index(b
"\n", trim_start
) + 1
def __remove_lines_at_start(self, b, count):
    # type: (bytes, int) -> bytes
    """Return ``b`` with its leading ``count`` lines sliced off.

    The cut position is delegated to ``__lines_from_start``; everything
    from that index to the end of ``b`` is returned.
    """
    # NOTE(review): ``__lines_from_start`` presumably yields the byte offset
    # just past the ``count``-th newline -- its return statement is not
    # visible from here, confirm against its definition.
    cut = self.__lines_from_start(b, count)
    return b[cut:]
125 def __read_output_str(self
):
127 assert self
.__file
is not None, "can't be called without file"
128 start_lines
, start_hit_eof
= self
.__read
_lines
_at
(
129 line_limit
=self
.__line
_limit
* 2, pos
=0, backwards
=False)
131 return start_lines
.decode("utf-8", errors
="replace")
132 p
= self
.__lines
_from
_start
(start_lines
, self
.__line
_limit
)
133 start_lines
= start_lines
[:p
]
134 file_length
= self
.__file
.seek(0, os
.SEEK_END
)
135 end_lines
, _
= self
.__read
_lines
_at
(
136 line_limit
=self
.__line
_limit
, pos
=file_length
, backwards
=True)
138 trimmed_msg
= f
"Output Trimmed, Full output in: {self.__file_path}"
142 start_lines
.decode("utf-8", errors
="replace"),
146 end_lines
.decode("utf-8", errors
="replace"),
150 return "\n".join(retval
)
153 if self
.__file
is None:
158 self
.__file
_path
, self
.__file
= None, None
161 assert self
.__file
is not None, "stop called without calling start"
165 print(self
.__read
_output
_str
(), file=self
.__target
)
171 class __OutputToFilesPlugin
:
172 def __init__(self
, output_dir
):
173 # type: (str) -> None
174 self
.output_dir
= Path(output_dir
)
176 "stdout.txt": __Capture(sys
.stdout
),
177 "stderr.txt": __Capture(sys
.stderr
),
182 return f
"<OutputToFilesPlugin output_dir={str(self.output_dir)!r}>"
def __start(self, item):
    # type: (pytest.Item) -> None
    """Create the output directory for ``item`` and start every capture.

    The directory lives under ``self.output_dir`` and has one nested
    component per ``::``-separated piece of ``item.nodeid``, with each
    ``.`` in a piece replaced by ``_``.  Each capture in
    ``self.__captures`` is then started on a file inside that directory,
    named after the capture's key.
    """
    components = [piece.replace(".", "_") for piece in item.nodeid.split('::')]
    test_dir = self.output_dir.joinpath(*components)
    # Missing parents are created and an existing directory is accepted.
    test_dir.mkdir(0o775, parents=True, exist_ok=True)
    for file_name, capture in self.__captures.items():
        capture.start(test_dir / file_name)
193 @pytest.hookimpl(tryfirst
=True)
194 def pytest_keyboard_interrupt(self
, excinfo
):
195 for capture
in self
.__captures
.values():
198 @pytest.hookimpl(tryfirst
=True)
199 def pytest_internalerror(self
, excinfo
):
200 for capture
in self
.__captures
.values():
203 @pytest.hookimpl(hookwrapper
=True, trylast
=True)
204 def pytest_runtest_setup(self
, item
):
205 # type: (pytest.Item) -> Generator[Any, Any, Any]
208 for capture
in self
.__captures
.values():
211 @pytest.hookimpl(hookwrapper
=True, trylast
=True)
212 def pytest_runtest_call(self
, item
):
213 # type: (pytest.Item) -> Generator[Any, Any, Any]
214 for capture
in self
.__captures
.values():
217 for capture
in self
.__captures
.values():
220 @pytest.hookimpl(hookwrapper
=True, trylast
=True)
221 def pytest_runtest_teardown(self
, item
):
222 # type: (pytest.Item) -> Generator[Any, Any, Any]
223 for capture
in self
.__captures
.values():
226 for capture
in self
.__captures
.values():
230 def pytest_addoption(parser
):
231 # type: (pytest.Parser) -> None
232 group
= parser
.getgroup("output_to_files", "shortening output")
234 '--shorten-output-dir',
238 help=('shorten test outputs by storing them in files in DIR and '
239 'returning just the first/last few lines'))
242 'shorten-output-dir',
244 help=('shorten test outputs by storing them in files in DIR and '
245 'returning just the first/last few lines'))
248 def pytest_configure(config
):
249 # type: (pytest.Config) -> None
250 output_dir
= config
.getoption('--shorten-output-dir')
252 output_dir
= config
.getini('shorten-output-dir')
254 assert isinstance(output_dir
, str), "invalid shorten-output-dir"
255 config
.pluginmanager
.register(__OutputToFilesPlugin(output_dir
))