11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
15 for directory
in (os
.getcwd(), os
.path
.dirname(__file__
)):
16 fullpath
= os
.path
.join(directory
, path
)
17 if os
.path
.exists(fullpath
):
21 def compile(args
, xlen
=32): # pylint: disable=redefined-builtin
22 cc
= os
.path
.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen
)
25 found
= find_file(arg
)
30 process
= subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
,
31 stderr
=subprocess
.PIPE
)
32 stdout
, stderr
= process
.communicate()
33 if process
.returncode
:
35 header("Compile failed")
36 print "+", " ".join(cmd
)
40 raise Exception("Compile failed!")
43 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
45 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
47 port
= s
.getsockname()[1]
54 def __init__(self
, cmd
, binary
=None, halted
=False, with_gdb
=True,
55 timeout
=None, xlen
=64):
56 """Launch spike. Return tuple of its process and the port it's running
59 cmd
= shlex
.split(cmd
)
63 cmd
+= ["--isa", "RV32"]
66 cmd
= ["timeout", str(timeout
)] + cmd
71 self
.port
= unused_port()
72 cmd
+= ['--gdb-port', str(self
.port
)]
77 logfile
= open(self
.logname
, "w")
78 logfile
.write("+ %s\n" % " ".join(cmd
))
80 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
81 stdout
=logfile
, stderr
=logfile
)
def wait(self, *args, **kwargs):
    """Wait on the launched process and return whatever its wait() returns.

    All positional and keyword arguments are forwarded unchanged to
    self.process.wait().
    """
    proc = self.process
    return proc.wait(*args, **kwargs)
94 def __init__(self
, simv
=None, debug
=False):
96 cmd
= shlex
.split(simv
)
99 cmd
+= ["+jtag_vpi_enable"]
101 cmd
[0] = cmd
[0] + "-debug"
102 cmd
+= ["+vcdplusfile=output/gdbserver.vpd"]
103 logfile
= open("simv.log", "w")
104 logfile
.write("+ %s\n" % " ".join(cmd
))
106 listenfile
= open("simv.log", "r")
107 listenfile
.seek(0, 2)
108 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
109 stdout
=logfile
, stderr
=logfile
)
112 line
= listenfile
.readline()
115 match
= re
.match(r
"^Listening on port (\d+)$", line
)
118 self
.port
= int(match
.group(1))
119 os
.environ
['JTAG_VPI_PORT'] = str(self
.port
)
128 class Openocd(object):
129 logname
= "openocd.log"
131 def __init__(self
, cmd
=None, config
=None, debug
=False):
133 cmd
= shlex
.split(cmd
)
137 cmd
+= ["-f", find_file(config
)]
141 # This command needs to come before any config scripts on the command
142 # line, since they are executed in order.
144 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
147 # Disable tcl and telnet servers, since they are unused and because
148 # the port numbers will conflict if multiple OpenOCD processes are
149 # running on the same server.
153 "telnet_port disabled",
156 logfile
= open(Openocd
.logname
, "w")
157 logfile
.write("+ %s\n" % " ".join(cmd
))
159 self
.process
= subprocess
.Popen(cmd
, stdin
=subprocess
.PIPE
,
160 stdout
=logfile
, stderr
=logfile
)
162 # Wait for OpenOCD to have made it through riscv_examine(). When using
163 # OpenOCD to communicate with a simulator this may take a long time,
164 # and gdb will time out when trying to connect if we attempt too early.
168 log
= open(Openocd
.logname
).read()
169 if "Examined RISCV core" in log
:
171 if not self
.process
.poll() is None:
173 "OpenOCD exited before completing riscv_examine()")
174 if not messaged
and time
.time() - start
> 1:
176 print "Waiting for OpenOCD to examine RISCV core..."
178 self
.port
= self
._get
_gdb
_server
_port
()
180 def _get_gdb_server_port(self
):
181 """Get port that OpenOCD's gdb server is listening on."""
183 PORT_REGEX
= re
.compile(r
'(?P<port>\d+) \(LISTEN\)')
184 for _
in range(MAX_ATTEMPTS
):
185 with
open(os
.devnull
, 'w') as devnull
:
187 output
= subprocess
.check_output([
189 '-a', # Take the AND of the following selectors
190 '-p{}'.format(self
.process
.pid
), # Filter on PID
191 '-iTCP', # Filter only TCP sockets
193 except subprocess
.CalledProcessError
:
195 matches
= list(PORT_REGEX
.finditer(output
))
196 matches
= [m
for m
in matches
197 if m
.group('port') not in ('6666', '4444')]
201 "OpenOCD listening on multiple ports. Cannot uniquely "
202 "identify gdb server port.")
205 return int(match
.group('port'))
207 raise Exception("Timed out waiting for gdb server to obtain port.")
216 class OpenocdCli(object):
def __init__(self, port=4444):
    """Open a telnet session to localhost on `port` and wait for a prompt.

    The session output is also piped through `tee` into openocd-cli.log.
    Construction blocks until the first "> " prompt has been seen.
    """
    telnet_cmd = "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port
    self.child = pexpect.spawn(telnet_cmd)
    self.child.expect("> ")
def command(self, cmd):
    """Send `cmd` over the CLI session and return the text it produced.

    The method waits for the command to be echoed back, then for the end
    of that line, then for the next "> " prompt. The text seen before the
    prompt is returned with surrounding whitespace and NUL characters
    removed.
    """
    session = self.child
    session.sendline(cmd)
    # Match the echo literally: `cmd` is plain text, not a pattern, and may
    # contain regex metacharacters such as '+', '(' or '['.
    session.expect_exact(cmd)
    session.expect("\n")
    session.expect("> ")
    return session.before.strip("\t\r\n \0")
229 def reg(self
, reg
=''):
230 output
= self
.command("reg %s" % reg
)
231 matches
= re
.findall(r
"(\w+) \(/\d+\): (0x[0-9A-F]+)", output
)
232 values
= {r
: int(v
, 0) for r
, v
in matches
}
def load_image(self, image):
    """Issue "load_image <image>" through self.command().

    Raises TestNotApplicable, carrying the full command output, when the
    output contains the "only 32bits files are supported" ELF error.
    """
    unsupported = 'invalid ELF file, only 32bits files are supported'
    response = self.command("load_image %s" % image)
    if unsupported in response:
        raise TestNotApplicable(response)
class CannotAccess(Exception):
    """Raised when gdb reports that a memory address cannot be accessed.

    The offending address is available as the `address` attribute.
    """

    def __init__(self, address):
        # Give the base class a message so str(e) and printed tracebacks
        # identify the address instead of being empty.
        try:
            where = "0x%x" % address
        except TypeError:
            # Not an integer; fall back to its repr.
            where = repr(address)
        Exception.__init__(
            self, "Cannot access memory at address %s" % where)
        self.address = address
249 cmd
=os
.path
.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
250 self
.child
= pexpect
.spawn(cmd
)
251 self
.child
.logfile
= open("gdb.log", "w")
252 self
.child
.logfile
.write("+ %s\n" % cmd
)
254 self
.command("set confirm off")
255 self
.command("set width 0")
256 self
.command("set height 0")
258 self
.command("set print entry-values no")
261 """Wait for prompt."""
262 self
.child
.expect(r
"\(gdb\)")
def command(self, command, timeout=-1):
    """Send one command line to the child and return its output.

    Waits first for a newline (presumably the end of the echoed command
    line) and then for the next "(gdb)" prompt, applying `timeout` to each
    wait. Returns the text seen before the prompt, stripped of surrounding
    whitespace.
    """
    gdb = self.child
    gdb.sendline(command)
    for pattern in ("\n", r"\(gdb\)"):
        gdb.expect(pattern, timeout=timeout)
    return gdb.before.strip()
270 def c(self
, wait
=True):
272 output
= self
.command("c")
273 assert "Continuing" in output
276 self
.child
.sendline("c")
277 self
.child
.expect("Continuing")
280 self
.child
.send("\003")
281 self
.child
.expect(r
"\(gdb\)", timeout
=60)
282 return self
.child
.before
.strip()
284 def x(self
, address
, size
='w'):
285 output
= self
.command("x/%s %s" % (size
, address
))
286 value
= int(output
.split(':')[1].strip(), 0)
289 def p_raw(self
, obj
):
290 output
= self
.command("p %s" % obj
)
291 m
= re
.search("Cannot access memory at address (0x[0-9a-f]+)", output
)
293 raise CannotAccess(int(m
.group(1), 0))
294 return output
.split('=')[-1].strip()
297 output
= self
.command("p/x %s" % obj
)
298 m
= re
.search("Cannot access memory at address (0x[0-9a-f]+)", output
)
300 raise CannotAccess(int(m
.group(1), 0))
301 value
= int(output
.split('=')[-1].strip(), 0)
304 def p_string(self
, obj
):
305 output
= self
.command("p %s" % obj
)
306 value
= shlex
.split(output
.split('=')[-1].strip())[1]
310 output
= self
.command("stepi")
314 output
= self
.command("load", timeout
=60)
315 assert "failed" not in output
316 assert "Transfer rate" in output
318 def b(self
, location
):
319 output
= self
.command("b %s" % location
)
320 assert "not defined" not in output
321 assert "Breakpoint" in output
324 def hbreak(self
, location
):
325 output
= self
.command("hbreak %s" % location
)
326 assert "not defined" not in output
327 assert "Hardware assisted breakpoint" in output
330 def run_all_tests(module
, target
, tests
, fail_fast
):
331 good_results
= set(('pass', 'not_applicable'))
337 for name
in dir(module
):
338 definition
= getattr(module
, name
)
339 if type(definition
) == type and hasattr(definition
, 'test') and \
340 (not tests
or any(test
in name
for test
in tests
)):
341 instance
= definition(target
)
342 result
= instance
.run()
343 results
.setdefault(result
, []).append(name
)
345 if result
not in good_results
and fail_fast
:
348 header("ran %d tests in %.0fs" % (count
, time
.time() - start
), dash
=':')
351 for key
, value
in results
.iteritems():
352 print "%d tests returned %s" % (len(value
), key
)
353 if key
not in good_results
:
def add_test_run_options(parser):
    """Register the test-run options on `parser`.

    Adds the --fail-fast/-f flag and a positional `test` argument that
    accepts zero or more test names.
    """
    option_specs = (
        (("--fail-fast", "-f"),
         dict(action="store_true",
              help="Exit as soon as any test fails.")),
        (("test",),
         dict(nargs='*',
              help="Run only tests that are named here.")),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
366 def header(title
, dash
='-'):
368 dashes
= dash
* (36 - len(title
))
369 before
= dashes
[:len(dashes
)/2]
370 after
= dashes
[len(dashes
)/2:]
371 print "%s[ %s ]%s" % (before
, title
, after
)
375 class BaseTest(object):
379 def __init__(self
, target
):
382 self
.target_process
= None
386 def early_applicable(self
):
387 """Return a false value if the test has determined it cannot run
388 without ever needing to talk to the target or server."""
389 # pylint: disable=no-self-use
396 compile_args
= getattr(self
, 'compile_args', None)
398 if compile_args
not in BaseTest
.compiled
:
399 # pylint: disable=star-args
400 BaseTest
.compiled
[compile_args
] = \
401 self
.target
.compile(*compile_args
)
402 self
.binary
= BaseTest
.compiled
.get(compile_args
)
404 def classSetup(self
):
406 self
.target_process
= self
.target
.target()
407 self
.server
= self
.target
.server()
408 self
.logs
.append(self
.server
.logname
)
410 def classTeardown(self
):
412 del self
.target_process
416 If compile_args is set, compile a program and set self.binary.
420 Then call test() and return the result, displaying relevant information
421 if an exception is raised.
424 print "Running", type(self
).__name
__, "...",
427 if not self
.early_applicable():
428 print "not_applicable"
429 return "not_applicable"
431 self
.start
= time
.time()
437 result
= self
.test() # pylint: disable=no-member
438 except TestNotApplicable
:
439 result
= "not_applicable"
440 except Exception as e
: # pylint: disable=broad-except
441 if isinstance(e
, TestFailed
):
445 print "%s in %.2fs" % (result
, time
.time() - self
.start
)
447 if isinstance(e
, TestFailed
):
451 traceback
.print_exc(file=sys
.stdout
)
452 for log
in self
.logs
:
454 print open(log
, "r").read()
463 print "%s in %.2fs" % (result
, time
.time() - self
.start
)
class TestFailed(Exception):
    """Raised by the assert* helpers in this file when a check fails.

    The failure description is available as the `message` attribute and
    via str().
    """

    def __init__(self, message):
        # Hand the message to the base class as well, so str(e) and printed
        # tracebacks include it rather than being empty.
        Exception.__init__(self, message)
        self.message = message
class TestNotApplicable(Exception):
    """Raised to signal that a test does not apply and should not run.

    The explanation is available as the `message` attribute and via str().
    """

    def __init__(self, message):
        # Hand the message to the base class as well, so str(e) and printed
        # tracebacks include it rather than being empty.
        Exception.__init__(self, message)
        self.message = message
476 def assertEqual(a
, b
):
478 raise TestFailed("%r != %r" % (a
, b
))
480 def assertNotEqual(a
, b
):
482 raise TestFailed("%r == %r" % (a
, b
))
486 raise TestFailed("%r not in %r" % (a
, b
))
488 def assertNotIn(a
, b
):
490 raise TestFailed("%r in %r" % (a
, b
))
492 def assertGreater(a
, b
):
494 raise TestFailed("%r not greater than %r" % (a
, b
))
496 def assertLess(a
, b
):
498 raise TestFailed("%r not less than %r" % (a
, b
))
502 raise TestFailed("%r is not True" % a
)
def assertRegexpMatches(text, regexp):
    """Check that `regexp` matches somewhere in `text` (re.search).

    Returns None on success; raises TestFailed naming the pattern and the
    text when no match is found.
    """
    if re.search(regexp, text):
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))