[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
9 import pexpect
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
def find_file(path):
    """Look for `path` in the current directory, then next to this module.

    Returns the first joined path that exists on disk, or None when neither
    location contains it.
    """
    search_dirs = (os.getcwd(), os.path.dirname(__file__))
    candidates = [os.path.join(base, path) for base in search_dirs]
    return next((c for c in candidates if os.path.exists(c)), None)
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
23 cmd = [cc, "-g"]
24 for arg in args:
25 found = find_file(arg)
26 if found:
27 cmd.append(found)
28 else:
29 cmd.append(arg)
30 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
31 stderr=subprocess.PIPE)
32 stdout, stderr = process.communicate()
33 if process.returncode:
34 print
35 header("Compile failed")
36 print "+", " ".join(cmd)
37 print stdout,
38 print stderr,
39 header("")
40 raise Exception("Compile failed!")
def unused_port():
    """Return a TCP port number that the OS reported as free.

    A throw-away socket is bound to port 0 so that the OS chooses the port.
    The socket is closed again before returning (also when binding fails),
    so the port is only known to have been free at the time of the call.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        s.close()
class Spike(object):
    """Handle for a spike simulator subprocess."""

    # File that receives the command line and spike's stdout/stderr.
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_jtag_gdb=True,
                 timeout=None, xlen=64):
        """Launch spike and, optionally, wait for its remote bitbang port.

        cmd: command line used to start spike (split with shlex). When it is
            false, "spike" is used.
        binary: optional program path appended after "pk".
        halted: when true, "-H" is passed to spike.
        with_jtag_gdb: when true, "--rbb-port 0" is passed,
            REMOTE_BITBANG_HOST is set to "localhost", and the log is polled
            (30 times, 0.11s apart) for the port spike reports. That port is
            stored in self.port and exported as REMOTE_BITBANG_PORT.
        timeout: when set, spike is run under "timeout <value>".
        xlen: when 32, "--isa RV32" is added to the command line.

        The started subprocess is available as self.process.

        Raises AssertionError when with_jtag_gdb is set and no port message
        shows up in the log.
        """
        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)

        # The child gets its own copy of the descriptor, so this handle can be
        # closed as soon as the process has been started.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                              r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            # Raised explicitly (rather than via assert) so the check is not
            # removed when Python runs with -O.
            if not self.port:
                raise AssertionError(
                    "Didn't get spike message about bitbang connection")

    def __del__(self):
        """Kill the spike process, if one was started."""
        # __init__ may have raised before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Wait for the spike process; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
class VcsSim(object):
    """Handle for a VCS simulator (simv) subprocess."""

    def __init__(self, simv=None, debug=False):
        """Launch the simulator and wait until it reports its port.

        simv: command line used to start the simulator (split with shlex);
            "simv" when false. "+jtag_vpi_enable" is always appended.
        debug: when true, "-debug" is appended to the program name and
            "+vcdplusfile=output/gdbserver.vpd" to the arguments.

        Output goes to simv.log. The port from the simulator's
        "Listening on port N" line is stored in self.port and exported as
        JTAG_VPI_PORT.

        Raises Exception when the simulator exits without printing that line.
        """
        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Start reading at the current end of the log so that only
                # output produced by the simulator is examined.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                                stdout=logfile,
                                                stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Return the port announced on a "Listening on port N" log line."""
        exited = False
        while True:
            line = listenfile.readline()
            if line:
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    return int(match.group(1))
                continue
            if exited:
                # The process was already gone on the previous pass and the
                # log has now been read to its end: the line will never come.
                raise Exception(
                    "simv exited before reporting the port it listens on")
            exited = self.process.poll() is not None
            if not exited:
                time.sleep(1)

    def __del__(self):
        """Kill the simulator process, if one was started."""
        # __init__ may have raised before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
class Openocd(object):
    """Handle for an OpenOCD subprocess acting as gdb server."""

    # File that receives the command line and OpenOCD's stdout/stderr.
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False):
        """Launch OpenOCD and wait until it has examined the target.

        cmd: command line used to start OpenOCD (split with shlex);
            "openocd -d" when false.
        config: optional config script, located with find_file() and passed
            with "-f".
        debug: when true, "-d" is appended to the command line.

        The port of OpenOCD's gdb server is stored in self.port.

        Raises Exception when the config file cannot be found, when OpenOCD
        exits before "Examined RISCV core" appears in its log, or when the
        gdb server port cannot be determined.
        """
        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd", "-d"]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            config_path = find_file(config)
            if config_path is None:
                # Fail with a readable message instead of a TypeError from
                # joining None into the command line below.
                raise Exception(
                    "Unable to find OpenOCD config file %r" % (config,))
            cmd += ["-f", config_path]
        if debug:
            cmd.append("-d")

        # The child gets its own copy of the descriptor, so this handle can be
        # closed as soon as the process has been started.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                            stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "Examined RISCV core" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                    "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Poll at a modest rate instead of re-reading the log in a tight
            # loop.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on."""
        # Number of lsof polls, 0.1s apart, before giving up.
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        lsof_cmd = [
            'lsof',
            '-a',  # Take the AND of the following selectors
            '-p{}'.format(self.process.pid),  # Filter on PID
            '-iTCP',  # Filter only TCP sockets
        ]
        with open(os.devnull, 'w') as devnull:
            for _ in range(MAX_ATTEMPTS):
                try:
                    # universal_newlines makes the output text on every
                    # Python version, so the str regex above applies.
                    output = subprocess.check_output(
                        lsof_cmd, stderr=devnull, universal_newlines=True)
                except subprocess.CalledProcessError:
                    output = ""
                matches = [m for m in PORT_REGEX.finditer(output)
                           if m.group('port') not in ('6666', '4444')]
                if len(matches) > 1:
                    print(output)
                    raise Exception(
                        "OpenOCD listening on multiple ports. Cannot uniquely "
                        "identify gdb server port.")
                elif matches:
                    [match] = matches
                    return int(match.group('port'))
                time.sleep(0.1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill the OpenOCD process, if one was started."""
        # __init__ may have raised before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
class OpenocdCli(object):
    """Talk to OpenOCD's telnet command interface through pexpect."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:port and wait for the prompt.

        The session output is also copied to openocd-cli.log.
        """
        self.child = pexpect.spawn(
            "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # Match the echoed command literally. expect() would interpret it as a
        # regular expression, which breaks for commands that contain
        # characters such as "+", "(" or "[".
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run "reg" and return the parsed register values.

        Output entries of the form "name (/bits): 0xVALUE" are parsed. With a
        register name, that register's value is returned (KeyError when it is
        not in the output); without one, a dict of name -> value is returned.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image" on `image`.

        Raises TestNotApplicable when OpenOCD reports that it only supports
        32-bit ELF files.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
class CannotAccess(Exception):
    """Signals that a memory address could not be accessed."""

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        # The address that could not be accessed.
        self.address = address
class Gdb(object):
    """Drive an interactive gdb session through pexpect."""

    def __init__(self,
                 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Start gdb with `cmd`, log the session to gdb.log and configure it.

        Confirmation prompts and output paging are turned off so that every
        command returns straight to the "(gdb)" prompt.
        """
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        self.command("set confirm off")
        self.command("set width 0")
        self.command("set height 0")
        # Force consistency.
        self.command("set print entry-values no")

    @staticmethod
    def _raise_if_inaccessible(output):
        """Raise CannotAccess if gdb reported an inaccessible address."""
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=-1):
        """Send `command` and return the stripped text up to the next prompt.

        timeout is passed to pexpect's expect() calls.
        """
        self.child.sendline(command)
        self.child.expect("\n", timeout=timeout)
        self.child.expect(r"\(gdb\)", timeout=timeout)
        return self.child.before.strip()

    def c(self, wait=True, timeout=-1):
        """Continue execution.

        With wait set, block until gdb returns to the prompt and return its
        output. Otherwise only wait for gdb to print "Continuing" and return
        None.
        """
        if wait:
            output = self.command("c", timeout=timeout)
            assert "Continuing" in output
            return output
        else:
            self.child.sendline("c")
            self.child.expect("Continuing")

    def interrupt(self):
        """Send ^C to gdb and return the output printed before the prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=60)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory ("x/<size> <address>") and return the value as int.

        Raises CannotAccess when gdb reports that the address is not
        accessible, matching p() and p_raw().
        """
        output = self.command("x/%s %s" % (size, address))
        self._raise_if_inaccessible(output)
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p_raw(self, obj):
        """Print `obj` and return the text after the last "=", unparsed.

        Raises CannotAccess when gdb reports an inaccessible address.
        """
        output = self.command("p %s" % obj)
        self._raise_if_inaccessible(output)
        return output.split('=')[-1].strip()

    def p(self, obj):
        """Print `obj` in hex ("p/x") and return the value as an int.

        Raises CannotAccess when gdb reports an inaccessible address.
        """
        output = self.command("p/x %s" % obj)
        self._raise_if_inaccessible(output)
        value = int(output.split('=')[-1].strip(), 0)
        return value

    def p_string(self, obj):
        """Print `obj` and return the second shell-style token of its value.

        For output such as '$1 = 0x1234 "text"' this is the quoted text.
        """
        output = self.command("p %s" % obj)
        value = shlex.split(output.split('=')[-1].strip())[1]
        return value

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        output = self.command("stepi")
        return output

    def load(self):
        """Run "load" and check that gdb reports a completed transfer."""
        output = self.command("load", timeout=60)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at `location` and return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the selected test classes of `module`.

    A test class is any class in `module` that has a `test` attribute. When
    parsed.test is non-empty, only classes whose name contains one of those
    strings are run. parsed.gdb is stored in the module-level gdb_cmd, and
    parsed.fail_fast stops the run after the first result that is neither
    "pass" nor "not_applicable".

    Prints a summary grouped by result. Returns 0 when every result is
    "pass" or "not_applicable", otherwise 1.
    """
    good_results = set(('pass', 'not_applicable'))

    start = time.time()

    results = {}
    count = 0

    global gdb_cmd  # pylint: disable=global-statement
    gdb_cmd = parsed.gdb

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition))

    for name, definition in todo:
        instance = definition(target)
        result = instance.run()
        results.setdefault(result, []).append(name)
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    result = 0
    # Sorted so that the summary comes out in the same order on every run.
    for key, value in sorted(results.items()):
        print("%d tests returned %s" % (len(value), key))
        if key not in good_results:
            result = 1
            for test in value:
                print("  %s" % test)

    return result
def add_test_run_options(parser):
    """Register the test-run options on `parser`.

    Adds --fail-fast/-f, the positional list of test names, and --gdb, in
    that order, through parser.add_argument().
    """
    option_specs = (
        (("--fail-fast", "-f"),
         dict(action="store_true", help="Exit as soon as any test fails.")),
        (("test",),
         dict(nargs='*', help="Run only tests that are named here.")),
        (("--gdb",),
         dict(help="The command to use to start gdb.")),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
def header(title, dash='-'):
    """Print a banner line.

    With a title, "[ title ]" is printed in the middle of a run of `dash`
    characters; for a single-character dash and a title of up to 36
    characters the line is 40 columns wide. With an empty title, `dash` is
    printed 40 times.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division keeps the split point an int on every Python version;
        # a float is not a valid slice index.
        middle = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
    else:
        print(dash * 40)
def print_log(path):
    """Print the file at `path` to stdout under a header() banner.

    Files longer than 1000 lines are shortened to their first 500 and last
    500 lines, separated by a "..." line.
    """
    header(path)
    # Read everything up front so the file is closed before printing starts.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
class BaseTest(object):
    """Base class for tests: compile, start target and server, run, report."""

    # Cache of target.compile() results, keyed by the compile_args tuple and
    # shared by all tests.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        # Paths of log files that are printed when the test does not pass.
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called after classSetup() and before test(); no-op here."""
        pass

    def compile(self):
        """Set self.binary from the class's compile_args, if it has any.

        Each distinct compile_args value is passed to target.compile() once;
        later tests reuse the cached result. Without compile_args,
        self.binary ends up as None.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                    self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile the program and start the target process and the server."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop this test's references to the server and target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        Run the test and return its result string.

        Returns "not_applicable" straight away when early_applicable() is
        false. Otherwise calls classSetup() (which compiles a program and
        sets self.binary if compile_args is set), then setup(), then test().

        A false return value from test() counts as "pass". TestNotApplicable
        gives "not_applicable", TestFailed gives "fail", and any other
        exception gives "exception"; for the last two the message, traceback
        and collected logs are printed. classTeardown() runs in every case
        once classSetup() has been attempted, so a failure while starting
        the target or server is reported like any other test exception.
        """

        # No newline: the result is printed on the same line.
        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
# Command used to start gdb. run_all_tests() overwrites this with the value of
# the --gdb option; while it is unset, Gdb's own default command is used.
gdb_cmd = None

class GdbTest(BaseTest):
    """BaseTest that also starts gdb and connects it to the server."""

    def __init__(self, target):
        super(GdbTest, self).__init__(target)
        self.gdb = None

    def classSetup(self):
        """Start target and server, then start gdb and point it at them."""
        super(GdbTest, self).classSetup()
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        gdb = self.gdb

        if self.binary:
            gdb.command("file %s" % self.binary)
        if self.target:
            gdb.command("set arch riscv:rv%d" % self.target.xlen)
            gdb.command("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            gdb.command(
                "target extended-remote localhost:%d" % self.server.port)

        gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then do the base class teardown."""
        del self.gdb
        super(GdbTest, self).classTeardown()
532 class ExamineTarget(GdbTest):
533 def test(self):
534 self.target.misa = self.gdb.p("$misa")
536 txt = "RV"
537 if (self.target.misa >> 30) == 1:
538 txt += "32"
539 elif (self.target.misa >> 62) == 2:
540 txt += "64"
541 elif (self.target.misa >> 126) == 3:
542 txt += "128"
543 else:
544 txt += "??"
546 for i in range(26):
547 if self.target.misa & (1<<i):
548 txt += chr(i + ord('A'))
549 print txt,
class TestFailed(Exception):
    """Raised to mark a test as failed; `message` describes the failure."""

    def __init__(self, message):
        # Pass the text to Exception too, so that str(e) and printed
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
class TestNotApplicable(Exception):
    """Raised when a test cannot be run; `message` gives the reason."""

    def __init__(self, message):
        # Pass the text to Exception too, so that str(e) and printed
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
def assertEqual(a, b):
    """Raise TestFailed, showing both reprs, when `a != b` holds."""
    mismatch = a != b
    if mismatch:
        raise TestFailed("%r != %r" % (a, b))
def assertNotEqual(a, b):
    """Raise TestFailed, showing both reprs, when `a == b` holds."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
def assertIn(a, b):
    """Raise TestFailed unless `a` is contained in `b`."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
def assertNotIn(a, b):
    """Raise TestFailed when `a` is contained in `b`."""
    contained = a in b
    if contained:
        raise TestFailed("%r in %r" % (a, b))
def assertGreater(a, b):
    """Raise TestFailed unless `a > b` holds."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
def assertLess(a, b):
    """Raise TestFailed unless `a < b` holds."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
def assertTrue(a):
    """Raise TestFailed, showing repr(a), when `a` is false."""
    if not a:
        # Wrap the value in a tuple: a bare tuple on the right of % is taken
        # as the argument list, so an empty tuple (which is false) would make
        # the formatting itself fail.
        raise TestFailed("%r is not True" % (a,))
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` matches somewhere in `text`."""
    if re.search(regexp, text):
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))