b76f320a7071cc4fb00c02cd0aaacade8066c9e2
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os.path
3 import random
4 import re
5 import shlex
6 import subprocess
7 import sys
8 import tempfile
9 import time
10 import traceback
11
12 import pexpect
13
14 # Note that gdb comes with its own testsuite. I was unable to figure out how to
15 # run that testsuite against the spike simulator.
16
def find_file(path):
    """Locate ``path`` relative to the usual search directories.

    The current working directory is tried first, then the directory that
    contains this module. Returns the first joined path that exists, or None
    when neither location has it.
    """
    search_dirs = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(base, path) for base in search_dirs)
    return next((c for c in candidates if os.path.exists(c)), None)
23
24 def compile(args, xlen=32): # pylint: disable=redefined-builtin
25 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
26 cmd = [cc, "-g"]
27 if xlen == 32:
28 cmd.append("-march=rv32imac")
29 cmd.append("-mabi=ilp32")
30 else:
31 cmd.append("-march=rv64imac")
32 cmd.append("-mabi=lp64")
33 for arg in args:
34 found = find_file(arg)
35 if found:
36 cmd.append(found)
37 else:
38 cmd.append(arg)
39 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
40 stderr=subprocess.PIPE)
41 stdout, stderr = process.communicate()
42 if process.returncode:
43 print
44 header("Compile failed")
45 print "+", " ".join(cmd)
46 print stdout,
47 print stderr,
48 header("")
49 raise Exception("Compile failed!")
50
def unused_port():
    """Return a TCP port number that was unused when this was called.

    A socket is bound to port 0 and the port number it ended up with is
    reported. The socket is closed again before returning, so nothing
    reserves the port: another process may claim it before the caller does.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Close even when bind()/getsockname() raises, so the descriptor is
        # never leaked.
        s.close()
59
class Spike(object):
    """A spike simulator child process; its output goes to ``logname``."""
    logname = "spike-%d.log" % os.getpid()

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):
        """Launch spike and, if requested, wait for its remote bitbang port.

        Arguments:
        target -- supplies sim_cmd, xlen, ram, ram_size and compile().
        halted -- when true, pass -H to the simulator.
        timeout -- when true, prefix the command with ``timeout <value>``.
        with_jtag_gdb -- when true, pass ``--rbb-port 0`` and poll the log
            for the port the simulator announces. That port is stored in
            self.port and exported through REMOTE_BITBANG_PORT.

        The child process is available as self.process. self.port stays None
        when with_jtag_gdb is false.
        """
        # Give these a value up front so that __del__() and callers never hit
        # a missing attribute, even when start-up fails part-way through.
        self.process = None
        self.port = None

        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]
        if target.xlen == 32:
            cmd += ["--isa", "RV32G"]
        else:
            cmd += ["--isa", "RV64G"]
        cmd += ["-m0x%x:0x%x" % (target.ram, target.ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        self.infinite_loop = target.compile(
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)

        logfile = open(self.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child holds its own copy of the descriptor; ours is no
            # longer needed.
            logfile.close()

        if with_jtag_gdb:
            for _ in range(30):
                with open(self.logname) as log:
                    m = re.search(r"Listening for remote bitbang connection on "
                            r"port (\d+).", log.read())
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            assert self.port, "Didn't get spike message about bitbang " \
                    "connection"

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # OSError: the process is already gone.
            # AttributeError: __init__() never got as far as starting it.
            pass

    def wait(self, *args, **kwargs):
        """Wait for the simulator to exit; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
117
class VcsSim(object):
    """A VCS simulation child process; its output goes to ``logname``."""
    logname = "simv.log"

    def __init__(self, sim_cmd=None, debug=False):
        """Start the simulator and block until it reports its JTAG VPI port.

        Arguments:
        sim_cmd -- command line to run (split with shlex); defaults to
            ``simv``.
        debug -- when true, append ``-debug`` to the program name and ask for
            a VPD dump in output/gdbserver.vpd.

        The port is stored in self.port and exported through JTAG_VPI_PORT.
        Raises RuntimeError if the simulator exits before announcing a port.
        """
        # Defined up front so __del__() is safe if start-up fails early.
        self.process = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open(self.logname, "r") as listenfile:
                # Skip what we just wrote; only simulator output matters.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        finally:
            logfile.close()
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Follow ``listenfile`` until the port announcement; return the port."""
        pending = ""
        while True:
            # Fail if VCS exits early
            exit_code = self.process.poll()
            if exit_code is not None:
                raise RuntimeError('VCS simulator exited early with status %d'
                                   % exit_code)

            pending += listenfile.readline()
            if not pending.endswith("\n"):
                # Either nothing new, or the simulator is half-way through
                # writing a line. Matching a partial line could yield a
                # truncated port number, so wait for the rest of it.
                time.sleep(1)
                continue

            line, pending = pending, ""
            match = re.match(r"^Listening on port (\d+)$", line)
            if match:
                return int(match.group(1))

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # OSError: the process is already gone.
            # AttributeError: __init__() never got as far as starting it.
            pass
160
class Openocd(object):
    """An OpenOCD server child process; its output goes to ``logname``."""
    # Created once, when the class is defined, and shared by all instances.
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name
    print("OpenOCD Temporary Log File: %s" % logname)

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Start OpenOCD and wait until it listens for gdb connections.

        Arguments:
        server_cmd -- command line to run (split with shlex); defaults to
            $RISCV/bin/openocd.
        config -- configuration script, looked up with find_file() and passed
            with -f. The process exits with status 1 if it cannot be found.
        debug -- when true, pass -d.
        timeout -- seconds to wait for the gdb port to be announced.

        The gdb port is stored in self.port. If OpenOCD exits first, or the
        timeout expires, the log is printed and an Exception is raised.
        """
        # Defined up front so __del__() is safe if start-up fails early.
        self.process = None

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]
            if debug:
                cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child holds its own copy of the descriptor.
            logfile.close()

        # Bound before the try block so the handler below can always print it.
        log = ""
        try:
            # Wait for OpenOCD to have made it through riscv_examine(). When
            # using OpenOCD to communicate with a simulator this may take a
            # long time, and gdb will time out when trying to connect if we
            # attempt too early.
            start = time.time()
            messaged = False
            while True:
                with open(Openocd.logname) as current:
                    log = current.read()
                m = re.search(r"Listening on port (\d+) for gdb connections",
                        log)
                if m:
                    self.port = int(m.group(1))
                    break

                if self.process.poll() is not None:
                    raise Exception(
                            "OpenOCD exited before completing riscv_examine()")
                if not messaged and time.time() - start > 1:
                    messaged = True
                    print("Waiting for OpenOCD to start...")
                if (time.time() - start) > timeout:
                    raise Exception("ERROR: Timed out waiting for OpenOCD to "
                            "listen for gdb")
                # Re-reading the log in a tight loop burns a whole CPU while
                # waiting; a short pause between polls avoids that.
                time.sleep(0.1)

        except Exception:
            header("OpenOCD log")
            sys.stdout.write(log)
            raise

    def __del__(self):
        """Kill the server, if one was started, and reap it."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # OSError: the process is already gone.
            # AttributeError: __init__() never got as far as starting it.
            pass
242
class OpenocdCli(object):
    """Talk to OpenOCD's telnet command interface through pexpect."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:``port`` and wait for a prompt.

        The session is piped through tee into openocd-cli.log.
        """
        shell_cmd = "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port
        self.child = pexpect.spawn(shell_cmd)
        self.child.expect("> ")

    def command(self, cmd):
        """Send ``cmd`` and return what was printed before the next prompt.

        The echoed command and the line end that follows it are consumed
        first; the remaining output is returned with surrounding whitespace
        and NUL characters stripped.
        """
        session = self.child
        session.sendline(cmd)
        for pattern in (cmd, "\n", "> "):
            session.expect(pattern)
        return session.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run ``reg`` and parse the register values out of the reply.

        With a register name, return that register's value as an int;
        without one, return a dict mapping every reported name to its value.
        """
        output = self.command("reg %s" % reg)
        values = dict(
            (name, int(text, 0))
            for name, text in re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)",
                                         output))
        return values[reg] if reg else values

    def load_image(self, image):
        """Run ``load_image``; raise TestNotApplicable for non-32-bit ELFs."""
        reply = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in reply:
            raise TestNotApplicable(reply)
268
class CannotAccess(Exception):
    """Signals that memory at ``address`` could not be accessed."""

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        # The offending address, as given by whoever raised the exception.
        self.address = address
273
# One row of gdb's thread listing: id, target_id, name and frame.
Thread = collections.namedtuple('Thread', 'id target_id name frame')
276
class Gdb(object):
    """Drive an interactive gdb session through pexpect.

    pexpect's record of the session is written to the temporary file named
    by ``logname``.
    """
    # Created once, when the class is defined, and shared by all instances.
    logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")
    logname = logfile.name
    print("GDB Temporary Log File: %s" % logname)

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn ``cmd`` and switch off confirmation prompts and paging."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open(self.logname, "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        self.command("set confirm off")
        self.command("set width 0")
        self.command("set height 0")
        # Force consistency.
        self.command("set print entry-values no")

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send ``command`` and return the output up to the next prompt.

        The first line end after sending (the echoed command) is consumed
        before the output is collected. The result is stripped of
        surrounding whitespace.
        """
        self.child.sendline(command)
        self.child.expect("\n", timeout=timeout)
        self.child.expect(r"\(gdb\)", timeout=timeout)
        return self.child.before.strip()

    def c(self, wait=True, timeout=-1, run_async=False, **kwargs):
        """Continue execution.

        Arguments:
        wait -- when true, wait for the next prompt, check that gdb printed
            "Continuing" and return the output. Otherwise only wait for
            "Continuing" and return None.
        timeout -- passed to command() when waiting.
        run_async -- when true, send ``c&`` instead of ``c``. For backward
            compatibility this can also be given as the keyword ``async``.
        """
        # 'async' is a reserved word from Python 3.7 on, so it can no longer
        # be spelled out as a parameter name. Existing callers that pass it
        # by keyword are still served through **kwargs.
        if "async" in kwargs:
            run_async = kwargs.pop("async")
        if kwargs:
            raise TypeError("c() got an unexpected keyword argument %r"
                    % sorted(kwargs)[0])

        suffix = "&" if run_async else ""
        if wait:
            output = self.command("c%s" % suffix, timeout=timeout)
            assert "Continuing" in output
            return output
        self.child.sendline("c%s" % suffix)
        self.child.expect("Continuing")

    def interrupt(self):
        """Send Ctrl-C and return the output up to the next prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=6000)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory at ``address`` and return the value as an int.

        The value is taken from the text between the first and second colon
        of gdb's reply.
        """
        output = self.command("x/%s %s" % (size, address))
        value = int(output.split(':')[1].strip(), 0)
        return value

    @staticmethod
    def _raise_if_cannot_access(output):
        """Raise CannotAccess if ``output`` reports inaccessible memory."""
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def p_raw(self, obj):
        """Print ``obj`` and return the text after the last '=' unparsed.

        Raises CannotAccess if gdb reports inaccessible memory.
        """
        output = self.command("p %s" % obj)
        self._raise_if_cannot_access(output)
        return output.split('=')[-1].strip()

    def p(self, obj):
        """Print ``obj`` in hex and return it as an int.

        Raises CannotAccess if gdb reports inaccessible memory.
        """
        output = self.command("p/x %s" % obj)
        self._raise_if_cannot_access(output)
        value = int(output.split('=')[-1].strip(), 0)
        return value

    def p_string(self, obj):
        """Print ``obj`` and return the second shell-style token of the
        text after the last '='."""
        output = self.command("p %s" % obj)
        value = shlex.split(output.split('=')[-1].strip())[1]
        return value

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        output = self.command("stepi", timeout=60)
        return output

    def load(self):
        """Run ``load`` and check that gdb reports a completed transfer."""
        output = self.command("load", timeout=6000)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at ``location`` and return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at ``location``; return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output

    def threads(self):
        """Return the threads gdb lists, as Thread tuples.

        When no line of ``info threads`` can be parsed, a single placeholder
        thread ('1', '1', 'Default', '???') is returned instead.
        """
        output = self.command("info threads")
        threads = []
        for line in output.splitlines():
            m = re.match(
                    r"[\s\*]*(\d+)\s*Thread (\d+)\s*\(Name: ([^\)]+)\s*(.*)",
                    line)
            if m:
                threads.append(Thread(*m.groups()))
        if not threads:
            threads.append(Thread('1', '1', 'Default', '???'))
        return threads

    def thread(self, thread):
        """Switch gdb to ``thread`` (a Thread tuple); return gdb's output."""
        return self.command("thread %s" % thread.id)
383
def run_all_tests(module, target, parsed):
    """Collect the test classes in ``module``, run them and report.

    A class is picked up when it has a ``test`` attribute and, if test names
    were given on the command line, when one of them occurs in the class
    name. ExamineTarget is run first unless $misa is already known from the
    command line or from the target definition.

    Side effects: creates the log directory if needed and stores parsed.gdb
    in the module-level gdb_cmd. Returns the value of print_results().
    """
    global gdb_cmd  # pylint: disable=global-statement

    if not os.path.exists(parsed.logs):
        os.makedirs(parsed.logs)

    overall_start = time.time()

    gdb_cmd = parsed.gdb

    todo = []
    if parsed.misaval:
        target.misa = int(parsed.misaval, 16)
        print("Using $misa from command line: 0x%x" % target.misa)
    elif target.misa:
        print("Using $misa from target definition: 0x%x" % target.misa)
    else:
        todo.append(("ExamineTarget", ExamineTarget))

    for name in dir(module):
        candidate = getattr(module, name)
        is_test_class = type(candidate) == type and hasattr(candidate, 'test')
        if not is_test_class:
            continue
        if parsed.test and not any(wanted in name for wanted in parsed.test):
            continue
        todo.append((name, candidate))

    results, count = run_tests(parsed, target, todo)

    elapsed = time.time() - overall_start
    header("ran %d tests in %.0fs" % (count, elapsed), dash=':')

    return print_results(results)
414
# Results that do not count as a failure.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run every (name, class) pair in ``todo`` against ``target``.

    Each test gets its own log file in parsed.logs; while the test runs,
    sys.stdout is redirected into that file. A one-line summary per test is
    printed to the real stdout. With parsed.print_failures the log of a test
    with a bad result is echoed as well, and with parsed.fail_fast the run
    stops after the first bad result.

    Returns (results, count): ``results`` maps each result string to a list
    of (name, log_name) pairs and ``count`` is the number of tests run.
    """
    results = {}
    count = 0

    for name, definition in todo:
        instance = definition(target)
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        real_stdout = sys.stdout
        # The with block guarantees the log is flushed and closed before it
        # is read back below; otherwise its tail may still sit in the write
        # buffer and be missing from the echoed output.
        with open(log_name, 'w') as log_fd:
            sys.stdout.write("Running %s > %s ... " % (name, log_name))
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
        print("%s in %.2fs" % (result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as finished_log:
                sys.stdout.write(finished_log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
448
def print_results(results):
    """Print a per-result summary and return a process exit status.

    ``results`` maps a result string to a list of (name, log_name) pairs.
    For every result not in good_results the individual tests and their log
    files are listed too. Returns 1 if any such result is present, else 0.
    """
    exit_status = 0
    for outcome, tests in results.items():
        print("%d tests returned %s" % (len(tests), outcome))
        if outcome in good_results:
            continue
        exit_status = 1
        for name, log_name in tests:
            print(" %s > %s" % (name, log_name))

    return exit_status
459
def add_test_run_options(parser):
    """Register the test-run command line options on ``parser``.

    ``parser`` only needs an argparse-style add_argument() method.
    """
    # (flags, keyword arguments), registered in this order.
    options = (
        (("--logs",),
            dict(default="logs",
                 help="Store logs in the specified directory.")),
        (("--fail-fast", "-f"),
            dict(action="store_true",
                 help="Exit as soon as any test fails.")),
        (("--print-failures",),
            dict(action="store_true",
                 help="When a test fails, print the log file to stdout.")),
        (("test",),
            dict(nargs='*',
                 help="Run only tests that are named here.")),
        (("--gdb",),
            dict(help="The command to use to start gdb.")),
        (("--misaval",),
            dict(help="Don't run ExamineTarget, just assume the misa value "
                      "which is specified.")),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
474
def header(title, dash='-', length=78):
    """Print a banner line ``length`` characters wide.

    With a title, the title is shown as ``[ title ]`` in the middle of a run
    of ``dash`` characters; when the number of dashes is odd, the extra one
    goes on the right. Without a title the line consists of dashes only.
    """
    if title:
        dashes = dash * (length - 4 - len(title))
        # Floor division: a slice index has to be an integer, and plain '/'
        # only yields one under Python 2's integer-division rules.
        half = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
    else:
        print(dash * length)
483
def print_log(path):
    """Print the file at ``path`` to stdout.

    The contents are preceded by a header() banner naming the file and
    followed by an empty line.
    """
    header(path)
    # The with block closes the file again; the log may be printed many
    # times in one run and should not leave handles behind.
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    print("")
490
class BaseTest(object):
    """Scaffolding shared by all tests.

    Subclasses provide test() and may set ``compile_args`` and override
    early_applicable() and setup().
    """
    # Results of target.compile(), keyed by compile_args and shared by all
    # test instances.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        # Names of log files to print once the test has run.
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called after classSetup() and before test()."""
        pass

    def compile(self):
        """Set self.binary from ``compile_args``, compiling at most once.

        Without compile_args, self.binary becomes None.
        """
        args = getattr(self, 'compile_args', None)
        cache = BaseTest.compiled
        if args and args not in cache:
            # pylint: disable=star-args
            cache[args] = self.target.compile(*args)
        self.binary = cache.get(args)

    def classSetup(self):
        """Compile, then create the target process and the server.

        The log names of both are added to self.logs. If creating the
        server fails, the logs collected so far are printed and the
        exception is re-raised.
        """
        self.compile()
        process = self.target.create()
        self.target_process = process
        if process:
            self.logs.append(process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        Run the test and return its result as a string.

        Returns "not_applicable" when early_applicable() is false or the
        test raises TestNotApplicable, "fail" when it raises TestFailed and
        "exception" for any other exception; in the last two cases the
        traceback is printed. Otherwise the value returned by test() is
        used, with a false value turned into "pass".

        Unless the test was skipped early, all collected logs are printed
        and classTeardown() is called afterwards.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            outcome = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            outcome = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            failed = isinstance(e, TestFailed)
            if failed:
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            return "fail" if failed else "exception"

        finally:
            for log in self.logs:
                print_log(log)
            header("End of logs")
            self.classTeardown()

        return outcome or 'pass'
581
# Command used to start gdb; None selects Gdb's built-in default.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that talks to the target through a gdb session (self.gdb)."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Start gdb on top of BaseTest's setup and connect it.

        In order: load self.binary if set, configure architecture and remote
        timeout from the target, connect to the server's port and select a
        random thread, then run the target's gdb_setup commands.
        """
        BaseTest.classSetup(self)

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        gdb = self.gdb

        self.logs.append(gdb.logname)

        if self.binary:
            gdb.command("file %s" % self.binary)
        if self.target:
            gdb.command("set arch riscv:rv%d" % self.target.xlen)
            gdb.command("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            gdb.command(
                    "target extended-remote localhost:%d" % self.server.port)
            # Select a random thread.
            # TODO: Allow a command line option to force a specific thread.
            gdb.thread(random.choice(gdb.threads()))

        for setup_cmd in self.target.gdb_setup:
            gdb.command(setup_cmd)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then do BaseTest's teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
620
621 class ExamineTarget(GdbTest):
622 def test(self):
623 self.target.misa = self.gdb.p("$misa")
624
625 txt = "RV"
626 if (self.target.misa >> 30) == 1:
627 txt += "32"
628 elif (self.target.misa >> 62) == 2:
629 txt += "64"
630 elif (self.target.misa >> 126) == 3:
631 txt += "128"
632 else:
633 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
634 self.target.misa)
635
636 for i in range(26):
637 if self.target.misa & (1<<i):
638 txt += chr(i + ord('A'))
639 print txt,
640
class TestFailed(Exception):
    """Raised to report a test failure; the reason is in ``message``."""

    def __init__(self, message):
        # Hand the text to Exception as well, so that str(e) and printed
        # tracebacks show the reason instead of a bare class name.
        Exception.__init__(self, message)
        self.message = message
645
class TestNotApplicable(Exception):
    """Raised when a test cannot be run here; the reason is in ``message``."""

    def __init__(self, message):
        # Hand the text to Exception as well, so that str(e) and printed
        # tracebacks show the reason instead of a bare class name.
        Exception.__init__(self, message)
        self.message = message
650
def assertEqual(a, b):
    """Raise TestFailed, showing both values, when ``a != b``."""
    differ = a != b
    if differ:
        raise TestFailed("%r != %r" % (a, b))
654
def assertNotEqual(a, b):
    """Raise TestFailed, showing both values, when ``a == b``."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
658
def assertIn(a, b):
    """Raise TestFailed, showing both values, when ``a`` is not in ``b``."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
662
def assertNotIn(a, b):
    """Raise TestFailed, showing both values, when ``a`` is in ``b``."""
    present = a in b
    if present:
        raise TestFailed("%r in %r" % (a, b))
666
def assertGreater(a, b):
    """Raise TestFailed, showing both values, unless ``a > b``."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
670
def assertLess(a, b):
    """Raise TestFailed, showing both values, unless ``a < b``."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
674
def assertTrue(a):
    """Raise TestFailed, showing the value, when ``a`` is false."""
    if not a:
        # Wrap the value in a one-element tuple: a bare '%' would unpack a
        # tuple argument such as () and fail with a TypeError instead of
        # reporting the test failure.
        raise TestFailed("%r is not True" % (a,))
678
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless ``regexp`` is found somewhere in ``text``."""
    if re.search(regexp, text):
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))