b5bd992234cd2252a30d2018076d71614d1459a1
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, sim_cmd, binary=None, halted=False, with_jtag_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if sim_cmd:
65 cmd = shlex.split(sim_cmd)
66 else:
67 spike = os.path.expandvars("$RISCV/bin/spike")
68 cmd = [spike]
69 if xlen == 32:
70 cmd += ["--isa", "RV32G"]
71 else:
72 cmd += ["--isa", "RV64G"]
73
74 if timeout:
75 cmd = ["timeout", str(timeout)] + cmd
76
77 cmd += ["-m0x10000000:0x10000000"]
78
79 if halted:
80 cmd.append('-H')
81 if with_jtag_gdb:
82 cmd += ['--rbb-port', '0']
83 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
84 cmd.append('programs/infinite_loop')
85 if binary:
86 cmd.append(binary)
87 logfile = open(self.logname, "w")
88 logfile.write("+ %s\n" % " ".join(cmd))
89 logfile.flush()
90 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
91 stdout=logfile, stderr=logfile)
92
93 if with_jtag_gdb:
94 self.port = None
95 for _ in range(30):
96 m = re.search(r"Listening for remote bitbang connection on "
97 r"port (\d+).", open(self.logname).read())
98 if m:
99 self.port = int(m.group(1))
100 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
101 break
102 time.sleep(0.11)
103 assert self.port, "Didn't get spike message about bitbang " \
104 "connection"
105
106 def __del__(self):
107 try:
108 self.process.kill()
109 self.process.wait()
110 except OSError:
111 pass
112
113 def wait(self, *args, **kwargs):
114 return self.process.wait(*args, **kwargs)
115
116 class VcsSim(object):
117 def __init__(self, sim_cmd=None, debug=False):
118 if sim_cmd:
119 cmd = shlex.split(sim_cmd)
120 else:
121 cmd = ["simv"]
122 cmd += ["+jtag_vpi_enable"]
123 if debug:
124 cmd[0] = cmd[0] + "-debug"
125 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
126 logfile = open("simv.log", "w")
127 logfile.write("+ %s\n" % " ".join(cmd))
128 logfile.flush()
129 listenfile = open("simv.log", "r")
130 listenfile.seek(0, 2)
131 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
132 stdout=logfile, stderr=logfile)
133 done = False
134 while not done:
135 line = listenfile.readline()
136 if not line:
137 time.sleep(1)
138 match = re.match(r"^Listening on port (\d+)$", line)
139 if match:
140 done = True
141 self.port = int(match.group(1))
142 os.environ['JTAG_VPI_PORT'] = str(self.port)
143
144 def __del__(self):
145 try:
146 self.process.kill()
147 self.process.wait()
148 except OSError:
149 pass
150
151 class Openocd(object):
152 logname = "openocd.log"
153
154 def __init__(self, server_cmd=None, config=None, debug=False):
155 if server_cmd:
156 cmd = shlex.split(server_cmd)
157 else:
158 openocd = os.path.expandvars("$RISCV/bin/riscv-openocd")
159 cmd = [openocd]
160 if (debug):
161 cmd.append("-d")
162
163 # This command needs to come before any config scripts on the command
164 # line, since they are executed in order.
165 cmd += [
166 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
167 "--command",
168 "gdb_port 0",
169 # Disable tcl and telnet servers, since they are unused and because
170 # the port numbers will conflict if multiple OpenOCD processes are
171 # running on the same server.
172 "--command",
173 "tcl_port disabled",
174 "--command",
175 "telnet_port disabled",
176 ]
177
178 if config:
179 f = find_file(config)
180 if f is None:
181 print("Unable to read file " + config)
182 exit(1)
183
184 cmd += ["-f", f]
185 if debug:
186 cmd.append("-d")
187
188 logfile = open(Openocd.logname, "w")
189 logfile.write("+ %s\n" % " ".join(cmd))
190 logfile.flush()
191 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
192 stdout=logfile, stderr=logfile)
193
194 # Wait for OpenOCD to have made it through riscv_examine(). When using
195 # OpenOCD to communicate with a simulator this may take a long time,
196 # and gdb will time out when trying to connect if we attempt too early.
197 start = time.time()
198 messaged = False
199 while True:
200 log = open(Openocd.logname).read()
201 if "Ready for Remote Connections" in log:
202 break
203 if not self.process.poll() is None:
204 raise Exception(
205 "OpenOCD exited before completing riscv_examine()")
206 if not messaged and time.time() - start > 1:
207 messaged = True
208 print "Waiting for OpenOCD to examine RISCV core..."
209
210 self.port = self._get_gdb_server_port()
211
212 def _get_gdb_server_port(self):
213 """Get port that OpenOCD's gdb server is listening on."""
214 MAX_ATTEMPTS = 50
215 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
216 for _ in range(MAX_ATTEMPTS):
217 with open(os.devnull, 'w') as devnull:
218 try:
219 output = subprocess.check_output([
220 'lsof',
221 '-a', # Take the AND of the following selectors
222 '-p{}'.format(self.process.pid), # Filter on PID
223 '-iTCP', # Filter only TCP sockets
224 ], stderr=devnull)
225 except subprocess.CalledProcessError:
226 output = ""
227 matches = list(PORT_REGEX.finditer(output))
228 matches = [m for m in matches
229 if m.group('port') not in ('6666', '4444')]
230 if len(matches) > 1:
231 print output
232 raise Exception(
233 "OpenOCD listening on multiple ports. Cannot uniquely "
234 "identify gdb server port.")
235 elif matches:
236 [match] = matches
237 return int(match.group('port'))
238 time.sleep(1)
239 raise Exception("Timed out waiting for gdb server to obtain port.")
240
241 def __del__(self):
242 try:
243 self.process.kill()
244 self.process.wait()
245 except OSError:
246 pass
247
248 class OpenocdCli(object):
249 def __init__(self, port=4444):
250 self.child = pexpect.spawn(
251 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
252 self.child.expect("> ")
253
254 def command(self, cmd):
255 self.child.sendline(cmd)
256 self.child.expect(cmd)
257 self.child.expect("\n")
258 self.child.expect("> ")
259 return self.child.before.strip("\t\r\n \0")
260
261 def reg(self, reg=''):
262 output = self.command("reg %s" % reg)
263 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
264 values = {r: int(v, 0) for r, v in matches}
265 if reg:
266 return values[reg]
267 return values
268
269 def load_image(self, image):
270 output = self.command("load_image %s" % image)
271 if 'invalid ELF file, only 32bits files are supported' in output:
272 raise TestNotApplicable(output)
273
274 class CannotAccess(Exception):
275 def __init__(self, address):
276 Exception.__init__(self)
277 self.address = address
278
279 class Gdb(object):
280 def __init__(self,
281 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
282 self.child = pexpect.spawn(cmd)
283 self.child.logfile = open("gdb.log", "w")
284 self.child.logfile.write("+ %s\n" % cmd)
285 self.wait()
286 self.command("set confirm off")
287 self.command("set width 0")
288 self.command("set height 0")
289 # Force consistency.
290 self.command("set print entry-values no")
291
292 def wait(self):
293 """Wait for prompt."""
294 self.child.expect(r"\(gdb\)")
295
296 def command(self, command, timeout=6000):
297 self.child.sendline(command)
298 self.child.expect("\n", timeout=timeout)
299 self.child.expect(r"\(gdb\)", timeout=timeout)
300 return self.child.before.strip()
301
302 def c(self, wait=True, timeout=-1):
303 if wait:
304 output = self.command("c", timeout=timeout)
305 assert "Continuing" in output
306 return output
307 else:
308 self.child.sendline("c")
309 self.child.expect("Continuing")
310
311 def interrupt(self):
312 self.child.send("\003")
313 self.child.expect(r"\(gdb\)", timeout=6000)
314 return self.child.before.strip()
315
316 def x(self, address, size='w'):
317 output = self.command("x/%s %s" % (size, address))
318 value = int(output.split(':')[1].strip(), 0)
319 return value
320
321 def p_raw(self, obj):
322 output = self.command("p %s" % obj)
323 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
324 if m:
325 raise CannotAccess(int(m.group(1), 0))
326 return output.split('=')[-1].strip()
327
328 def p(self, obj):
329 output = self.command("p/x %s" % obj)
330 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
331 if m:
332 raise CannotAccess(int(m.group(1), 0))
333 value = int(output.split('=')[-1].strip(), 0)
334 return value
335
336 def p_string(self, obj):
337 output = self.command("p %s" % obj)
338 value = shlex.split(output.split('=')[-1].strip())[1]
339 return value
340
341 def stepi(self):
342 output = self.command("stepi")
343 return output
344
345 def load(self):
346 output = self.command("load", timeout=6000)
347 assert "failed" not in output
348 assert "Transfer rate" in output
349
350 def b(self, location):
351 output = self.command("b %s" % location)
352 assert "not defined" not in output
353 assert "Breakpoint" in output
354 return output
355
356 def hbreak(self, location):
357 output = self.command("hbreak %s" % location)
358 assert "not defined" not in output
359 assert "Hardware assisted breakpoint" in output
360 return output
361
362 def run_all_tests(module, target, parsed):
363 good_results = set(('pass', 'not_applicable'))
364
365 start = time.time()
366
367 results = {}
368 count = 0
369
370 global gdb_cmd # pylint: disable=global-statement
371 gdb_cmd = parsed.gdb
372
373 todo = []
374 if (parsed.misaval):
375 target.misa = int(parsed.misaval, 16)
376 print "Assuming $MISA value of 0x%x. Skipping ExamineTarget." % target.misa
377 else:
378 todo.append(("ExamineTarget", ExamineTarget))
379
380 for name in dir(module):
381 definition = getattr(module, name)
382 if type(definition) == type and hasattr(definition, 'test') and \
383 (not parsed.test or any(test in name for test in parsed.test)):
384 todo.append((name, definition))
385
386 for name, definition in todo:
387 instance = definition(target)
388 result = instance.run()
389 results.setdefault(result, []).append(name)
390 count += 1
391 if result not in good_results and parsed.fail_fast:
392 break
393
394 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
395
396 result = 0
397 for key, value in results.iteritems():
398 print "%d tests returned %s" % (len(value), key)
399 if key not in good_results:
400 result = 1
401 for test in value:
402 print " ", test
403
404 return result
405
406 def add_test_run_options(parser):
407
408 parser.add_argument("--fail-fast", "-f", action="store_true",
409 help="Exit as soon as any test fails.")
410 parser.add_argument("test", nargs='*',
411 help="Run only tests that are named here.")
412 parser.add_argument("--gdb",
413 help="The command to use to start gdb.")
414 parser.add_argument("--misaval",
415 help="Don't run ExamineTarget, just assume the misa value which is specified.")
416
417 def header(title, dash='-'):
418 if title:
419 dashes = dash * (36 - len(title))
420 before = dashes[:len(dashes)/2]
421 after = dashes[len(dashes)/2:]
422 print "%s[ %s ]%s" % (before, title, after)
423 else:
424 print dash * 40
425
426 def print_log(path):
427 header(path)
428 lines = open(path, "r").readlines()
429 if len(lines) > 1000:
430 for l in lines[:500]:
431 sys.stdout.write(l)
432 print "..."
433 for l in lines[-500:]:
434 sys.stdout.write(l)
435 else:
436 for l in lines:
437 sys.stdout.write(l)
438
439 class BaseTest(object):
440 compiled = {}
441
442 def __init__(self, target):
443 self.target = target
444 self.server = None
445 self.target_process = None
446 self.binary = None
447 self.start = 0
448 self.logs = []
449
450 def early_applicable(self):
451 """Return a false value if the test has determined it cannot run
452 without ever needing to talk to the target or server."""
453 # pylint: disable=no-self-use
454 return True
455
456 def setup(self):
457 pass
458
459 def compile(self):
460 compile_args = getattr(self, 'compile_args', None)
461 if compile_args:
462 if compile_args not in BaseTest.compiled:
463 # pylint: disable=star-args
464 BaseTest.compiled[compile_args] = \
465 self.target.compile(*compile_args)
466 self.binary = BaseTest.compiled.get(compile_args)
467
468 def classSetup(self):
469 self.compile()
470 self.target_process = self.target.target()
471 self.server = self.target.server()
472 self.logs.append(self.server.logname)
473
474 def classTeardown(self):
475 del self.server
476 del self.target_process
477
478 def run(self):
479 """
480 If compile_args is set, compile a program and set self.binary.
481
482 Call setup().
483
484 Then call test() and return the result, displaying relevant information
485 if an exception is raised.
486 """
487
488 print "Running", type(self).__name__, "...",
489 sys.stdout.flush()
490
491 if not self.early_applicable():
492 print "not_applicable"
493 return "not_applicable"
494
495 self.start = time.time()
496
497 self.classSetup()
498
499 try:
500 self.setup()
501 result = self.test() # pylint: disable=no-member
502 except TestNotApplicable:
503 result = "not_applicable"
504 except Exception as e: # pylint: disable=broad-except
505 if isinstance(e, TestFailed):
506 result = "fail"
507 else:
508 result = "exception"
509 print "%s in %.2fs" % (result, time.time() - self.start)
510 print "=" * 40
511 if isinstance(e, TestFailed):
512 header("Message")
513 print e.message
514 header("Traceback")
515 traceback.print_exc(file=sys.stdout)
516 for log in self.logs:
517 print_log(log)
518 print "/" * 40
519 return result
520
521 finally:
522 self.classTeardown()
523
524 if not result:
525 result = 'pass'
526 print "%s in %.2fs" % (result, time.time() - self.start)
527 return result
528
529 gdb_cmd = None
530 class GdbTest(BaseTest):
531 def __init__(self, target):
532 BaseTest.__init__(self, target)
533 self.gdb = None
534
535 def classSetup(self):
536 BaseTest.classSetup(self)
537 self.logs.append("gdb.log")
538
539 if gdb_cmd:
540 self.gdb = Gdb(gdb_cmd)
541 else:
542 self.gdb = Gdb()
543
544 if self.binary:
545 self.gdb.command("file %s" % self.binary)
546 if self.target:
547 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
548 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
549 if self.server.port:
550 self.gdb.command(
551 "target extended-remote localhost:%d" % self.server.port)
552
553 # FIXME: OpenOCD doesn't handle PRIV now
554 #self.gdb.p("$priv=3")
555
556 def classTeardown(self):
557 del self.gdb
558 BaseTest.classTeardown(self)
559
560 class ExamineTarget(GdbTest):
561 def test(self):
562 self.target.misa = self.gdb.p("$misa")
563
564 txt = "RV"
565 if (self.target.misa >> 30) == 1:
566 txt += "32"
567 elif (self.target.misa >> 62) == 2:
568 txt += "64"
569 elif (self.target.misa >> 126) == 3:
570 txt += "128"
571 else:
572 txt += "??"
573
574 for i in range(26):
575 if self.target.misa & (1<<i):
576 txt += chr(i + ord('A'))
577 print txt,
578
579 class TestFailed(Exception):
580 def __init__(self, message):
581 Exception.__init__(self)
582 self.message = message
583
584 class TestNotApplicable(Exception):
585 def __init__(self, message):
586 Exception.__init__(self)
587 self.message = message
588
589 def assertEqual(a, b):
590 if a != b:
591 raise TestFailed("%r != %r" % (a, b))
592
593 def assertNotEqual(a, b):
594 if a == b:
595 raise TestFailed("%r == %r" % (a, b))
596
597 def assertIn(a, b):
598 if a not in b:
599 raise TestFailed("%r not in %r" % (a, b))
600
601 def assertNotIn(a, b):
602 if a in b:
603 raise TestFailed("%r in %r" % (a, b))
604
605 def assertGreater(a, b):
606 if not a > b:
607 raise TestFailed("%r not greater than %r" % (a, b))
608
609 def assertLess(a, b):
610 if not a < b:
611 raise TestFailed("%r not less than %r" % (a, b))
612
613 def assertTrue(a):
614 if not a:
615 raise TestFailed("%r is not True" % a)
616
617 def assertRegexpMatches(text, regexp):
618 if not re.search(regexp, text):
619 raise TestFailed("can't find %r in %r" % (regexp, text))