Store logs for all tests in logs/
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import tempfile
7 import time
8 import traceback
9
10 import pexpect
11
12 # Note that gdb comes with its own testsuite. I was unable to figure out how to
13 # run that testsuite against the spike simulator.
14
15 def find_file(path):
16 for directory in (os.getcwd(), os.path.dirname(__file__)):
17 fullpath = os.path.join(directory, path)
18 if os.path.exists(fullpath):
19 return fullpath
20 return None
21
22 def compile(args, xlen=32): # pylint: disable=redefined-builtin
23 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
24 cmd = [cc, "-g"]
25 if xlen == 32:
26 cmd.append("-march=rv32imac")
27 cmd.append("-mabi=ilp32")
28 else:
29 cmd.append("-march=rv64imac")
30 cmd.append("-mabi=lp64")
31 for arg in args:
32 found = find_file(arg)
33 if found:
34 cmd.append(found)
35 else:
36 cmd.append(arg)
37 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
38 stderr=subprocess.PIPE)
39 stdout, stderr = process.communicate()
40 if process.returncode:
41 print
42 header("Compile failed")
43 print "+", " ".join(cmd)
44 print stdout,
45 print stderr,
46 header("")
47 raise Exception("Compile failed!")
48
49 def unused_port():
50 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
51 import socket
52 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
53 s.bind(("", 0))
54 port = s.getsockname()[1]
55 s.close()
56 return port
57
58 class Spike(object):
59 logname = "spike-%d.log" % os.getpid()
60
61 def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if target.sim_cmd:
65 cmd = shlex.split(target.sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if target.xlen == 32:
70 cmd += ["--isa", "RV32G"]
71 else:
72 cmd += ["--isa", "RV64G"]
73 cmd += ["-m0x%x:0x%x" % (target.ram, target.ram_size)]
74
75 if timeout:
76 cmd = ["timeout", str(timeout)] + cmd
77
78 if halted:
79 cmd.append('-H')
80 if with_jtag_gdb:
81 cmd += ['--rbb-port', '0']
82 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
83 self.infinite_loop = target.compile(
84 "programs/checksum.c", "programs/tiny-malloc.c",
85 "programs/infinite_loop.c", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
86 cmd.append(self.infinite_loop)
87 logfile = open(self.logname, "w")
88 logfile.write("+ %s\n" % " ".join(cmd))
89 logfile.flush()
90 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
91 stdout=logfile, stderr=logfile)
92
93 if with_jtag_gdb:
94 self.port = None
95 for _ in range(30):
96 m = re.search(r"Listening for remote bitbang connection on "
97 r"port (\d+).", open(self.logname).read())
98 if m:
99 self.port = int(m.group(1))
100 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
101 break
102 time.sleep(0.11)
103 assert self.port, "Didn't get spike message about bitbang " \
104 "connection"
105
106 def __del__(self):
107 try:
108 self.process.kill()
109 self.process.wait()
110 except OSError:
111 pass
112
113 def wait(self, *args, **kwargs):
114 return self.process.wait(*args, **kwargs)
115
116 class VcsSim(object):
117 def __init__(self, sim_cmd=None, debug=False):
118 if sim_cmd:
119 cmd = shlex.split(sim_cmd)
120 else:
121 cmd = ["simv"]
122 cmd += ["+jtag_vpi_enable"]
123 if debug:
124 cmd[0] = cmd[0] + "-debug"
125 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
126 logfile = open("simv.log", "w")
127 logfile.write("+ %s\n" % " ".join(cmd))
128 logfile.flush()
129 listenfile = open("simv.log", "r")
130 listenfile.seek(0, 2)
131 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
132 stdout=logfile, stderr=logfile)
133 done = False
134 while not done:
135 # Fail if VCS exits early
136 exit_code = self.process.poll()
137 if exit_code is not None:
138 raise RuntimeError('VCS simulator exited early with status %d'
139 % exit_code)
140
141 line = listenfile.readline()
142 if not line:
143 time.sleep(1)
144 match = re.match(r"^Listening on port (\d+)$", line)
145 if match:
146 done = True
147 self.port = int(match.group(1))
148 os.environ['JTAG_VPI_PORT'] = str(self.port)
149
150 def __del__(self):
151 try:
152 self.process.kill()
153 self.process.wait()
154 except OSError:
155 pass
156
157 class Openocd(object):
158 logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
159 logname = logfile.name
160
161 def __init__(self, server_cmd=None, config=None, debug=False):
162 if server_cmd:
163 cmd = shlex.split(server_cmd)
164 else:
165 openocd = os.path.expandvars("$RISCV/bin/openocd")
166 cmd = [openocd]
167 if debug:
168 cmd.append("-d")
169
170 # This command needs to come before any config scripts on the command
171 # line, since they are executed in order.
172 cmd += [
173 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
174 "--command",
175 "gdb_port 0",
176 # Disable tcl and telnet servers, since they are unused and because
177 # the port numbers will conflict if multiple OpenOCD processes are
178 # running on the same server.
179 "--command",
180 "tcl_port disabled",
181 "--command",
182 "telnet_port disabled",
183 ]
184
185 if config:
186 f = find_file(config)
187 if f is None:
188 print "Unable to read file " + config
189 exit(1)
190
191 cmd += ["-f", f]
192 if debug:
193 cmd.append("-d")
194
195 logfile = open(Openocd.logname, "w")
196 logfile.write("+ %s\n" % " ".join(cmd))
197 logfile.flush()
198 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
199 stdout=logfile, stderr=logfile)
200
201 # Wait for OpenOCD to have made it through riscv_examine(). When using
202 # OpenOCD to communicate with a simulator this may take a long time,
203 # and gdb will time out when trying to connect if we attempt too early.
204 start = time.time()
205 messaged = False
206 while True:
207 log = open(Openocd.logname).read()
208 if "Ready for Remote Connections" in log:
209 break
210 if not self.process.poll() is None:
211 raise Exception(
212 "OpenOCD exited before completing riscv_examine()")
213 if not messaged and time.time() - start > 1:
214 messaged = True
215 print "Waiting for OpenOCD to examine RISCV core..."
216 if time.time() - start > 60:
217 raise Exception("ERROR: Timed out waiting for OpenOCD to "
218 "examine RISCV core")
219
220 self.port = self._get_gdb_server_port()
221
222 def _get_gdb_server_port(self):
223 """Get port that OpenOCD's gdb server is listening on."""
224 MAX_ATTEMPTS = 50
225 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
226 for _ in range(MAX_ATTEMPTS):
227 with open(os.devnull, 'w') as devnull:
228 try:
229 output = subprocess.check_output([
230 'lsof',
231 '-a', # Take the AND of the following selectors
232 '-p{}'.format(self.process.pid), # Filter on PID
233 '-iTCP', # Filter only TCP sockets
234 ], stderr=devnull)
235 except subprocess.CalledProcessError:
236 output = ""
237 matches = list(PORT_REGEX.finditer(output))
238 matches = [m for m in matches
239 if m.group('port') not in ('6666', '4444')]
240 if len(matches) > 1:
241 print output
242 raise Exception(
243 "OpenOCD listening on multiple ports. Cannot uniquely "
244 "identify gdb server port.")
245 elif matches:
246 [match] = matches
247 return int(match.group('port'))
248 time.sleep(1)
249 raise Exception("Timed out waiting for gdb server to obtain port.")
250
251 def __del__(self):
252 try:
253 self.process.kill()
254 self.process.wait()
255 except OSError:
256 pass
257
258 class OpenocdCli(object):
259 def __init__(self, port=4444):
260 self.child = pexpect.spawn(
261 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
262 self.child.expect("> ")
263
264 def command(self, cmd):
265 self.child.sendline(cmd)
266 self.child.expect(cmd)
267 self.child.expect("\n")
268 self.child.expect("> ")
269 return self.child.before.strip("\t\r\n \0")
270
271 def reg(self, reg=''):
272 output = self.command("reg %s" % reg)
273 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
274 values = {r: int(v, 0) for r, v in matches}
275 if reg:
276 return values[reg]
277 return values
278
279 def load_image(self, image):
280 output = self.command("load_image %s" % image)
281 if 'invalid ELF file, only 32bits files are supported' in output:
282 raise TestNotApplicable(output)
283
284 class CannotAccess(Exception):
285 def __init__(self, address):
286 Exception.__init__(self)
287 self.address = address
288
289 class Gdb(object):
290 logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")
291 logname = logfile.name
292
293 def __init__(self,
294 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
295 self.child = pexpect.spawn(cmd)
296 self.child.logfile = open(self.logname, "w")
297 self.child.logfile.write("+ %s\n" % cmd)
298 self.wait()
299 self.command("set confirm off")
300 self.command("set width 0")
301 self.command("set height 0")
302 # Force consistency.
303 self.command("set print entry-values no")
304
305 def wait(self):
306 """Wait for prompt."""
307 self.child.expect(r"\(gdb\)")
308
309 def command(self, command, timeout=6000):
310 self.child.sendline(command)
311 self.child.expect("\n", timeout=timeout)
312 self.child.expect(r"\(gdb\)", timeout=timeout)
313 return self.child.before.strip()
314
315 def c(self, wait=True, timeout=-1):
316 if wait:
317 output = self.command("c", timeout=timeout)
318 assert "Continuing" in output
319 return output
320 else:
321 self.child.sendline("c")
322 self.child.expect("Continuing")
323
324 def interrupt(self):
325 self.child.send("\003")
326 self.child.expect(r"\(gdb\)", timeout=6000)
327 return self.child.before.strip()
328
329 def x(self, address, size='w'):
330 output = self.command("x/%s %s" % (size, address))
331 value = int(output.split(':')[1].strip(), 0)
332 return value
333
334 def p_raw(self, obj):
335 output = self.command("p %s" % obj)
336 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
337 if m:
338 raise CannotAccess(int(m.group(1), 0))
339 return output.split('=')[-1].strip()
340
341 def p(self, obj):
342 output = self.command("p/x %s" % obj)
343 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
344 if m:
345 raise CannotAccess(int(m.group(1), 0))
346 value = int(output.split('=')[-1].strip(), 0)
347 return value
348
349 def p_string(self, obj):
350 output = self.command("p %s" % obj)
351 value = shlex.split(output.split('=')[-1].strip())[1]
352 return value
353
354 def stepi(self):
355 output = self.command("stepi")
356 return output
357
358 def load(self):
359 output = self.command("load", timeout=6000)
360 assert "failed" not in output
361 assert "Transfer rate" in output
362
363 def b(self, location):
364 output = self.command("b %s" % location)
365 assert "not defined" not in output
366 assert "Breakpoint" in output
367 return output
368
369 def hbreak(self, location):
370 output = self.command("hbreak %s" % location)
371 assert "not defined" not in output
372 assert "Hardware assisted breakpoint" in output
373 return output
374
375 def run_all_tests(module, target, parsed):
376 if not os.path.exists(parsed.logs):
377 os.makedirs(parsed.logs)
378
379 overall_start = time.time()
380
381 global gdb_cmd # pylint: disable=global-statement
382 gdb_cmd = parsed.gdb
383
384 todo = []
385 if parsed.misaval:
386 target.misa = int(parsed.misaval, 16)
387 print "Assuming $MISA value of 0x%x. Skipping ExamineTarget." % \
388 target.misa
389 else:
390 todo.append(("ExamineTarget", ExamineTarget))
391
392 for name in dir(module):
393 definition = getattr(module, name)
394 if type(definition) == type and hasattr(definition, 'test') and \
395 (not parsed.test or any(test in name for test in parsed.test)):
396 todo.append((name, definition))
397
398 results, count = run_tests(parsed, target, todo)
399
400 header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
401 dash=':')
402
403 return print_results(results)
404
405 good_results = set(('pass', 'not_applicable'))
406 def run_tests(parsed, target, todo):
407 results = {}
408 count = 0
409
410 for name, definition in todo:
411 instance = definition(target)
412 log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
413 (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
414 log_fd = open(log_name, 'w')
415 print "Running", name, "...",
416 sys.stdout.flush()
417 log_fd.write("Test: %s\n" % name)
418 log_fd.write("Target: %s\n" % type(target).__name__)
419 start = time.time()
420 real_stdout = sys.stdout
421 sys.stdout = log_fd
422 try:
423 result = instance.run()
424 finally:
425 sys.stdout = real_stdout
426 print "%s in %.2fs" % (result, time.time() - start)
427 sys.stdout.flush()
428 log_fd.write("Result: %s\n" % result)
429 log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
430 results.setdefault(result, []).append(name)
431 count += 1
432 if result not in good_results and parsed.fail_fast:
433 break
434
435 return results, count
436
437 def print_results(results):
438 result = 0
439 for key, value in results.iteritems():
440 print "%d tests returned %s" % (len(value), key)
441 if key not in good_results:
442 result = 1
443 for test in value:
444 print " ", test
445
446 return result
447
448 def add_test_run_options(parser):
449
450 parser.add_argument("--logs", default="logs",
451 help="Store logs in the specified directory.")
452 parser.add_argument("--fail-fast", "-f", action="store_true",
453 help="Exit as soon as any test fails.")
454 parser.add_argument("test", nargs='*',
455 help="Run only tests that are named here.")
456 parser.add_argument("--gdb",
457 help="The command to use to start gdb.")
458 parser.add_argument("--misaval",
459 help="Don't run ExamineTarget, just assume the misa value which is "
460 "specified.")
461
462 def header(title, dash='-', length=78):
463 if title:
464 dashes = dash * (length - 4 - len(title))
465 before = dashes[:len(dashes)/2]
466 after = dashes[len(dashes)/2:]
467 print "%s[ %s ]%s" % (before, title, after)
468 else:
469 print dash * length
470
471 def print_log(path):
472 header(path)
473 lines = open(path, "r").readlines()
474 for l in lines:
475 sys.stdout.write(l)
476 print
477
478 class BaseTest(object):
479 compiled = {}
480
481 def __init__(self, target):
482 self.target = target
483 self.server = None
484 self.target_process = None
485 self.binary = None
486 self.start = 0
487 self.logs = []
488
489 def early_applicable(self):
490 """Return a false value if the test has determined it cannot run
491 without ever needing to talk to the target or server."""
492 # pylint: disable=no-self-use
493 return True
494
495 def setup(self):
496 pass
497
498 def compile(self):
499 compile_args = getattr(self, 'compile_args', None)
500 if compile_args:
501 if compile_args not in BaseTest.compiled:
502 # pylint: disable=star-args
503 BaseTest.compiled[compile_args] = \
504 self.target.compile(*compile_args)
505 self.binary = BaseTest.compiled.get(compile_args)
506
507 def classSetup(self):
508 self.compile()
509 self.target_process = self.target.target()
510 self.server = self.target.server()
511 self.logs.append(self.server.logname)
512
513 def classTeardown(self):
514 del self.server
515 del self.target_process
516
517 def run(self):
518 """
519 If compile_args is set, compile a program and set self.binary.
520
521 Call setup().
522
523 Then call test() and return the result, displaying relevant information
524 if an exception is raised.
525 """
526
527 sys.stdout.flush()
528
529 if not self.early_applicable():
530 return "not_applicable"
531
532 self.start = time.time()
533
534 self.classSetup()
535
536 try:
537 self.setup()
538 result = self.test() # pylint: disable=no-member
539 except TestNotApplicable:
540 result = "not_applicable"
541 except Exception as e: # pylint: disable=broad-except
542 if isinstance(e, TestFailed):
543 result = "fail"
544 else:
545 result = "exception"
546 if isinstance(e, TestFailed):
547 header("Message")
548 print e.message
549 header("Traceback")
550 traceback.print_exc(file=sys.stdout)
551 return result
552
553 finally:
554 for log in self.logs:
555 print_log(log)
556 header("End of logs")
557 self.classTeardown()
558
559 if not result:
560 result = 'pass'
561 return result
562
563 gdb_cmd = None
564 class GdbTest(BaseTest):
565 def __init__(self, target):
566 BaseTest.__init__(self, target)
567 self.gdb = None
568
569 def classSetup(self):
570 BaseTest.classSetup(self)
571
572 if gdb_cmd:
573 self.gdb = Gdb(gdb_cmd)
574 else:
575 self.gdb = Gdb()
576
577 self.logs.append(self.gdb.logname)
578
579 if self.binary:
580 self.gdb.command("file %s" % self.binary)
581 if self.target:
582 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
583 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
584 if self.server.port:
585 self.gdb.command(
586 "target extended-remote localhost:%d" % self.server.port)
587 # Force gdb to discover threads now, otherwise it might interrupt
588 # us at some point when it decides by itself to check.
589 self.gdb.command("info threads")
590
591 # FIXME: OpenOCD doesn't handle PRIV now
592 #self.gdb.p("$priv=3")
593
594 def classTeardown(self):
595 del self.gdb
596 BaseTest.classTeardown(self)
597
598 class ExamineTarget(GdbTest):
599 def test(self):
600 self.target.misa = self.gdb.p("$misa")
601
602 txt = "RV"
603 if (self.target.misa >> 30) == 1:
604 txt += "32"
605 elif (self.target.misa >> 62) == 2:
606 txt += "64"
607 elif (self.target.misa >> 126) == 3:
608 txt += "128"
609 else:
610 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
611 self.target.misa)
612
613 for i in range(26):
614 if self.target.misa & (1<<i):
615 txt += chr(i + ord('A'))
616 print txt,
617
618 class TestFailed(Exception):
619 def __init__(self, message):
620 Exception.__init__(self)
621 self.message = message
622
623 class TestNotApplicable(Exception):
624 def __init__(self, message):
625 Exception.__init__(self)
626 self.message = message
627
628 def assertEqual(a, b):
629 if a != b:
630 raise TestFailed("%r != %r" % (a, b))
631
632 def assertNotEqual(a, b):
633 if a == b:
634 raise TestFailed("%r == %r" % (a, b))
635
636 def assertIn(a, b):
637 if a not in b:
638 raise TestFailed("%r not in %r" % (a, b))
639
640 def assertNotIn(a, b):
641 if a in b:
642 raise TestFailed("%r in %r" % (a, b))
643
644 def assertGreater(a, b):
645 if not a > b:
646 raise TestFailed("%r not greater than %r" % (a, b))
647
648 def assertLess(a, b):
649 if not a < b:
650 raise TestFailed("%r not less than %r" % (a, b))
651
652 def assertTrue(a):
653 if not a:
654 raise TestFailed("%r is not True" % a)
655
656 def assertRegexpMatches(text, regexp):
657 if not re.search(regexp, text):
658 raise TestFailed("can't find %r in %r" % (regexp, text))