Merge remote-tracking branch 'origin/newprogram' into debug-0.13
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Resolve *path* against the working directory, then this module's dir.

    Returns the first joined path that exists on disk, or None when neither
    location contains it.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    return next(
        (candidate for candidate in candidates if os.path.exists(candidate)),
        None)
20
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Compile a test program with the RISC-V gcc found under $RISCV.

    Args:
        args: Compiler arguments. Each one that find_file() resolves to an
            existing file is replaced by that path; the rest are passed
            through unchanged.
        xlen: 32 selects -march=rv32imac -mabi=ilp32; any other value
            selects -march=rv64imac -mabi=lp64.

    Raises:
        Exception: If the compiler exits with a non-zero status. The command
            line and the captured compiler output are printed first.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    cmd = [cc, "-g"]
    if xlen == 32:
        cmd += ["-march=rv32imac", "-mabi=ilp32"]
    else:
        cmd += ["-march=rv64imac", "-mabi=lp64"]
    # Source files are looked up next to the tests; flags fall through as-is.
    cmd += [find_file(arg) or arg for arg in args]

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode:
        print("")
        header("Compile failed")
        print("+ " + " ".join(cmd))
        # Emit the compiler output verbatim (no separators or extra
        # newlines). The pipes yield bytes on Python 3, so decode there.
        for captured in (stdout, stderr):
            if not isinstance(captured, str):
                captured = captured.decode(errors="replace")
            sys.stdout.write(captured)
        header("")
        raise Exception("Compile failed!")
47
def unused_port():
    """Return a TCP port number that was free when this function ran.

    The port is found by binding a throwaway socket to port 0 and reading
    back the number the OS picked. The socket is closed before returning, so
    nothing reserves the port afterwards.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Close even if bind()/getsockname() raised.
        s.close()
56
class Spike(object):
    """Handle for a spike simulator subprocess.

    The simulator's stdout and stderr go to the file named by ``logname``.
    When started with JTAG support, ``port`` holds the remote bitbang port
    number that spike announced in that log.
    """
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_jtag_gdb=True,
            timeout=None, xlen=64):
        """Launch spike and, if requested, wait for its bitbang port.

        Args:
            cmd: Command line for spike, split with shlex. A false value
                selects plain "spike".
            binary: Optional program appended after "pk".
            halted: Pass -H to spike.
            with_jtag_gdb: Pass "--rbb-port 0", export REMOTE_BITBANG_HOST
                and REMOTE_BITBANG_PORT, and set ``self.port``.
            timeout: If set, run spike under "timeout <value>".
            xlen: 32 appends "--isa RV32" to the command.

        Raises:
            AssertionError: If the bitbang port message does not show up in
                the log within the polling window.
        """
        # Defined up front so __del__ is safe if launching fails below.
        self.process = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)

        # The child keeps its own copy of the descriptor, so this handle can
        # be closed as soon as the process has been started.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                        r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            assert self.port, "Didn't get spike message about bitbang " \
                    "connection"

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process may already be gone.
            pass

    def wait(self, *args, **kwargs):
        """Wait for spike to exit; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
111
class VcsSim(object):
    """Handle for a VCS simulation (simv) subprocess.

    The simulator's stdout and stderr go to "simv.log". ``port`` is the
    number from the "Listening on port N" line the simulator writes there,
    and is also exported as JTAG_VPI_PORT.
    """

    def __init__(self, simv=None, debug=False):
        """Start the simulator and block until it announces its port.

        Args:
            simv: Command line for the simulator, split with shlex. A false
                value selects "simv +jtag_vpi_enable".
            debug: Use the "-debug" variant of the executable and add
                "+vcdplusfile=output/gdbserver.vpd".

        Raises:
            Exception: If the simulator exits before announcing a port.
        """
        # Defined up front so __del__ is safe if launching fails below.
        self.process = None

        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
            cmd += ["+jtag_vpi_enable"]
            if debug:
                cmd[0] = cmd[0] + "-debug"
                cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Skip the command line written above; only simulator output
                # is scanned for the port message.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Scan new log lines until the port message appears; return it."""
        exited = False
        while True:
            line = listenfile.readline()
            if line:
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    return int(match.group(1))
                continue
            if exited:
                raise Exception(
                        "simv exited before announcing its JTAG VPI port")
            if self.process.poll() is not None:
                # Take one more pass over the log in case the message was
                # written just before the process went away.
                exited = True
                continue
            time.sleep(1)

    def __del__(self):
        """Kill the simulator, if one was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process may already be gone.
            pass
146
class Openocd(object):
    """Handle for an OpenOCD subprocess acting as gdb server.

    OpenOCD's stdout and stderr go to the file named by ``logname``.
    ``port`` is the TCP port its gdb server ended up listening on.
    """
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False):
        """Start OpenOCD and wait until it is ready for gdb.

        Args:
            cmd: Command line for OpenOCD, split with shlex. A false value
                selects "openocd -d".
            config: Optional config script, located with find_file() and
                passed with -f. The process exits with status 1 if it cannot
                be found.
            debug: Append "-d" to the command line.

        Raises:
            Exception: If OpenOCD exits before "OK GO NOW" appears in its
                log, or if its gdb server port cannot be determined.
        """
        # Defined up front so __del__ is safe if launching fails below.
        self.process = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd", "-d"]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            cmd.append("-d")

        # The child keeps its own copy of the descriptor, so this handle can
        # be closed as soon as the process has been started.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_handle:
                log = log_handle.read()
            if "OK GO NOW" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Avoid pinning a CPU while re-reading the log.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        Polls ``lsof`` for TCP sockets owned by the OpenOCD process, ignoring
        ports 6666 and 4444.

        Raises:
            Exception: If more than one candidate port is listening, or none
                shows up within MAX_ATTEMPTS polls.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull,
                    # Text output, so the str pattern also applies on Python 3.
                    universal_newlines=True)
                except subprocess.CalledProcessError:
                    output = ""
            matches = [m for m in PORT_REGEX.finditer(output)
                    if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        """Kill OpenOCD, if it was started, and reap it."""
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process may already be gone.
            pass
240
class OpenocdCli(object):
    """Client for OpenOCD's telnet command interface, driven with pexpect.

    The telnet session output is also copied to "openocd-cli.log".
    """

    def __init__(self, port=4444):
        """Connect to localhost:*port* and wait for the first "> " prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send *cmd* and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # expect() takes a regular expression; escape the echoed command so
        # characters such as "+", "(" or "[" in it are matched literally.
        self.child.expect(re.escape(cmd))
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run "reg" and parse "name (/bits): 0xVALUE" entries.

        Returns the integer value of *reg* when one is named, otherwise a
        dict mapping every parsed register name to its value.

        Raises:
            KeyError: If *reg* is named but absent from the output.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image" on *image*.

        Raises:
            TestNotApplicable: If OpenOCD reports that only 32-bit ELF files
                are supported.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
266
class CannotAccess(Exception):
    """Error carrying a memory address that could not be accessed.

    The address is stored on the ``address`` attribute; no message is passed
    to the base Exception.
    """

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
271
class Gdb(object):
    """Drives an interactive gdb session through pexpect.

    Everything exchanged with gdb is logged to "gdb.log".
    """

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn gdb with *cmd* and apply the settings the tests rely on."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        startup_settings = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for setting in startup_settings:
            self.command(setting)

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send one command line; return gdb's output up to the next prompt."""
        session = self.child
        session.sendline(command)
        session.expect("\n", timeout=timeout)
        session.expect(r"\(gdb\)", timeout=timeout)
        return session.before.strip()

    def c(self, wait=True, timeout=-1):
        """Continue execution.

        With *wait* true, block until the prompt returns and hand back gdb's
        output. Otherwise return None as soon as gdb prints "Continuing".
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c", timeout=timeout)
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output printed before the prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=6000)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory with "x/<size> <address>"; return it as an int."""
        output = self.command("x/%s %s" % (size, address))
        _, raw_value = output.split(':')[:2]
        return int(raw_value.strip(), 0)

    def _print_value(self, print_command):
        """Run a print command and return the text after the last "=".

        Raises CannotAccess when gdb reports an inaccessible address.
        """
        output = self.command(print_command)
        fault = re.search("Cannot access memory at address (0x[0-9a-f]+)",
                output)
        if fault:
            raise CannotAccess(int(fault.group(1), 0))
        return output.split('=')[-1].strip()

    def p_raw(self, obj):
        """Print *obj* and return the resulting value text unparsed."""
        return self._print_value("p %s" % obj)

    def p(self, obj):
        """Print *obj* in hex and return it as an int."""
        return int(self._print_value("p/x %s" % obj), 0)

    def p_string(self, obj):
        """Print *obj* and return the second shell-style token of the value
        text (the quoted string that follows the address)."""
        output = self.command("p %s" % obj)
        value_text = output.split('=')[-1].strip()
        tokens = shlex.split(value_text)
        return tokens[1]

    def stepi(self):
        """Execute a single instruction and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Load the current program, asserting that gdb reports success."""
        output = self.command("load", timeout=6000)
        assert "failed" not in output
        assert "Transfer rate" in output

    def _set_breakpoint(self, verb, location, confirmation):
        """Issue a breakpoint command and assert gdb confirmed it."""
        output = self.command("%s %s" % (verb, location))
        assert "not defined" not in output
        assert confirmation in output
        return output

    def b(self, location):
        """Set a breakpoint at *location* and return gdb's output."""
        return self._set_breakpoint("b", location, "Breakpoint")

    def hbreak(self, location):
        """Set a hardware breakpoint at *location*; return gdb's output."""
        return self._set_breakpoint("hbreak", location,
                "Hardware assisted breakpoint")
354
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the selected test classes of *module*.

    A class in *module* is selected when it has a ``test`` attribute and,
    if ``parsed.test`` is non-empty, one of those strings occurs in its name.
    ``parsed.gdb`` is stored in the module-level ``gdb_cmd``. With
    ``parsed.fail_fast`` set, the run stops after the first result other
    than 'pass' or 'not_applicable'.

    Prints a summary and returns 0 when every result was 'pass' or
    'not_applicable', otherwise 1.
    """
    global gdb_cmd # pylint: disable=global-statement

    good_results = set(('pass', 'not_applicable'))
    start = time.time()
    gdb_cmd = parsed.gdb

    def wanted(name):
        """True when no filter was given or *name* matches one of them."""
        return not parsed.test or any(test in name for test in parsed.test)

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if type(definition) == type and hasattr(definition, 'test') and \
                wanted(name):
            todo.append((name, definition))

    results = {}
    count = 0
    for name, definition in todo:
        outcome = definition(target).run()
        results.setdefault(outcome, []).append(name)
        count += 1
        if outcome not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    exit_status = 0
    for outcome, names in results.items():
        print("%d tests returned %s" % (len(names), outcome))
        if outcome in good_results:
            continue
        # List every test behind a bad outcome.
        exit_status = 1
        for name in names:
            print("  %s" % name)

    return exit_status
392
def add_test_run_options(parser):
    """Register the test-run options (--fail-fast, test names, --gdb) on
    an argparse-style *parser*."""
    option_table = (
        (("--fail-fast", "-f"),
            dict(action="store_true",
                help="Exit as soon as any test fails.")),
        (("test",),
            dict(nargs='*',
                help="Run only tests that are named here.")),
        (("--gdb",),
            dict(help="The command to use to start gdb.")),
    )
    for names, settings in option_table:
        parser.add_argument(*names, **settings)
400
def header(title, dash='-'):
    """Print a 40-column banner line.

    With a title, the output is "<dashes>[ title ]<dashes>", padded with
    *dash* to 40 columns; when the padding is odd the extra character goes
    on the right. Titles longer than 36 characters get no padding. With an
    empty title, a plain line of 40 *dash* characters is printed.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division keeps the slice index an int under true division.
        half = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
    else:
        print(dash * 40)
409
def print_log(path):
    """Print the file at *path* under a header() banner.

    Files longer than 1000 lines are abbreviated to their first 500 and last
    500 lines, separated by a "..." line.
    """
    header(path)
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        sys.stdout.writelines(lines[:500])
        print("...")
        sys.stdout.writelines(lines[-500:])
    else:
        sys.stdout.writelines(lines)
422
class BaseTest(object):
    """Base class for tests that run against a target through a server.

    Subclasses provide test(), and may set a ``compile_args`` attribute and
    override early_applicable(), setup(), classSetup() and classTeardown().
    """
    # Cache of compiled programs shared by every test, keyed by compile_args
    # (which therefore has to be hashable).
    compiled = {}

    def __init__(self, target):
        """Remember *target*; everything else is filled in by run()."""
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        # Paths of log files dumped by run() when the test does not pass.
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Per-test hook called by run() right before test(). No-op here."""
        pass

    def compile(self):
        """Set self.binary to the program built from ``compile_args``.

        The target's compile() is only invoked the first time a given
        compile_args value is seen; later tests reuse the cached result.
        self.binary ends up None when compile_args is missing or empty.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile the program, then start the target and its server."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        # The server object is expected to expose the path of its log file.
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        Run the test and return its result as a string.

        Returns "not_applicable" right away when early_applicable() is false.
        Otherwise calls classSetup() (which compiles a program and sets
        self.binary if compile_args is set), then setup(), then test().

        A false return value from test() counts as 'pass'; any other return
        value is used as the result as-is. TestNotApplicable yields
        "not_applicable", TestFailed yields "fail" and any other exception
        yields "exception"; for the last two the message (TestFailed only),
        the traceback and the collected log files are printed.
        classTeardown() runs once test() has finished or raised.
        """

        # The trailing comma keeps the result on the same output line.
        print "Running", type(self).__name__, "...",
        sys.stdout.flush()

        if not self.early_applicable():
            print "not_applicable"
            return "not_applicable"

        self.start = time.time()

        # NOTE(review): classSetup() runs outside the try block below, so an
        # exception raised here propagates to the caller and classTeardown()
        # is not called. Confirm that this is intended.
        self.classSetup()

        try:
            self.setup()
            result = self.test() # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print "%s in %.2fs" % (result, time.time() - self.start)
            print "=" * 40
            if isinstance(e, TestFailed):
                header("Message")
                print e.message
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print "/" * 40
            return result

        finally:
            # Runs on every path out of the try statement, including the
            # return in the handler above.
            self.classTeardown()

        if not result:
            result = 'pass'
        print "%s in %.2fs" % (result, time.time() - self.start)
        return result
512
# Command used to start gdb; a false value makes GdbTest use Gdb's default.
gdb_cmd = None
class GdbTest(BaseTest):
    """Test base class that also starts gdb and connects it to the server."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Run the base setup, then start gdb and attach it to the target."""
        BaseTest.classSetup(self)
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        gdb = self.gdb

        if self.binary:
            gdb.command("file %s" % self.binary)
        target = self.target
        if target:
            gdb.command("set arch riscv:rv%d" % target.xlen)
            gdb.command("set remotetimeout %d" % target.timeout_sec)
        port = self.server.port
        if port:
            gdb.command("target extended-remote localhost:%d" % port)

        gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session before the base class tears down the rest."""
        del self.gdb
        BaseTest.classTeardown(self)
542
543 class ExamineTarget(GdbTest):
544 def test(self):
545 self.target.misa = self.gdb.p("$misa")
546
547 txt = "RV"
548 if (self.target.misa >> 30) == 1:
549 txt += "32"
550 elif (self.target.misa >> 62) == 2:
551 txt += "64"
552 elif (self.target.misa >> 126) == 3:
553 txt += "128"
554 else:
555 txt += "??"
556
557 for i in range(26):
558 if self.target.misa & (1<<i):
559 txt += chr(i + ord('A'))
560 print txt,
561
class TestFailed(Exception):
    """Raised when a test check fails.

    The text is available both as ``message`` and through str().
    """

    def __init__(self, message):
        # Hand the text to Exception so str(exc) and tracebacks show it.
        Exception.__init__(self, message)
        self.message = message
566
class TestNotApplicable(Exception):
    """Raised when a test finds that it does not apply.

    The text is available both as ``message`` and through str().
    """

    def __init__(self, message):
        # Hand the text to Exception so str(exc) and tracebacks show it.
        Exception.__init__(self, message)
        self.message = message
571
def assertEqual(a, b):
    """Raise TestFailed if *a* and *b* compare unequal (a != b)."""
    differ = a != b
    if differ:
        message = "%r != %r" % (a, b)
        raise TestFailed(message)
575
def assertNotEqual(a, b):
    """Raise TestFailed if *a* and *b* compare equal (a == b)."""
    same = a == b
    if same:
        message = "%r == %r" % (a, b)
        raise TestFailed(message)
579
def assertIn(a, b):
    """Raise TestFailed unless *a* is contained in *b*."""
    if a in b:
        return
    message = "%r not in %r" % (a, b)
    raise TestFailed(message)
583
def assertNotIn(a, b):
    """Raise TestFailed if *a* is contained in *b*."""
    found = a in b
    if found:
        message = "%r in %r" % (a, b)
        raise TestFailed(message)
587
def assertGreater(a, b):
    """Raise TestFailed unless a > b."""
    if a > b:
        return
    message = "%r not greater than %r" % (a, b)
    raise TestFailed(message)
591
def assertLess(a, b):
    """Raise TestFailed unless a < b."""
    if a < b:
        return
    message = "%r not less than %r" % (a, b)
    raise TestFailed(message)
595
def assertTrue(a):
    """Raise TestFailed if *a* is false in a boolean context."""
    if not a:
        # Wrap in a 1-tuple: a bare tuple operand (e.g. the falsy ``()``)
        # would be unpacked as the format arguments and raise TypeError.
        message = "%r is not True" % (a,)
        raise TestFailed(message)
599
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless *regexp* matches somewhere in *text*."""
    if re.search(regexp, text):
        return
    message = "can't find %r in %r" % (regexp, text)
    raise TestFailed(message)