Talk to spike using OpenOCD instead of directly.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
23 cmd = [cc, "-g"]
24 for arg in args:
25 found = find_file(arg)
26 if found:
27 cmd.append(found)
28 else:
29 cmd.append(arg)
30 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
31 stderr=subprocess.PIPE)
32 stdout, stderr = process.communicate()
33 if process.returncode:
34 print
35 header("Compile failed")
36 print "+", " ".join(cmd)
37 print stdout,
38 print stderr,
39 header("")
40 raise Exception("Compile failed!")
41
42 def unused_port():
43 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
44 import socket
45 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
46 s.bind(("", 0))
47 port = s.getsockname()[1]
48 s.close()
49 return port
50
51 class Spike(object):
52 logname = "spike.log"
53
54 def __init__(self, cmd, binary=None, halted=False, with_jtag_gdb=True,
55 timeout=None, xlen=64):
56 """Launch spike. Return tuple of its process and the port it's running
57 on."""
58 if cmd:
59 cmd = shlex.split(cmd)
60 else:
61 cmd = ["spike", "-l"]
62 if xlen == 32:
63 cmd += ["--isa", "RV32"]
64
65 if timeout:
66 cmd = ["timeout", str(timeout)] + cmd
67
68 if halted:
69 cmd.append('-H')
70 if with_jtag_gdb:
71 cmd += ['--rbb-port', '0']
72 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
73 cmd.append("-m32")
74 cmd.append('pk')
75 if binary:
76 cmd.append(binary)
77 logfile = open(self.logname, "w")
78 logfile.write("+ %s\n" % " ".join(cmd))
79 logfile.flush()
80 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
81 stdout=logfile, stderr=logfile)
82
83 if with_jtag_gdb:
84 self.port = None
85 for _ in range(30):
86 m = re.search(r"Listening for remote bitbang connection on port (\d+).",
87 file(self.logname).read())
88 if m:
89 self.port = int(m.group(1))
90 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
91 break
92 time.sleep(0.11)
93 assert self.port, "Didn't get spike message about bitbang connection"
94
95 def __del__(self):
96 try:
97 self.process.kill()
98 self.process.wait()
99 except OSError:
100 pass
101
102 def wait(self, *args, **kwargs):
103 return self.process.wait(*args, **kwargs)
104
105 class VcsSim(object):
106 def __init__(self, simv=None, debug=False):
107 if simv:
108 cmd = shlex.split(simv)
109 else:
110 cmd = ["simv"]
111 cmd += ["+jtag_vpi_enable"]
112 if debug:
113 cmd[0] = cmd[0] + "-debug"
114 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
115 logfile = open("simv.log", "w")
116 logfile.write("+ %s\n" % " ".join(cmd))
117 logfile.flush()
118 listenfile = open("simv.log", "r")
119 listenfile.seek(0, 2)
120 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
121 stdout=logfile, stderr=logfile)
122 done = False
123 while not done:
124 line = listenfile.readline()
125 if not line:
126 time.sleep(1)
127 match = re.match(r"^Listening on port (\d+)$", line)
128 if match:
129 done = True
130 self.port = int(match.group(1))
131 os.environ['JTAG_VPI_PORT'] = str(self.port)
132
133 def __del__(self):
134 try:
135 self.process.kill()
136 self.process.wait()
137 except OSError:
138 pass
139
140 class Openocd(object):
141 logname = "openocd.log"
142
143 def __init__(self, cmd=None, config=None, debug=False):
144 if cmd:
145 cmd = shlex.split(cmd)
146 else:
147 cmd = ["openocd", "-d"]
148
149 # This command needs to come before any config scripts on the command
150 # line, since they are executed in order.
151 cmd += [
152 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
153 "--command",
154 "gdb_port 0",
155 # Disable tcl and telnet servers, since they are unused and because
156 # the port numbers will conflict if multiple OpenOCD processes are
157 # running on the same server.
158 "--command",
159 "tcl_port disabled",
160 "--command",
161 "telnet_port disabled",
162 ]
163
164 if config:
165 cmd += ["-f", find_file(config)]
166 if debug:
167 cmd.append("-d")
168
169 logfile = open(Openocd.logname, "w")
170 logfile.write("+ %s\n" % " ".join(cmd))
171 logfile.flush()
172 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
173 stdout=logfile, stderr=logfile)
174
175 # Wait for OpenOCD to have made it through riscv_examine(). When using
176 # OpenOCD to communicate with a simulator this may take a long time,
177 # and gdb will time out when trying to connect if we attempt too early.
178 start = time.time()
179 messaged = False
180 while True:
181 log = open(Openocd.logname).read()
182 if "Examined RISCV core" in log:
183 break
184 if not self.process.poll() is None:
185 raise Exception(
186 "OpenOCD exited before completing riscv_examine()")
187 if not messaged and time.time() - start > 1:
188 messaged = True
189 print "Waiting for OpenOCD to examine RISCV core..."
190
191 self.port = self._get_gdb_server_port()
192
193 def _get_gdb_server_port(self):
194 """Get port that OpenOCD's gdb server is listening on."""
195 MAX_ATTEMPTS = 50
196 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
197 for _ in range(MAX_ATTEMPTS):
198 with open(os.devnull, 'w') as devnull:
199 try:
200 output = subprocess.check_output([
201 'lsof',
202 '-a', # Take the AND of the following selectors
203 '-p{}'.format(self.process.pid), # Filter on PID
204 '-iTCP', # Filter only TCP sockets
205 ], stderr=devnull)
206 except subprocess.CalledProcessError:
207 output = ""
208 matches = list(PORT_REGEX.finditer(output))
209 matches = [m for m in matches
210 if m.group('port') not in ('6666', '4444')]
211 if len(matches) > 1:
212 print output
213 raise Exception(
214 "OpenOCD listening on multiple ports. Cannot uniquely "
215 "identify gdb server port.")
216 elif matches:
217 [match] = matches
218 return int(match.group('port'))
219 time.sleep(0.1)
220 raise Exception("Timed out waiting for gdb server to obtain port.")
221
222 def __del__(self):
223 try:
224 self.process.kill()
225 self.process.wait()
226 except OSError:
227 pass
228
229 class OpenocdCli(object):
230 def __init__(self, port=4444):
231 self.child = pexpect.spawn(
232 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
233 self.child.expect("> ")
234
235 def command(self, cmd):
236 self.child.sendline(cmd)
237 self.child.expect(cmd)
238 self.child.expect("\n")
239 self.child.expect("> ")
240 return self.child.before.strip("\t\r\n \0")
241
242 def reg(self, reg=''):
243 output = self.command("reg %s" % reg)
244 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
245 values = {r: int(v, 0) for r, v in matches}
246 if reg:
247 return values[reg]
248 return values
249
250 def load_image(self, image):
251 output = self.command("load_image %s" % image)
252 if 'invalid ELF file, only 32bits files are supported' in output:
253 raise TestNotApplicable(output)
254
255 class CannotAccess(Exception):
256 def __init__(self, address):
257 Exception.__init__(self)
258 self.address = address
259
260 class Gdb(object):
261 def __init__(self,
262 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
263 self.child = pexpect.spawn(cmd)
264 self.child.logfile = open("gdb.log", "w")
265 self.child.logfile.write("+ %s\n" % cmd)
266 self.wait()
267 self.command("set confirm off")
268 self.command("set width 0")
269 self.command("set height 0")
270 # Force consistency.
271 self.command("set print entry-values no")
272
273 def wait(self):
274 """Wait for prompt."""
275 self.child.expect(r"\(gdb\)")
276
277 def command(self, command, timeout=-1):
278 self.child.sendline(command)
279 self.child.expect("\n", timeout=timeout)
280 self.child.expect(r"\(gdb\)", timeout=timeout)
281 return self.child.before.strip()
282
283 def c(self, wait=True):
284 if wait:
285 output = self.command("c")
286 assert "Continuing" in output
287 return output
288 else:
289 self.child.sendline("c")
290 self.child.expect("Continuing")
291
292 def interrupt(self):
293 self.child.send("\003")
294 self.child.expect(r"\(gdb\)", timeout=60)
295 return self.child.before.strip()
296
297 def x(self, address, size='w'):
298 output = self.command("x/%s %s" % (size, address))
299 value = int(output.split(':')[1].strip(), 0)
300 return value
301
302 def p_raw(self, obj):
303 output = self.command("p %s" % obj)
304 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
305 if m:
306 raise CannotAccess(int(m.group(1), 0))
307 return output.split('=')[-1].strip()
308
309 def p(self, obj):
310 output = self.command("p/x %s" % obj)
311 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
312 if m:
313 raise CannotAccess(int(m.group(1), 0))
314 value = int(output.split('=')[-1].strip(), 0)
315 return value
316
317 def p_string(self, obj):
318 output = self.command("p %s" % obj)
319 value = shlex.split(output.split('=')[-1].strip())[1]
320 return value
321
322 def stepi(self):
323 output = self.command("stepi")
324 return output
325
326 def load(self):
327 output = self.command("load", timeout=60)
328 assert "failed" not in output
329 assert "Transfer rate" in output
330
331 def b(self, location):
332 output = self.command("b %s" % location)
333 assert "not defined" not in output
334 assert "Breakpoint" in output
335 return output
336
337 def hbreak(self, location):
338 output = self.command("hbreak %s" % location)
339 assert "not defined" not in output
340 assert "Hardware assisted breakpoint" in output
341 return output
342
343 def run_all_tests(module, target, parsed):
344 good_results = set(('pass', 'not_applicable'))
345
346 start = time.time()
347
348 results = {}
349 count = 0
350
351 global gdb_cmd # pylint: disable=global-statement
352 gdb_cmd = parsed.gdb
353
354 todo = [("ExamineTarget", ExamineTarget)]
355 for name in dir(module):
356 definition = getattr(module, name)
357 if type(definition) == type and hasattr(definition, 'test') and \
358 (not parsed.test or any(test in name for test in parsed.test)):
359 todo.append((name, definition))
360
361 for name, definition in todo:
362 instance = definition(target)
363 result = instance.run()
364 results.setdefault(result, []).append(name)
365 count += 1
366 if result not in good_results and parsed.fail_fast:
367 break
368
369 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
370
371 result = 0
372 for key, value in results.iteritems():
373 print "%d tests returned %s" % (len(value), key)
374 if key not in good_results:
375 result = 1
376 for test in value:
377 print " ", test
378
379 return result
380
381 def add_test_run_options(parser):
382 parser.add_argument("--fail-fast", "-f", action="store_true",
383 help="Exit as soon as any test fails.")
384 parser.add_argument("test", nargs='*',
385 help="Run only tests that are named here.")
386 parser.add_argument("--gdb",
387 help="The command to use to start gdb.")
388
389 def header(title, dash='-'):
390 if title:
391 dashes = dash * (36 - len(title))
392 before = dashes[:len(dashes)/2]
393 after = dashes[len(dashes)/2:]
394 print "%s[ %s ]%s" % (before, title, after)
395 else:
396 print dash * 40
397
398 def print_log(path):
399 header(path)
400 lines = open(path, "r").readlines()
401 if len(lines) > 1000:
402 for l in lines[:500]:
403 sys.stdout.write(l)
404 print "..."
405 for l in lines[-500:]:
406 sys.stdout.write(l)
407 else:
408 for l in lines:
409 sys.stdout.write(l)
410
411 class BaseTest(object):
412 compiled = {}
413
414 def __init__(self, target):
415 self.target = target
416 self.server = None
417 self.target_process = None
418 self.binary = None
419 self.start = 0
420 self.logs = []
421
422 def early_applicable(self):
423 """Return a false value if the test has determined it cannot run
424 without ever needing to talk to the target or server."""
425 # pylint: disable=no-self-use
426 return True
427
428 def setup(self):
429 pass
430
431 def compile(self):
432 compile_args = getattr(self, 'compile_args', None)
433 if compile_args:
434 if compile_args not in BaseTest.compiled:
435 # pylint: disable=star-args
436 BaseTest.compiled[compile_args] = \
437 self.target.compile(*compile_args)
438 self.binary = BaseTest.compiled.get(compile_args)
439
440 def classSetup(self):
441 self.compile()
442 self.target_process = self.target.target()
443 self.server = self.target.server()
444 self.logs.append(self.server.logname)
445
446 def classTeardown(self):
447 del self.server
448 del self.target_process
449
450 def run(self):
451 """
452 If compile_args is set, compile a program and set self.binary.
453
454 Call setup().
455
456 Then call test() and return the result, displaying relevant information
457 if an exception is raised.
458 """
459
460 print "Running", type(self).__name__, "...",
461 sys.stdout.flush()
462
463 if not self.early_applicable():
464 print "not_applicable"
465 return "not_applicable"
466
467 self.start = time.time()
468
469 self.classSetup()
470
471 try:
472 self.setup()
473 result = self.test() # pylint: disable=no-member
474 except TestNotApplicable:
475 result = "not_applicable"
476 except Exception as e: # pylint: disable=broad-except
477 if isinstance(e, TestFailed):
478 result = "fail"
479 else:
480 result = "exception"
481 print "%s in %.2fs" % (result, time.time() - self.start)
482 print "=" * 40
483 if isinstance(e, TestFailed):
484 header("Message")
485 print e.message
486 header("Traceback")
487 traceback.print_exc(file=sys.stdout)
488 for log in self.logs:
489 print_log(log)
490 print "/" * 40
491 return result
492
493 finally:
494 self.classTeardown()
495
496 if not result:
497 result = 'pass'
498 print "%s in %.2fs" % (result, time.time() - self.start)
499 return result
500
501 gdb_cmd = None
502 class GdbTest(BaseTest):
503 def __init__(self, target):
504 BaseTest.__init__(self, target)
505 self.gdb = None
506
507 def classSetup(self):
508 BaseTest.classSetup(self)
509 self.logs.append("gdb.log")
510
511 if gdb_cmd:
512 self.gdb = Gdb(gdb_cmd)
513 else:
514 self.gdb = Gdb()
515
516 if self.binary:
517 self.gdb.command("file %s" % self.binary)
518 if self.target:
519 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
520 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
521 if self.server.port:
522 self.gdb.command(
523 "target extended-remote localhost:%d" % self.server.port)
524
525 self.gdb.p("$priv=3")
526
527 def classTeardown(self):
528 del self.gdb
529 BaseTest.classTeardown(self)
530
531 class ExamineTarget(GdbTest):
532 def test(self):
533 self.target.misa = self.gdb.p("$misa")
534
535 txt = "RV"
536 if (self.target.misa >> 30) == 1:
537 txt += "32"
538 elif (self.target.misa >> 62) == 2:
539 txt += "64"
540 elif (self.target.misa >> 126) == 3:
541 txt += "128"
542 else:
543 txt += "??"
544
545 for i in range(26):
546 if self.target.misa & (1<<i):
547 txt += chr(i + ord('A'))
548 print txt,
549
550 class TestFailed(Exception):
551 def __init__(self, message):
552 Exception.__init__(self)
553 self.message = message
554
555 class TestNotApplicable(Exception):
556 def __init__(self, message):
557 Exception.__init__(self)
558 self.message = message
559
560 def assertEqual(a, b):
561 if a != b:
562 raise TestFailed("%r != %r" % (a, b))
563
564 def assertNotEqual(a, b):
565 if a == b:
566 raise TestFailed("%r == %r" % (a, b))
567
568 def assertIn(a, b):
569 if a not in b:
570 raise TestFailed("%r not in %r" % (a, b))
571
572 def assertNotIn(a, b):
573 if a in b:
574 raise TestFailed("%r in %r" % (a, b))
575
576 def assertGreater(a, b):
577 if not a > b:
578 raise TestFailed("%r not greater than %r" % (a, b))
579
580 def assertLess(a, b):
581 if not a < b:
582 raise TestFailed("%r not less than %r" % (a, b))
583
584 def assertTrue(a):
585 if not a:
586 raise TestFailed("%r is not True" % a)
587
588 def assertRegexpMatches(text, regexp):
589 if not re.search(regexp, text):
590 raise TestFailed("can't find %r in %r" % (regexp, text))