# riscv-tests.git: debug/testlib.py
# (web-view object id: 9ca97da68485c628d9d25cccab38728019627b3d)
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    contains this file. Returns the first joined path that exists, or None
    when neither candidate exists.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    return next((found for found in candidates if os.path.exists(found)),
            None)
20
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Run the RISC-V gcc found under $RISCV with the given arguments.

    args -- iterable of command line arguments. Any argument that
            find_file() can locate is replaced by the path it returns; all
            other arguments are passed through unchanged.
    xlen -- 32 selects -march=rv32imac/-mabi=ilp32; any other value selects
            -march=rv64imac/-mabi=lp64.

    Returns None. When the compiler exits with a non-zero status, the command
    line and the captured stdout/stderr are printed and Exception is raised.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    cmd = [cc, "-g"]
    if xlen == 32:
        cmd += ["-march=rv32imac", "-mabi=ilp32"]
    else:
        cmd += ["-march=rv64imac", "-mabi=lp64"]
    cmd += [find_file(arg) or arg for arg in args]

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode:
        # Single-argument print() calls behave the same on Python 2 and 3.
        print("")
        header("Compile failed")
        print("+ " + " ".join(cmd))
        for captured in (stdout, stderr):
            # communicate() hands back bytes on Python 3.
            if not isinstance(captured, str):
                captured = captured.decode("utf-8", "replace")
            sys.stdout.write(captured)
        header("")
        raise Exception("Compile failed!")
47
def unused_port():
    """Return a TCP port number that was unused when this was called.

    A socket is bound to port 0 so the operating system picks a free port;
    the socket is then closed and the chosen number returned. Nothing
    reserves the port afterwards, so another process may claim it before the
    caller does.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Release the descriptor even when bind()/getsockname() fails.
        s.close()
56
class Spike(object):
    """A spike simulator child process whose output goes to `logname`."""
    logname = "spike.log"

    def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
            timeout=None, xlen=64):
        """Launch spike and record its process and remote bitbang port.

        sim_cmd       -- command line used to start the simulator (split with
                         shlex); a false value selects $RISCV/bin/spike.
        binary        -- optional extra program argument appended last.
        halted        -- when true, pass -H.
        with_jtag_gdb -- when true, pass "--rbb-port 0", export
                         REMOTE_BITBANG_HOST/REMOTE_BITBANG_PORT, and wait for
                         spike to log the port it chose.
        timeout       -- when set, the command is wrapped in `timeout <n>`.
        xlen          -- 32 selects "--isa RV32G", anything else RV64G.

        Sets self.process (the Popen object) and self.port (an int, or None
        when with_jtag_gdb is false). Raises AssertionError if spike never
        reports its bitbang port.
        """
        # Defined up front so __del__ and readers of `port` never hit a
        # missing attribute when the launch below fails part-way.
        self.process = None
        self.port = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]
        if xlen == 32:
            cmd += ["--isa", "RV32G"]
        else:
            cmd += ["--isa", "RV64G"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        cmd += ["-m0x10000000:0x10000000"]

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        cmd.append('programs/infinite_loop')
        if binary:
            cmd.append(binary)

        # The child inherits its own copy of the descriptor, so our handle
        # can be closed as soon as the process has been started.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            for _ in range(30):
                with open(self.logname) as log:
                    # The trailing period is matched literally: with a bare
                    # `.` a half-written line ("... port 123" of 12345) would
                    # match and yield a truncated port number.
                    m = re.search(
                            r"Listening for remote bitbang connection on "
                            r"port (\d+)\.", log.read())
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                # Raised explicitly (rather than via `assert`) so the check
                # is not stripped when Python runs with -O.
                raise AssertionError("Didn't get spike message about bitbang "
                        "connection")

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Forward to Popen.wait() of the simulator process."""
        return self.process.wait(*args, **kwargs)
115
class VcsSim(object):
    """A VCS simulator child process whose output goes to `logname`."""
    logname = "simv.log"

    def __init__(self, sim_cmd=None, debug=False):
        """Start the simulator and wait until it reports its listening port.

        sim_cmd -- command line used to start the simulator (split with
                   shlex); defaults to "simv".
        debug   -- when true, "-debug" is appended to the executable name and
                   "+vcdplusfile=output/gdbserver.vpd" is added.

        Sets self.process and self.port and exports JTAG_VPI_PORT. Raises
        Exception if the simulator exits before announcing a port.
        """
        # Defined up front so __del__ is safe if the launch below fails.
        self.process = None
        self.port = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open(self.logname, "r") as listenfile:
                # Start reading after the header line written above, so only
                # simulator output is scanned.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Read simulator output until the listening port line; return it."""
        pending = ""
        exited = False
        while True:
            chunk = listenfile.readline()
            if not chunk:
                if exited:
                    raise Exception(
                            "simv exited before reporting a listening port")
                # After the process is seen to have exited, loop once more
                # without sleeping to drain output written just before exit.
                exited = self.process.poll() is not None
                if not exited:
                    time.sleep(1)
                continue
            pending += chunk
            if not pending.endswith("\n"):
                # Line still being written; matching now could capture only
                # the leading digits of the port.
                continue
            line, pending = pending, ""
            match = re.match(r"^Listening on port (\d+)$", line)
            if match:
                return int(match.group(1))

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
150
class Openocd(object):
    """An OpenOCD child process whose output goes to `logname`."""
    logname = "openocd.log"

    def __init__(self, server_cmd=None, config=None, debug=False):
        """Start OpenOCD and wait until its gdb server port is known.

        server_cmd -- command line used to start the server (split with
                      shlex); defaults to $RISCV/bin/riscv-openocd.
        config     -- optional config file name, resolved with find_file()
                      and passed with -f. The interpreter exits with status 1
                      when the file cannot be found.
        debug      -- when true, pass -d (once).

        Sets self.process and self.port. Raises Exception if OpenOCD exits
        before logging "Ready for Remote Connections" or if the gdb server
        port cannot be determined.
        """
        # Defined up front so __del__ is safe if the launch below fails.
        self.process = None
        self.port = None

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
            cmd = [openocd]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                # sys.exit() instead of the interactive-only exit() helper.
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            cmd.append("-d")

        # The child inherits its own copy of the descriptor, so our handle
        # can be closed as soon as the process has been started.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_file:
                log = log_file.read()
            if "Ready for Remote Connections" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Poll at a modest rate instead of spinning on the log file.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on."""
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a',  # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid),  # Filter on PID
                        '-iTCP',  # Filter only TCP sockets
                    ], stderr=devnull)
                except subprocess.CalledProcessError:
                    output = ""
            # check_output() returns bytes on Python 3.
            if not isinstance(output, str):
                output = output.decode("utf-8", "replace")
            matches = [m for m in PORT_REGEX.finditer(output)
                    if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill the server, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
247
class OpenocdCli(object):
    """Talks to an OpenOCD telnet command line through pexpect."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:`port` and wait for a prompt.

        The session output is also copied to openocd-cli.log via tee.
        """
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # Wait for the echo of the command as literal text; expect() would
        # interpret characters such as + ( ) $ in `cmd` as regex syntax.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run "reg" and parse the reported register values.

        With a register name, return that register's value as an int
        (KeyError if the reply does not contain it); without one, return a
        dict mapping every reported register name to its int value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image" on `image`.

        Raises TestNotApplicable when OpenOCD reports that it only supports
        32-bit ELF files.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
273
class CannotAccess(Exception):
    """Signals that gdb could not access memory at `address`."""

    def __init__(self, address):
        # No message is handed to the base class; the address is the only
        # payload.
        super(CannotAccess, self).__init__()
        self.address = address
278
class Gdb(object):
    """Drives an interactive gdb session through pexpect.

    Everything exchanged with gdb is logged to gdb.log.
    """

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn `cmd`, wait for the first prompt and apply base settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        startup_settings = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for setting in startup_settings:
            self.command(setting)

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send one line to gdb; return the stripped output up to the prompt."""
        session = self.child
        session.sendline(command)
        session.expect("\n", timeout=timeout)
        session.expect(r"\(gdb\)", timeout=timeout)
        return session.before.strip()

    def c(self, wait=True, timeout=-1):
        """Continue execution.

        With wait true, block until the prompt returns and hand back the
        output; otherwise only wait for gdb to acknowledge with "Continuing"
        and return None.
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c", timeout=timeout)
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output printed before the prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=6000)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory with x/<size> and return the value as an int."""
        reply = self.command("x/%s %s" % (size, address))
        # The value is whatever follows the first colon of the reply.
        return int(reply.split(':')[1].strip(), 0)

    def _print_checked(self, gdb_command):
        """Run a print command; return the text after the last '='.

        Raises CannotAccess when gdb reports an inaccessible address.
        """
        reply = self.command(gdb_command)
        fault = re.search("Cannot access memory at address (0x[0-9a-f]+)",
                reply)
        if fault:
            raise CannotAccess(int(fault.group(1), 0))
        return reply.split('=')[-1].strip()

    def p_raw(self, obj):
        """Print `obj` and return gdb's textual value unparsed."""
        return self._print_checked("p %s" % obj)

    def p(self, obj):
        """Print `obj` in hex and return it as an int."""
        return int(self._print_checked("p/x %s" % obj), 0)

    def p_string(self, obj):
        """Print `obj` and return the second shell-style token of the value."""
        reply = self.command("p %s" % obj)
        tokens = shlex.split(reply.split('=')[-1].strip())
        return tokens[1]

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Load the current file onto the target, asserting it succeeded."""
        reply = self.command("load", timeout=6000)
        assert "failed" not in reply
        assert "Transfer rate" in reply

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output."""
        reply = self.command("b %s" % location)
        assert "not defined" not in reply
        assert "Breakpoint" in reply
        return reply

    def hbreak(self, location):
        """Set a hardware breakpoint at `location` and return gdb's output."""
        reply = self.command("hbreak %s" % location)
        assert "not defined" not in reply
        assert "Hardware assisted breakpoint" in reply
        return reply
361
def run_all_tests(module, target, parsed):
    """Run the test classes found in `module` against `target`.

    module -- object whose attributes are scanned; every class that has a
              `test` attribute is a test. When parsed.test is non-empty only
              classes whose name contains one of those strings are run.
    target -- passed to each test class constructor. Its `misa` attribute is
              set from parsed.misaval (parsed as hex) when that is given;
              otherwise ExamineTarget is run first.
    parsed -- parsed options providing gdb, misaval, test and fail_fast (see
              add_test_run_options()).

    Also stores parsed.gdb in the module-level gdb_cmd. Prints a summary and
    returns 0 when every result was 'pass' or 'not_applicable', else 1.
    """
    global gdb_cmd  # pylint: disable=global-statement
    gdb_cmd = parsed.gdb

    good_results = set(('pass', 'not_applicable'))

    start = time.time()

    results = {}
    count = 0

    todo = []
    if parsed.misaval:
        target.misa = int(parsed.misaval, 16)
        print("Assuming $MISA value of 0x%x. Skipping ExamineTarget." %
                target.misa)
    else:
        todo.append(("ExamineTarget", ExamineTarget))

    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition))

    for name, definition in todo:
        instance = definition(target)
        result = instance.run()
        results.setdefault(result, []).append(name)
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    result = 0
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for key, value in results.items():
        print("%d tests returned %s" % (len(value), key))
        if key not in good_results:
            result = 1
            # Only the tests behind a bad result are listed by name.
            for test in value:
                print("  %s" % test)

    return result
405
def auto_int(x):
    """Convert the string `x` to an int, taking the base from its prefix."""
    return int(x, base=0)
408
def add_test_run_options(parser):
    """Register the options that control a test run on `parser`.

    Adds --fail-fast/-f, the positional `test` name filters, --gdb and
    --misaval.
    """
    option_table = (
        (("--fail-fast", "-f"),
            dict(action="store_true",
                help="Exit as soon as any test fails.")),
        (("test",),
            dict(nargs='*',
                help="Run only tests that are named here.")),
        (("--gdb",),
            dict(help="The command to use to start gdb.")),
        (("--misaval",),
            dict(help="Don't run ExamineTarget, just assume the misa value "
                "which is specified.")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
419
def header(title, dash='-'):
    """Print a 40-column separator line, optionally with a centred title.

    title -- text shown as "[ title ]" between runs of `dash`; when empty,
             a plain line of 40 `dash` characters is printed instead.
    dash  -- the fill character.

    When the fill cannot be split evenly, the extra character goes after the
    title. Titles longer than 36 characters are printed without any fill.
    """
    if not title:
        print(dash * 40)
        return
    dashes = dash * (36 - len(title))
    # Floor division keeps the slice index an int on Python 3 as well.
    middle = len(dashes) // 2
    print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
428
def print_log(path):
    """Print the file at `path` to stdout under a header naming it.

    Files longer than 1000 lines are abbreviated to their first 500 and last
    500 lines, separated by a "..." line.
    """
    header(path)
    # The context manager closes the handle; previously it was left open.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
441
class BaseTest(object):
    """Base class for tests; subclasses provide test().

    A subclass may define `compile_args` to have a program built once and
    shared between instances through the class-level `compiled` cache.
    """
    # Maps compile_args to whatever target.compile() returned for them.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called right before test(); does nothing by default."""
        pass

    def compile(self):
        """Set self.binary from `compile_args`, compiling at most once.

        Does nothing when the instance has no (or empty) compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, start the target and the server, and note the server log."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        The return value is "not_applicable", "fail" (TestFailed raised),
        "exception" (any other exception), 'pass' (test() returned a false
        value), or whatever true value test() returned.
        """
        # Output uses sys.stdout.write()/single-argument print() so the text
        # is identical on Python 2 and 3; the verdict follows on the same
        # line.
        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        self.classSetup()

        try:
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
531
# Command used to start gdb. While it is None, Gdb's default command is used.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that also gets a gdb session, available as self.gdb."""

    def __init__(self, target):
        super(GdbTest, self).__init__(target)
        self.gdb = None

    def classSetup(self):
        """Run the base setup, then start gdb and attach it to the server."""
        super(GdbTest, self).classSetup()
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        send = self.gdb.command

        if self.binary:
            send("file %s" % self.binary)
        if self.target:
            send("set arch riscv:rv%d" % self.target.xlen)
            send("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            send("target extended-remote localhost:%d" % self.server.port)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then run the base teardown."""
        del self.gdb
        super(GdbTest, self).classTeardown()
562
class ExamineTarget(GdbTest):
    """Reads $misa from the target and prints a summary of it."""

    def test(self):
        """Store $misa in self.target.misa and print e.g. "RV32ACIM".

        The width is taken from the two bits at positions 31:30, 63:62 or
        127:126 (values 1, 2 and 3 respectively); "??" is printed when none
        of them matches. Each set bit in positions 0-25 adds the letter
        'A'-'Z' with that index.
        """
        self.target.misa = self.gdb.p("$misa")
        misa = self.target.misa

        if (misa >> 30) == 1:
            width = "32"
        elif (misa >> 62) == 2:
            width = "64"
        elif (misa >> 126) == 3:
            width = "128"
        else:
            width = "??"

        letters = "".join(chr(i + ord('A'))
                for i in range(26) if misa & (1 << i))
        # No newline: the test verdict is printed on the same line by run().
        # sys.stdout.write() replaces the Python 2 only `print txt,` form.
        sys.stdout.write("RV%s%s " % (width, letters))
581
class TestFailed(Exception):
    """Raised by a test (or an assert* helper) to report a failure.

    The text is available as `message` and, because it is also handed to the
    base class, through str() and in tracebacks.
    """
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message
586
class TestNotApplicable(Exception):
    """Raised by a test to report that it does not apply to the target.

    The text is available as `message` and, because it is also handed to the
    base class, through str() and in tracebacks.
    """
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message
591
def assertEqual(a, b):
    """Raise TestFailed when `a != b` holds."""
    if not (a != b):
        return
    raise TestFailed("%r != %r" % (a, b))
595
def assertNotEqual(a, b):
    """Raise TestFailed when `a == b` holds."""
    if not (a == b):
        return
    raise TestFailed("%r == %r" % (a, b))
599
def assertIn(a, b):
    """Raise TestFailed unless `a` is contained in `b`."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
603
def assertNotIn(a, b):
    """Raise TestFailed when `a` is contained in `b`."""
    if a not in b:
        return
    raise TestFailed("%r in %r" % (a, b))
607
def assertGreater(a, b):
    """Raise TestFailed unless `a > b` holds."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
611
def assertLess(a, b):
    """Raise TestFailed unless `a < b` holds."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
615
def assertTrue(a):
    """Raise TestFailed when `a` is false."""
    if not a:
        # Wrap `a` in a 1-tuple: with a bare tuple operand (for instance the
        # false value ()) the % operator would treat its items as the format
        # arguments and raise TypeError instead of reporting the failure.
        raise TestFailed("%r is not True" % (a,))
619
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` matches somewhere in `text`."""
    if re.search(regexp, text) is None:
        raise TestFailed("can't find %r in %r" % (regexp, text))