Smoketest multicore.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import random
3 import re
4 import shlex
5 import subprocess
6 import sys
7 import tempfile
8 import time
9 import traceback
10
11 import pexpect
12
13 # Note that gdb comes with its own testsuite. I was unable to figure out how to
14 # run that testsuite against the spike simulator.
15
def find_file(path):
    """Locate `path` relative to the working directory or to this module.

    The current working directory is tried first, then the directory that
    holds this source file. The joined path of the first candidate that exists
    is returned; None is returned when neither candidate exists.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    candidates = (os.path.join(root, path) for root in search_roots)
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None
22
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Run the RISC-V cross compiler over `args`.

    args: iterable of strings. Each one is looked up with find_file(); when
        that finds an existing file the resolved path is passed to the
        compiler, otherwise the argument is passed through unchanged.
    xlen: 32 selects -march=rv32imac/-mabi=ilp32; any other value selects
        -march=rv64imac/-mabi=lp64.

    Returns None. If the compiler exits with a non-zero status, the command
    line and the captured stdout/stderr are printed and
    Exception("Compile failed!") is raised.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    if xlen == 32:
        arch_flags = ["-march=rv32imac", "-mabi=ilp32"]
    else:
        arch_flags = ["-march=rv64imac", "-mabi=lp64"]
    cmd = [cc, "-g"] + arch_flags
    cmd.extend(find_file(arg) or arg for arg in args)

    # universal_newlines makes communicate() return text rather than bytes.
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, universal_newlines=True)
    stdout, stderr = process.communicate()
    if process.returncode:
        sys.stdout.write("\n")
        header("Compile failed")
        sys.stdout.write("+ %s\n" % " ".join(cmd))
        # Written verbatim: a trailing-comma print would insert stray spaces
        # between (and in front of) the two captured streams.
        for captured in (stdout, stderr):
            sys.stdout.write(captured)
            if captured and not captured.endswith("\n"):
                # Keep the closing rule on a line of its own.
                sys.stdout.write("\n")
        header("")
        raise Exception("Compile failed!")
49
def unused_port():
    """Return a TCP port number that was free at the moment of the call.

    A socket is bound to port 0, the port number it was given is read back,
    and the socket is closed again before returning.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    probe.bind(("", 0))
    bound_address = probe.getsockname()
    probe.close()
    return bound_address[1]
58
class Spike(object):
    """Launch a spike simulator process and own it for this object's life.

    Everything the process writes to stdout/stderr goes to the file named by
    `logname`. The process is killed when the object is destroyed.
    """
    logname = "spike-%d.log" % os.getpid()

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):
        """Start spike for `target`.

        target: supplies sim_cmd, xlen, ram, ram_size and compile().
        halted: when true, "-H" is appended to the command line.
        timeout: when true, spike is run under "timeout <timeout>".
        with_jtag_gdb: when true, spike is started with "--rbb-port 0",
            REMOTE_BITBANG_HOST is exported, and the port spike reports in
            its log is stored in self.port and exported as
            REMOTE_BITBANG_PORT.

        Raises AssertionError if spike never reports its bitbang port.
        """
        # Defined before anything can fail so that __del__ is always safe.
        self.process = None
        self.port = None

        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            cmd = [os.path.expandvars("$RISCV/bin/spike")]
        if target.xlen == 32:
            cmd += ["--isa", "RV32G"]
        else:
            cmd += ["--isa", "RV64G"]
        cmd += ["-m0x%x:0x%x" % (target.ram, target.ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
        self.infinite_loop = target.compile(
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)

        # The child gets its own copy of the descriptor, so our handle can be
        # closed as soon as the process has been started.
        with open(self.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        if with_jtag_gdb:
            for _ in range(30):
                with open(self.logname) as log:
                    port = self._parse_bitbang_port(log.read())
                if port:
                    self.port = port
                    os.environ['REMOTE_BITBANG_PORT'] = str(port)
                    break
                time.sleep(0.11)
            if not self.port:
                # Raised explicitly so the check survives `python -O`.
                raise AssertionError(
                        "Didn't get spike message about bitbang connection")

    @staticmethod
    def _parse_bitbang_port(log_text):
        """Return the bitbang port announced in `log_text`, or None.

        The closing period is matched literally: with an unescaped dot, a
        message that is only partly written ("... port 123" of "1234.")
        would match and yield a truncated port number.
        """
        m = re.search(r"Listening for remote bitbang connection on "
                r"port (\d+)\.", log_text)
        if m:
            return int(m.group(1))
        return None

    def __del__(self):
        # `process` is missing or None when __init__ failed early.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def wait(self, *args, **kwargs):
        """Forward to the underlying process's wait() and return its result."""
        return self.process.wait(*args, **kwargs)
116
class VcsSim(object):
    """Launch a VCS simulator and wait until it reports its listening port.

    Simulator output goes to "simv.log". The port read from that log is
    stored in self.port and exported as JTAG_VPI_PORT. The process is killed
    when the object is destroyed.
    """

    def __init__(self, sim_cmd=None, debug=False):
        """Start the simulator.

        sim_cmd: command line to run (split with shlex); defaults to "simv".
        debug: when true, "-debug" is appended to the program name and
            "+vcdplusfile=output/gdbserver.vpd" is added to the arguments.

        Raises RuntimeError if the simulator exits before announcing a port.
        """
        # Defined before anything can fail so that __del__ is always safe.
        self.process = None
        self.port = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        with open("simv.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            with open("simv.log", "r") as listenfile:
                # Start reading at the current end of the log, i.e. after the
                # command line that was just written.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                while self.port is None:
                    # Fail if VCS exits early
                    exit_code = self.process.poll()
                    if exit_code is not None:
                        raise RuntimeError(
                                'VCS simulator exited early with status %d'
                                % exit_code)

                    line = listenfile.readline()
                    if not line:
                        # Nothing new in the log yet.
                        time.sleep(1)
                        continue
                    match = re.match(r"^Listening on port (\d+)$", line)
                    if match:
                        self.port = int(match.group(1))
                        os.environ['JTAG_VPI_PORT'] = str(self.port)

    def __del__(self):
        # `process` is missing or None when __init__ failed early.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
157
class Openocd(object):
    """Launch OpenOCD and find the port its gdb server listens on.

    OpenOCD's output goes to the temporary file named by `logname`. Once the
    log shows "Ready for Remote Connections", the gdb server port is looked
    up with lsof and stored in self.port. The process is killed when the
    object is destroyed.
    """
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False):
        """Start OpenOCD.

        server_cmd: command line to run (split with shlex); defaults to
            $RISCV/bin/openocd.
        config: config script name, resolved with find_file() and passed
            with -f. The program exits with status 1 if it cannot be found.
        debug: when true, "-d" is passed to OpenOCD.

        Raises Exception if OpenOCD exits or 60 seconds pass before it
        reports readiness, or if the gdb server port cannot be determined.
        """
        # Defined before anything can fail so that __del__ is always safe.
        self.process = None
        self.port = None

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            cmd = [os.path.expandvars("$RISCV/bin/openocd")]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        if debug:
            # Passed once; repeating the flag adds nothing.
            cmd.append("-d")

        # The child gets its own copy of the descriptor, so our handle can be
        # closed as soon as the process has been started.
        with open(Openocd.logname, "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

        # Wait for OpenOCD to have made it through riscv_examine(). When using
        # OpenOCD to communicate with a simulator this may take a long time,
        # and gdb will time out when trying to connect if we attempt too early.
        start = time.time()
        messaged = False
        while True:
            with open(Openocd.logname) as current_log:
                log = current_log.read()
            if "Ready for Remote Connections" in log:
                break
            if self.process.poll() is not None:
                header("OpenOCD log")
                sys.stdout.write(log)
                raise Exception(
                        "OpenOCD exited before completing riscv_examine()")
            elapsed = time.time() - start
            if not messaged and elapsed > 1:
                messaged = True
                print("Waiting for OpenOCD to examine RISCV core...")
            if elapsed > 60:
                raise Exception("ERROR: Timed out waiting for OpenOCD to "
                        "examine RISCV core")
            # Poll politely instead of re-reading the log in a tight loop.
            time.sleep(0.1)

        try:
            self.port = self._get_gdb_server_port()
        except Exception:
            header("OpenOCD log")
            sys.stdout.write(log)
            raise

    def _get_gdb_server_port(self):
        """Get port that OpenOCD's gdb server is listening on."""
        MAX_ATTEMPTS = 50
        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
        for _ in range(MAX_ATTEMPTS):
            with open(os.devnull, 'w') as devnull:
                try:
                    output = subprocess.check_output([
                        'lsof',
                        '-a',  # Take the AND of the following selectors
                        '-p{}'.format(self.process.pid),  # Filter on PID
                        '-iTCP',  # Filter only TCP sockets
                    ], stderr=devnull, universal_newlines=True)
                except subprocess.CalledProcessError:
                    output = ""
            # Ports 6666 and 4444 are ignored when picking the gdb port.
            matches = [m for m in PORT_REGEX.finditer(output)
                    if m.group('port') not in ('6666', '4444')]
            if len(matches) > 1:
                print(output)
                raise Exception(
                    "OpenOCD listening on multiple ports. Cannot uniquely "
                    "identify gdb server port.")
            elif matches:
                [match] = matches
                return int(match.group('port'))
            time.sleep(1)
        raise Exception("Timed out waiting for gdb server to obtain port.")

    def __del__(self):
        # `process` is missing or None when __init__ failed early.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            pass
265
class OpenocdCli(object):
    """Talk to OpenOCD's command line over a telnet session.

    The session is spawned with pexpect and its output is also copied to
    "openocd-cli.log" through tee.
    """

    def __init__(self, port=4444):
        """Open a telnet session to localhost:`port` and wait for a prompt."""
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send `cmd` and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # The echoed command is matched literally; expect() would treat it as
        # a regular expression and mis-handle characters such as '+' or '('.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Run "reg" and parse the "name (/bits): 0xVALUE" lines it prints.

        With a register name, that register's value is returned (KeyError if
        it is not in the output). Without one, a dict mapping every reported
        register name to its value is returned.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image" on `image`.

        Raises TestNotApplicable when OpenOCD reports that only 32-bit ELF
        files are supported.
        """
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
291
class CannotAccess(Exception):
    """Signals that gdb could not access memory; `address` is where."""

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
296
class Gdb(object):
    """Drive an interactive gdb session through pexpect.

    The session transcript is written to the temporary file named by
    `logname`.
    """
    logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")
    logname = logfile.name

    def __init__(self,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Spawn `cmd`, wait for its prompt and apply the session settings."""
        self.child = pexpect.spawn(cmd)
        self.child.logfile = open(self.logname, "w")
        self.child.logfile.write("+ %s\n" % cmd)
        self.wait()
        startup_commands = (
            "set confirm off",
            "set width 0",
            "set height 0",
            # Force consistency.
            "set print entry-values no",
        )
        for setting in startup_commands:
            self.command(setting)

    def wait(self):
        """Block until the gdb prompt shows up."""
        self.child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send one command line; return what gdb printed before the prompt."""
        child = self.child
        child.sendline(command)
        child.expect("\n", timeout=timeout)
        child.expect(r"\(gdb\)", timeout=timeout)
        return child.before.strip()

    def c(self, wait=True, timeout=-1):
        """Continue the program.

        With `wait`, the output up to the next prompt is returned. Without
        it, this returns None as soon as gdb prints "Continuing".
        """
        if not wait:
            self.child.sendline("c")
            self.child.expect("Continuing")
            return None
        output = self.command("c", timeout=timeout)
        assert "Continuing" in output
        return output

    def interrupt(self):
        """Send Ctrl-C and return the output printed before the prompt."""
        self.child.send("\003")
        self.child.expect(r"\(gdb\)", timeout=6000)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Examine memory at `address`; return the value after the colon."""
        reply = self.command("x/%s %s" % (size, address))
        text_after_colon = reply.split(':')[1]
        return int(text_after_colon.strip(), 0)

    def p_raw(self, obj):
        """Print `obj` and return the text following the last '='.

        Raises CannotAccess if gdb reports it cannot access memory.
        """
        reply = self.command("p %s" % obj)
        failure = re.search(
                "Cannot access memory at address (0x[0-9a-f]+)", reply)
        if failure:
            raise CannotAccess(int(failure.group(1), 0))
        return reply.split('=')[-1].strip()

    def p(self, obj):
        """Print `obj` in hex and return it as an int.

        Raises CannotAccess if gdb reports it cannot access memory.
        """
        reply = self.command("p/x %s" % obj)
        failure = re.search(
                "Cannot access memory at address (0x[0-9a-f]+)", reply)
        if failure:
            raise CannotAccess(int(failure.group(1), 0))
        printed = reply.split('=')[-1].strip()
        return int(printed, 0)

    def p_string(self, obj):
        """Print `obj`; return the second shell-style token after the '='."""
        reply = self.command("p %s" % obj)
        printed = reply.split('=')[-1].strip()
        tokens = shlex.split(printed)
        return tokens[1]

    def stepi(self):
        """Execute "stepi" and return gdb's output."""
        return self.command("stepi")

    def load(self):
        """Run "load"; assert it did not fail and reported a transfer rate."""
        reply = self.command("load", timeout=6000)
        assert "failed" not in reply
        assert "Transfer rate" in reply

    def b(self, location):
        """Set a breakpoint at `location` and return gdb's output."""
        reply = self.command("b %s" % location)
        assert "not defined" not in reply
        assert "Breakpoint" in reply
        return reply

    def hbreak(self, location):
        """Set a hardware breakpoint at `location`; return gdb's output."""
        reply = self.command("hbreak %s" % location)
        assert "not defined" not in reply
        assert "Hardware assisted breakpoint" in reply
        return reply
382
def run_all_tests(module, target, parsed):
    """Collect the test classes in `module`, run them, and report.

    A test class is any attribute of `module` whose type is `type` and that
    has a `test` attribute. When parsed.test is non-empty, only classes whose
    name contains one of its entries are run. ExamineTarget is queued first
    unless parsed.misaval supplies the misa value (parsed as hex) directly.

    Returns whatever print_results() returns for the collected results.
    """
    global gdb_cmd  # pylint: disable=global-statement

    if not os.path.exists(parsed.logs):
        os.makedirs(parsed.logs)

    overall_start = time.time()

    gdb_cmd = parsed.gdb

    todo = []
    if parsed.misaval:
        target.misa = int(parsed.misaval, 16)
        print("Assuming $MISA value of 0x%x. Skipping ExamineTarget." %
                target.misa)
    else:
        todo.append(("ExamineTarget", ExamineTarget))

    for name in dir(module):
        definition = getattr(module, name)
        is_test_class = type(definition) == type and \
                hasattr(definition, 'test')
        if not is_test_class:
            continue
        if parsed.test and not any(test in name for test in parsed.test):
            # A name filter was given and this class matches none of it.
            continue
        todo.append((name, definition))

    results, count = run_tests(parsed, target, todo)

    elapsed = time.time() - overall_start
    header("ran %d tests in %.0fs" % (count, elapsed), dash=':')

    return print_results(results)
412
# Results that do not count as a failure.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run each (name, test class) pair in `todo` against `target`.

    Every test gets its own log file in parsed.logs, named after the current
    time, the target's class name and the test name. While the test runs,
    sys.stdout is redirected into that file. If parsed.fail_fast is set, the
    loop stops after the first result that is not in good_results.

    Returns (results, count): `results` maps each result string to the list
    of test names that produced it, `count` is the number of tests run.
    """
    results = {}
    count = 0

    for name, definition in todo:
        instance = definition(target)
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        # The with-block guarantees the log is flushed and closed, even when
        # the test raises.
        with open(log_name, 'w') as log_fd:
            # No newline: the result is appended to this line afterwards.
            sys.stdout.write("Running %s ... " % name)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
        print("%s in %.2fs" % (result, time.time() - start))
        sys.stdout.flush()
        results.setdefault(result, []).append(name)
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
444
def print_results(results):
    """Print a per-result summary and return an exit status.

    `results` maps a result string to the list of test names that produced
    it. A count line is printed for every result; for results that are not in
    good_results, the test names are listed as well. Returns 1 if any result
    was not in good_results, otherwise 0.
    """
    exit_status = 0
    for outcome, tests in results.items():
        print("%d tests returned %s" % (len(tests), outcome))
        if outcome in good_results:
            continue
        exit_status = 1
        for test in tests:
            print("  %s" % test)

    return exit_status
455
def add_test_run_options(parser):
    """Register the test-run options on `parser`.

    Adds --logs, --fail-fast/-f, the positional `test` list, --gdb and
    --misaval, in that order.
    """
    option_specs = (
        (("--logs",), {
            "default": "logs",
            "help": "Store logs in the specified directory."}),
        (("--fail-fast", "-f"), {
            "action": "store_true",
            "help": "Exit as soon as any test fails."}),
        (("test",), {
            "nargs": '*',
            "help": "Run only tests that are named here."}),
        (("--gdb",), {
            "help": "The command to use to start gdb."}),
        (("--misaval",), {
            "help": "Don't run ExamineTarget, just assume the misa value "
                    "which is specified."}),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
469
def header(title, dash='-', length=78):
    """Print a horizontal rule, with `title` embedded when one is given.

    With a title the line reads "<dashes>[ title ]<dashes>" and is `length`
    characters wide when the title fits; if the dash count is odd, the extra
    dash goes on the right. Without a title the line is `length` dashes.
    """
    if not title:
        print(dash * length)
        return
    dashes = dash * (length - 4 - len(title))
    split_at = len(dashes) // 2
    print("%s[ %s ]%s" % (dashes[:split_at], title, dashes[split_at:]))
478
def print_log(path):
    """Print the file at `path` to stdout.

    The contents are preceded by a header() line showing the path and
    followed by one blank line.
    """
    header(path)
    # Stream the file and close it deterministically instead of leaving the
    # handle for the garbage collector.
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    print("")
485
class BaseTest(object):
    """Scaffolding shared by every test: compile, start servers, run, report.

    Subclasses provide test() and may set `compile_args` and override
    early_applicable() and setup().
    """
    # Binaries already built, keyed by compile_args; shared by all tests.
    compiled = {}

    def __init__(self, target):
        self.target = target
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Tell whether the test can run at all.

        Override to return a false value when that can be decided without
        talking to the target or the server.
        """
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook called right before test(); does nothing by default."""
        pass

    def compile(self):
        """Build (or reuse) the binary for `compile_args`, if that is set.

        The result of target.compile() is cached in BaseTest.compiled and
        stored in self.binary.
        """
        compile_args = getattr(self, 'compile_args', None)
        cache = BaseTest.compiled
        if compile_args and compile_args not in cache:
            # pylint: disable=star-args
            cache[compile_args] = self.target.compile(*compile_args)
        self.binary = cache.get(compile_args)

    def classSetup(self):
        """Compile, then start the target and its server, noting the log."""
        self.compile()
        self.target_process = self.target.target()
        self.server = self.target.server()
        self.logs.append(self.server.logname)

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def run(self):
        """
        Run the test from start to finish and return its result string.

        Returns "not_applicable" straight away if early_applicable() says so.
        Otherwise classSetup(), setup() and test() are called in turn. A
        false return value from test() becomes 'pass'. TestNotApplicable
        gives "not_applicable", TestFailed gives "fail" (its message is
        printed), and any other exception gives "exception"; both of the
        latter print a traceback. Once classSetup() has succeeded, all
        collected logs are printed and classTeardown() is called, whatever
        the outcome.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        self.classSetup()

        try:
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except TestFailed as e:
            header("Message")
            print(e.message)
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            return "fail"
        except Exception:   # pylint: disable=broad-except
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            return "exception"
        finally:
            for log in self.logs:
                print_log(log)
            header("End of logs")
            self.classTeardown()

        if not result:
            result = 'pass'
        return result
570
# Command used to start gdb; None selects Gdb's default.
gdb_cmd = None
class GdbTest(BaseTest):
    """A BaseTest that also owns a gdb session attached to the server."""

    def __init__(self, target):
        BaseTest.__init__(self, target)
        self.gdb = None

    def classSetup(self):
        """Do the base setup, then start gdb and point it at the target.

        gdb is told about the binary (if any), the architecture and remote
        timeout (if a target is set), and is connected to the server's port
        (if it has one), after which one of the reported threads is selected
        at random.
        """
        BaseTest.classSetup(self)

        self.gdb = Gdb(gdb_cmd) if gdb_cmd else Gdb()
        gdb = self.gdb

        self.logs.append(gdb.logname)

        if self.binary:
            gdb.command("file %s" % self.binary)
        if self.target:
            gdb.command("set arch riscv:rv%d" % self.target.xlen)
            gdb.command("set remotetimeout %d" % self.target.timeout_sec)
        if self.server.port:
            gdb.command(
                    "target extended-remote localhost:%d" % self.server.port)
            # Select a random thread.
            # TODO: Allow a command line option to force a specific thread.
            thread_listing = gdb.command("info threads")
            thread_ids = re.findall(r"Thread (\d+)", thread_listing)
            if thread_ids:
                gdb.command("thread %s" % random.choice(thread_ids))

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def classTeardown(self):
        """Drop the gdb session, then do the base teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
609
class ExamineTarget(GdbTest):
    """Read $misa from the target and record it on the target object."""

    def test(self):
        """Store $misa in target.misa and print the ISA string it encodes.

        The top two bits of misa select the width: 1 at bit 30 means RV32,
        2 at bit 62 means RV64 and 3 at bit 126 means RV128. Each set bit in
        the low 26 bits adds the corresponding letter, 'A' for bit 0.

        Raises TestFailed if no width can be determined.
        """
        self.target.misa = self.gdb.p("$misa")

        txt = "RV"
        if (self.target.misa >> 30) == 1:
            txt += "32"
        elif (self.target.misa >> 62) == 2:
            txt += "64"
        elif (self.target.misa >> 126) == 3:
            txt += "128"
        else:
            raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
                    self.target.misa)

        for i in range(26):
            if self.target.misa & (1<<i):
                txt += chr(i + ord('A'))
        # No newline, so whatever is printed next continues on this line. An
        # explicit write works the same on Python 2 and 3, unlike a
        # trailing-comma print statement.
        sys.stdout.write(txt + " ")
629
class TestFailed(Exception):
    """Raised to report that a test failed; `message` says why."""

    def __init__(self, message):
        # Hand the message to Exception as well, so that str(e) and
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
634
class TestNotApplicable(Exception):
    """Raised when a test cannot be run here; `message` says why."""

    def __init__(self, message):
        # Hand the message to Exception as well, so that str(e) and
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
639
def assertEqual(a, b):
    """Raise TestFailed, quoting both reprs, when `a != b` holds."""
    differs = a != b
    if differs:
        raise TestFailed("%r != %r" % (a, b))
643
def assertNotEqual(a, b):
    """Raise TestFailed, quoting both reprs, when `a == b` holds."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
647
def assertIn(a, b):
    """Raise TestFailed, quoting both reprs, when `a` is not in `b`."""
    missing = a not in b
    if missing:
        raise TestFailed("%r not in %r" % (a, b))
651
def assertNotIn(a, b):
    """Raise TestFailed, quoting both reprs, when `a` is in `b`."""
    present = a in b
    if present:
        raise TestFailed("%r in %r" % (a, b))
655
def assertGreater(a, b):
    """Raise TestFailed, quoting both reprs, unless `a > b` holds."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
659
def assertLess(a, b):
    """Raise TestFailed, quoting both reprs, unless `a < b` holds."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
663
def assertTrue(a):
    """Raise TestFailed, quoting the repr of `a`, when `a` is false."""
    if not a:
        # Wrapped in a one-element tuple so that a tuple value of `a` (the
        # empty tuple is false) is formatted as a single value rather than
        # being taken as the argument list of the % operator.
        message = "%r is not True" % (a,)
        raise TestFailed(message)
667
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless `regexp` is found somewhere in `text`."""
    found = re.search(regexp, text)
    if found:
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))