debug: checkpoint of trying to get simulation tests working
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
def find_file(path):
    """Resolve `path` against the working directory, then this module's directory.

    Returns the first joined path that exists on disk, or None when neither
    location contains it.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    return next((c for c in candidates if os.path.exists(c)), None)
20
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Run the RISC-V cross compiler on `args`.

    Every argument that find_file() can locate is replaced by the located
    path; all other arguments are handed to gcc unchanged. When `xlen` is 32
    the rv32imac/ilp32 architecture flags are added.

    Returns None on success.

    Raises:
        Exception: if gcc exits with a non-zero status. The command line and
            the captured compiler output are printed first.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    cmd = [cc, "-g"]
    if xlen == 32:
        cmd += ["-march=rv32imac", "-mabi=ilp32"]
    # find_file() returns None for anything that is not an existing file
    # (flags, for instance); those are passed through verbatim.
    cmd += [find_file(arg) or arg for arg in args]
    # universal_newlines makes the captured output text, so it can be written
    # straight to sys.stdout regardless of interpreter version.
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, universal_newlines=True)
    stdout, stderr = process.communicate()
    if process.returncode:
        print("")
        header("Compile failed")
        print("+ %s" % " ".join(cmd))
        sys.stdout.write(stdout)
        sys.stdout.write(stderr)
        header("")
        raise Exception("Compile failed!")
44
def unused_port():
    """Return a TCP port number that was free when this function was called.

    The port is obtained by binding a socket to port 0 and reading back the
    port that was assigned. The socket is closed before returning, so nothing
    reserves the port afterwards: another process may claim it before the
    caller does.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import contextlib
    import socket
    # closing() guarantees the socket is released even if bind() or
    # getsockname() raises.
    with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(("", 0))
        return s.getsockname()[1]
53
class Spike(object):
    """Handle on a spike simulator child process.

    Attributes:
        process: the subprocess.Popen object, or None if launching failed.
        port: the port passed to spike via --gdb-port, or None when the
            instance was created with with_gdb=False.
    """
    logname = "spike.log"

    def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
            timeout=None, xlen=64):
        """Launch spike with its output redirected to `logname`.

        Args:
            cmd: command line used to start spike (split shell-style); a
                false value selects plain "spike", plus "--isa RV32" when
                `xlen` is 32.
            binary: optional program appended after "pk".
            halted: when true, "-H" is added to the command line.
            with_gdb: when true, an unused port is picked and passed with
                "--gdb-port".
            timeout: when set, the command is wrapped in the `timeout`
                utility with this value.
            xlen: only consulted when `cmd` is false (see above).
        """
        # Defined before anything can fail so that __del__ and callers that
        # inspect .port never hit AttributeError.
        self.process = None
        self.port = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
            if xlen == 32:
                cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_gdb:
            self.port = unused_port()
            cmd += ['--gdb-port', str(self.port)]
        cmd.append("-m32")
        cmd.append('pk')
        if binary:
            cmd.append(binary)

        logfile = open(self.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child has its own copy of the descriptor; the parent's
            # handle is no longer needed.
            logfile.close()

    def __del__(self):
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process may already be gone.
            pass

    def wait(self, *args, **kwargs):
        """Wait for spike to exit; arguments are forwarded to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
95
class VcsSim(object):
    """Handle on a `simv` simulation child process.

    Attributes:
        process: the subprocess.Popen object, or None if launching failed.
        port: the port number simv reported in its "Listening on port" line.
    """

    def __init__(self, simv=None, debug=False):
        """Launch simv and wait until it reports the port it listens on.

        The port is also exported as the JTAG_VPI_PORT environment variable.

        Args:
            simv: command line used to start the simulator (split
                shell-style); defaults to "simv".
            debug: when true, "-debug" is appended to the executable name and
                a +vcdplusfile argument is added.

        Raises:
            Exception: if simv exits before reporting a port.
        """
        # Defined first so __del__ is safe even if launching fails.
        self.process = None
        self.port = None

        if simv:
            cmd = shlex.split(simv)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open("simv.log", "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Skip the command line just written; only simv's own output
                # is of interest.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                self.port = self._wait_for_port(listenfile)
        finally:
            logfile.close()
        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def _wait_for_port(self, listenfile):
        """Follow `listenfile` until simv logs its port; return it as an int."""
        pattern = re.compile(r"^Listening on port (\d+)$")
        pending = ""
        exited = False
        while True:
            pending += listenfile.readline()
            # Only examine complete lines: a line caught mid-write could
            # otherwise match with a truncated port number.
            if pending.endswith("\n"):
                match = pattern.match(pending)
                pending = ""
                if match:
                    return int(match.group(1))
                continue
            if exited:
                raise Exception(
                        "simv exited before reporting its listening port")
            if self.process.poll() is not None:
                # Read once more to pick up anything written just before
                # the process exited.
                exited = True
                continue
            time.sleep(0.1)

    def __del__(self):
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process may already be gone.
            pass
130
class Openocd(object):
    """Handle on an OpenOCD child process.

    Attributes:
        process: the subprocess.Popen object, or None if launching failed.
        port: the TCP port OpenOCD's gdb server was found listening on.
    """
    logname = "openocd.log"

    def __init__(self, cmd=None, config=None, debug=False):
        """Launch OpenOCD and wait until its gdb server port is known.

        Args:
            cmd: command line used to start OpenOCD (split shell-style);
                defaults to "openocd".
            config: optional config file name, located with find_file() and
                passed with "-f".
            debug: when true, "-d" is added to the command line.

        Raises:
            Exception: if `config` cannot be found, if OpenOCD exits before
                the expected marker appears in its log, or if the gdb server
                port cannot be determined.
        """
        # Defined first so __del__ is safe even if launching fails.
        self.process = None
        self.port = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd"]
        if config:
            config_path = find_file(config)
            if config_path is None:
                # Fail with a clear message instead of passing None to Popen.
                raise Exception(
                        "Unable to find OpenOCD config file %r" % config)
            cmd += ["-f", config_path]
        if debug:
            cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd[1:1] = [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        logfile = open(Openocd.logname, "w")
        try:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        finally:
            # The child has its own copy of the descriptor.
            logfile.close()

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as log_reader:
                log = log_reader.read()
            if "OK GO NOW" in log:
                break
            if self.process.poll() is not None:
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            if not messaged and time.time() - start > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            # Pause between polls instead of rereading the log in a tight
            # loop.
            time.sleep(0.1)

        self.port = self._get_gdb_server_port()

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on.

        Polls `lsof` for listening TCP sockets owned by the OpenOCD process,
        once per second for up to MAX_ATTEMPTS attempts.

        Raises:
            Exception: if more than one candidate port is found, or none is
                found within the attempts allowed.
        """
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    # universal_newlines yields text so the str regex applies
                    # regardless of interpreter version.
                    output = subprocess.check_output([
                        'lsof',
                        '-a', # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid), # Filter on PID
                        '-iTCP', # Filter only TCP sockets
                    ], stderr=devnull, universal_newlines=True)
                except subprocess.CalledProcessError:
                    output = ""
            # 6666 and 4444 are excluded; presumably OpenOCD's default
            # tcl/telnet ports -- TODO confirm.
            matches = [m for m in PORT_REGEX.finditer(output)
                    if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best effort: the process may already be gone.
            pass
218
class OpenocdCli(object):
    """Send commands to OpenOCD through a telnet session driven by pexpect."""

    def __init__(self, port=4444):
        """Open `telnet localhost <port>` (output is also copied to
        openocd-cli.log) and wait for the first "> " prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt,
        with surrounding whitespace and NUL characters removed."""
        self.child.sendline(cmd)
        # The echoed command is literal text. Matching it as a regular
        # expression would break for commands containing metacharacters.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run the `reg` command.

        Returns the value of `reg` as an int when a name is given, otherwise
        a dict mapping every reported register name to its int value.

        Raises:
            KeyError: if `reg` is given but absent from the output.
        """
        output = self.command("reg %s" % reg)
        # Accept both upper- and lower-case hex digits in the values.
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-Fa-f]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run `load_image <image>`.

        Raises:
            TestNotApplicable: if OpenOCD reports that only 32-bit ELF files
                are supported.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
244
class CannotAccess(Exception):
    """Exception that carries a memory address in its `address` attribute."""

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
249
class Gdb(object):
    """Drive an interactive gdb session through pexpect."""

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn `cmd`, log the session to gdb.log, wait for the first
        prompt and apply a few fixed `set` commands."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        for setting in (
                "confirm off",
                "width 0",
                "height 0",
                # Force consistency.
                "print entry-values no"):
            self.command("set %s" % setting)

    @staticmethod
    def _raise_if_inaccessible(output):
        """Raise CannotAccess if `output` contains gdb's memory access
        error."""
        found = re.search(
                r"Cannot access memory at address (0x[0-9a-f]+)", output)
        if found:
            raise CannotAccess(int(found.group(1), 0))

    def wait(self):
        """Wait for prompt."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send `command` and return what gdb printed before the next
        prompt, stripped of surrounding whitespace."""
        child = self.child
        child.sendline(command)
        child.expect("\n", timeout=timeout)
        child.expect(r"\(gdb\)", timeout=timeout)
        return child.before.strip()

    def c(self, wait=True):
        """Continue execution.

        With `wait` true, block until the next prompt and return the output.
        Otherwise only wait for gdb to print "Continuing" and return None.
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c")
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output printed before the prompt."""
        child = self.child
        child.send("\003")
        child.expect(r"\(gdb\)", timeout=6000)
        return child.before.strip()

    def x(self, address, size='w'):
        """Run `x/<size> <address>` and return the text after the first
        colon parsed as an int."""
        output = self.command("x/%s %s" % (size, address))
        after_colon = output.split(':')[1]
        return int(after_colon.strip(), 0)

    def p_raw(self, obj):
        """Run `p <obj>` and return the text after the last '=' as a
        string. Raises CannotAccess on a memory access error."""
        output = self.command("p %s" % obj)
        self._raise_if_inaccessible(output)
        return output.split('=')[-1].strip()

    def p(self, obj):
        """Run `p/x <obj>` and return the text after the last '=' parsed as
        an int. Raises CannotAccess on a memory access error."""
        output = self.command("p/x %s" % obj)
        self._raise_if_inaccessible(output)
        return int(output.split('=')[-1].strip(), 0)

    def p_string(self, obj):
        """Run `p <obj>` and return the second shell-style token of the text
        after the last '='."""
        output = self.command("p %s" % obj)
        tokens = shlex.split(output.split('=')[-1].strip())
        return tokens[1]

    def stepi(self):
        """Run `stepi` and return its output."""
        return self.command("stepi")

    def load(self):
        """Run `load` and assert that the output reports a transfer rate
        and no failure."""
        output = self.command("load", timeout=6000)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at `location` and return gdb's
        output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output
332
def run_all_tests(module, target, parsed):
    """Run ExamineTarget followed by the test classes found in `module`.

    A class in `module` is selected when it has a `test` attribute and, if
    `parsed.test` is non-empty, when any of those strings occurs in its name.
    Each selected class is instantiated with `target` and its run() result is
    tallied. A summary is printed at the end.

    Args:
        module: module whose attributes are scanned for test classes.
        target: object handed to every test class constructor.
        parsed: parsed command-line options; `gdb`, `test` and `fail_fast`
            are read.

    Returns:
        0 if every result was 'pass' or 'not_applicable', otherwise 1.
    """
    good_results = set(('pass', 'not_applicable'))

    start = time.time()

    results = {}
    count = 0

    global gdb_cmd  # pylint: disable=global-statement
    gdb_cmd = parsed.gdb

    todo = [("ExamineTarget", ExamineTarget)]
    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition))

    for name, definition in todo:
        instance = definition(target)
        result = instance.run()
        results.setdefault(result, []).append(name)
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')

    result = 0
    for key, value in results.items():
        print("%d tests returned %s" % (len(value), key))
        if key not in good_results:
            result = 1
            # Only the names of tests that did not succeed are listed.
            for test in value:
                print("  %s" % test)

    return result
370
def add_test_run_options(parser):
    """Register the test-run options on an argparse-style `parser`.

    Adds the --fail-fast/-f flag, the positional list of test names, and the
    --gdb option.
    """
    option_specs = (
        (("--fail-fast", "-f"),
         dict(action="store_true", help="Exit as soon as any test fails.")),
        (("test",),
         dict(nargs='*', help="Run only tests that are named here.")),
        (("--gdb",),
         dict(help="The command to use to start gdb.")),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
378
def header(title, dash='-'):
    """Print a separator line, with `title` embedded in the middle if given.

    With a title, the line is "[ title ]" padded on both sides with
    `36 - len(title)` dash characters in total; the extra one goes on the
    right when that number is odd. Titles longer than 36 characters get no
    padding. Without a title, 40 dash characters are printed.
    """
    if title:
        dashes = dash * (36 - len(title))
        # Floor division keeps the slice index an int on every interpreter
        # version.
        half = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:half], title, dashes[half:]))
    else:
        print(dash * 40)
387
def print_log(path):
    """Print the file at `path` to stdout under a header line.

    Files longer than 1000 lines are abbreviated to their first and last 500
    lines with a "..." line in between.
    """
    header(path)
    # Read inside a with block so the file is closed promptly.
    with open(path, "r") as log:
        lines = log.readlines()
    if len(lines) > 1000:
        shown = lines[:500] + ["...\n"] + lines[-500:]
    else:
        shown = lines
    sys.stdout.writelines(shown)
400
class BaseTest(object):
    """Base class for tests: compiles the program, starts the target and
    server, runs the subclass's test() and reports the result."""

    # Results of target.compile(), shared by all tests and keyed by the
    # compile_args that produced them.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called after classSetup() and before test(); no-op here."""
        pass

    def compile(self):
        """Set self.binary from the subclass's `compile_args`, if it has any.

        The result of target.compile(*compile_args) is cached in
        BaseTest.compiled, so `compile_args` must be hashable.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                # pylint: disable=star-args
                BaseTest.compiled[compile_args] = \
                        self.target.compile(*compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, then start the target process and the server."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "fail", "exception", "pass" (when test()
        returns a false value), or whatever true value test() returned.
        """

        sys.stdout.write("Running %s ... " % type(self).__name__)
        sys.stdout.flush()

        if not self.early_applicable():
            print("not_applicable")
            return "not_applicable"

        self.start = time.time()

        try:
            # classSetup() is inside the try so that a failure while starting
            # the target, server or debugger is reported like any other test
            # exception and classTeardown() still runs.
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
            else:
                result = "exception"
            print("%s in %.2fs" % (result, time.time() - self.start))
            print("=" * 40)
            if isinstance(e, TestFailed):
                header("Message")
                print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            for log in self.logs:
                print_log(log)
            print("/" * 40)
            return result

        finally:
            self.classTeardown()

        if not result:
            result = 'pass'
        print("%s in %.2fs" % (result, time.time() - self.start))
        return result
490
# Command used to start gdb; None makes GdbTest use Gdb's default command.
gdb_cmd = None
class GdbTest(BaseTest):
    """BaseTest that additionally starts gdb and connects it to the server."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Run the base setup, start gdb, load the binary (if any), apply the
        target's settings and connect to the server's port."""
        BaseTest.classSetup(self)
        self.logs.append("gdb.log")

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        gdb = self.gdb

        if self.binary:
            gdb.command("file %s" % self.binary)
        if self.target:
            arch_cmd = "set arch riscv:rv%d" % self.target.xlen
            gdb.command(arch_cmd)
            timeout_cmd = "set remotetimeout %d" % self.target.timeout_sec
            gdb.command(timeout_cmd)
        if self.server.port:
            remote_cmd = (
                    "target extended-remote localhost:%d" % self.server.port)
            gdb.command(remote_cmd)

        gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb reference, then run the base teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
520
class ExamineTarget(GdbTest):
    """Read $misa from the target, store it on the target object and print
    the ISA string decoded from it."""

    def test(self):
        """Decode misa into "RV<width><letters>" and print it.

        The width is chosen by comparing the value shifted right by 30, 62
        or 126 bits with 1, 2 or 3 respectively; "??" is used when none
        matches. One letter A-Z is appended for every set bit among the low
        26 bits. Returns None.
        """
        misa = self.gdb.p("$misa")
        self.target.misa = misa

        txt = "RV"
        if (misa >> 30) == 1:
            txt += "32"
        elif (misa >> 62) == 2:
            txt += "64"
        elif (misa >> 126) == 3:
            txt += "128"
        else:
            txt += "??"

        txt += "".join(
                chr(i + ord('A')) for i in range(26) if misa & (1 << i))
        # No newline: the result is meant to stay on the current line. The
        # trailing space separates it from whatever is printed next.
        sys.stdout.write(txt + " ")
539
class TestFailed(Exception):
    """Exception carrying a failure description in its `message`
    attribute."""

    def __init__(self, message):
        # Pass the text to Exception as well so that str(e) and tracebacks
        # show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
544
class TestNotApplicable(Exception):
    """Exception carrying an explanation in its `message` attribute."""

    def __init__(self, message):
        # Pass the text to Exception as well so that str(e) and tracebacks
        # show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
549
def assertEqual(a, b):
    """Raise TestFailed unless `a` and `b` compare equal (via `!=`)."""
    if a != b:
        message = "%r != %r" % (a, b)
        raise TestFailed(message)
553
def assertNotEqual(a, b):
    """Raise TestFailed if `a` and `b` compare equal (via `==`)."""
    if a == b:
        message = "%r == %r" % (a, b)
        raise TestFailed(message)
557
def assertIn(a, b):
    """Raise TestFailed unless `a` is contained in `b`."""
    if a not in b:
        message = "%r not in %r" % (a, b)
        raise TestFailed(message)
561
def assertNotIn(a, b):
    """Raise TestFailed if `a` is contained in `b`."""
    if a in b:
        message = "%r in %r" % (a, b)
        raise TestFailed(message)
565
def assertGreater(a, b):
    """Raise TestFailed unless `a > b`."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
569
def assertLess(a, b):
    """Raise TestFailed unless `a < b`."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
573
def assertTrue(a):
    """Raise TestFailed if `a` is false."""
    if not a:
        # Wrap in a 1-tuple: formatting with a bare tuple value (the falsy
        # empty tuple, for example) would raise TypeError instead of
        # reporting the failure.
        raise TestFailed("%r is not True" % (a,))
577
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` matches somewhere in `text`."""
    found = re.search(regexp, text)
    if found:
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))