Add reproduce line to the end of debug test logs
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
# When true, the name of every temporary log file is written to real_stdout as
# soon as the file is created.
print_log_names = False
# The stdout that was in effect before run_tests() pointed sys.stdout at a
# test's log file; run_tests() refreshes it before every test.
real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
def find_file(path):
    """Look for path under the current directory, then next to this module.

    For each location the joined path and its spelling relative to the
    current directory are compared, and the shorter one is used. Returns
    the first such spelling that exists, or None when neither location has
    the file.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    for root in search_roots:
        joined = os.path.join(root, path)
        relative = os.path.relpath(joined)
        # The relative spelling wins only when it is strictly shorter.
        candidate = relative if len(relative) < len(joined) else joined
        if os.path.exists(candidate):
            return candidate
    return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
class Spike(object):
    """A spike simulator child process.

    Constructing an instance compiles the idle program through
    target.compile(), starts spike with its stdout/stderr captured in a
    temporary log file and, when with_jtag_gdb is true, waits for spike to
    announce its remote-bitbang port.
    """

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
            isa=None, progbufsize=None):
        """Launch spike for target.

        The child is stored in self.process. When with_jtag_gdb is true the
        port parsed from the log is stored in self.port and exported through
        the REMOTE_BITBANG_PORT environment variable.

        Raises:
            Exception: spike did not report a remote-bitbang port in time.
        """
        self.process = None
        self.isa = isa
        self.progbufsize = progbufsize

        if target.harts:
            harts = target.harts
        else:
            harts = [target]

        cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
        self.infinite_loop = target.compile(harts[0],
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)
        self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
                suffix=".log")
        self.logname = self.logfile.name
        if print_log_names:
            real_stdout.write("Temporary spike log: %s\n" % self.logname)
        self.logfile.write("+ %s\n" % " ".join(cmd))
        self.logfile.flush()
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=self.logfile, stderr=self.logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                # Re-read the whole log on every poll, closing the handle
                # each time instead of leaking one per iteration.
                with open(self.logname) as log:
                    m = re.search(r"Listening for remote bitbang connection on "
                            r"port (\d+).", log.read())
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                print_log(self.logname)
                raise Exception("Didn't get spike message about bitbang "
                        "connection")

    def command(self, target, harts, halted, timeout, with_jtag_gdb):
        """Return the spike command line as a list of arguments.

        target.sim_cmd, when set, replaces the default $RISCV/bin/spike
        executable. All harts must agree on xlen, ram and ram_size. A truthy
        timeout wraps the command in timeout(1). When with_jtag_gdb is true
        this also sets REMOTE_BITBANG_HOST in os.environ.
        """
        # pylint: disable=no-self-use
        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]

        cmd += ["-p%d" % len(harts)]

        assert len(set(t.xlen for t in harts)) == 1, \
                "All spike harts must have the same XLEN"

        if self.isa:
            isa = self.isa
        else:
            isa = "RV%dG" % harts[0].xlen

        cmd += ["--isa", isa]
        cmd += ["--debug-auth"]

        if self.progbufsize is not None:
            cmd += ["--progsize", str(self.progbufsize)]
            cmd += ["--debug-sba", "32"]

        assert len(set(t.ram for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        assert len(set(t.ram_size for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            # Port 0 asks spike to choose a port; __init__ reads the port
            # number back from spike's log.
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'

        return cmd

    def __del__(self):
        # getattr: __init__ may have raised before self.process was bound.
        process = getattr(self, "process", None)
        if process:
            try:
                process.kill()
                process.wait()
            except OSError:
                pass

    def wait(self, *args, **kwargs):
        """Wait for the spike process; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
154
class VcsSim(object):
    """A VCS simulation child process with the JTAG VPI interface enabled."""

    # One log file shared by every instance created in this Python process.
    logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
    logname = logfile.name

    def __init__(self, sim_cmd=None, debug=False, timeout=300):
        """Start the simulator and wait until it reports its listening port.

        sim_cmd, when given, replaces the default "simv" command. debug
        appends "-debug" to the executable name and requests a VPD dump.
        The port is stored in self.port and exported as JTAG_VPI_PORT.

        Raises:
            RuntimeError: the simulator exited before announcing a port.
            Exception: no port was announced within timeout seconds.
        """
        # Bound first so __del__ is safe even if start-up fails below.
        self.process = None

        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        try:
            if print_log_names:
                real_stdout.write("Temporary VCS log: %s\n" % self.logname)
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()

            with open(self.logname, "r") as listenfile:
                # Skip what is already in the log; only new output matters.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                start = time.time()
                while True:
                    # Fail if VCS exits early
                    exit_code = self.process.poll()
                    if exit_code is not None:
                        raise RuntimeError('VCS simulator exited early with '
                                'status %d' % exit_code)

                    line = listenfile.readline()
                    if line:
                        match = re.match(r"^Listening on port (\d+)$", line)
                        if match:
                            self.port = int(match.group(1))
                            os.environ['JTAG_VPI_PORT'] = str(self.port)
                            break
                    else:
                        time.sleep(1)

                    if (time.time() - start) > timeout:
                        raise Exception("Timed out waiting for VCS to listen "
                                "for JTAG vpi")
        finally:
            # The child keeps its own inherited descriptor for the log.
            logfile.close()

    def __del__(self):
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # AttributeError: the process was never started.
            pass
207
class Openocd(object):
    """An OpenOCD server child process."""

    # One log file shared by every instance created in this Python process.
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Build the OpenOCD command line and start the server.

        server_cmd, when given, replaces the default $RISCV/bin/openocd.
        config is a configuration file looked up with find_file(); the
        process exits with status 1 if it cannot be found. debug adds a
        single "-d". timeout bounds (in seconds) how long start() waits.
        The gdb ports reported by OpenOCD are collected in self.gdb_ports.
        """
        self.timeout = timeout

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        # Appended exactly once; the flag used to be added in two places.
        if debug:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        try:
            if print_log_names:
                real_stdout.write("Temporary OpenOCD log: %s\n" %
                        Openocd.logname)
            env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
            env_entries = [key for key in env_entries if key in os.environ]
            logfile.write("+ %s%s\n" % (
                "".join("%s=%s " % (key, os.environ[key])
                    for key in env_entries),
                " ".join(map(pipes.quote, cmd))))
            logfile.flush()

            self.gdb_ports = []
            self.process = self.start(cmd, logfile)
        finally:
            # The child keeps its own inherited descriptor for the log.
            logfile.close()

    def start(self, cmd, logfile):
        """Spawn cmd and return the process once OpenOCD is ready.

        Readiness is the "telnet server disabled" line in the log. Every
        "Listening on port N for gdb connections" line seen before that is
        appended to self.gdb_ports.

        Raises:
            Exception: OpenOCD exited, or self.timeout seconds elapsed,
                before it became ready. The log is printed and a child that
                is still running is killed first.
        """
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=logfile, stderr=logfile)

        try:
            # Wait for OpenOCD to have made it through riscv_examine(). When
            # using OpenOCD to communicate with a simulator this may take a
            # long time, and gdb will time out when trying to connect if we
            # attempt too early.
            start = time.time()
            messaged = False
            with open(Openocd.logname, "r") as fd:
                while True:
                    line = fd.readline()
                    if line:
                        m = re.search(
                                r"Listening on port (\d+) for gdb connections",
                                line)
                        if m:
                            self.gdb_ports.append(int(m.group(1)))

                        if "telnet server disabled" in line:
                            return process
                    else:
                        if process.poll() is not None:
                            raise Exception("OpenOCD exited early.")
                        time.sleep(0.1)

                    # Checked on every pass, including the ones that read
                    # nothing, so a silent OpenOCD cannot hang us forever.
                    elapsed = time.time() - start
                    if not messaged and elapsed > 1:
                        messaged = True
                        print("Waiting for OpenOCD to start...")
                    if elapsed > self.timeout:
                        raise Exception("Timed out waiting for OpenOCD to "
                                "listen for gdb")

        except Exception:
            # Nobody else holds this process yet, so reap it here.
            if process.poll() is None:
                try:
                    process.kill()
                    process.wait()
                except OSError:
                    pass
            print_log(Openocd.logname)
            raise

    def __del__(self):
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # AttributeError: start() failed, so self.process was never set.
            pass
306
class OpenocdCli(object):
    """Drive OpenOCD's telnet command line through a pexpect child."""

    def __init__(self, port=4444):
        """Open a telnet session to localhost:port and wait for the prompt.

        The session output is also copied to openocd-cli.log by tee.
        """
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send cmd and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # Match the echoed command literally: expect() would treat it as a
        # regular expression and trip over characters such as "(" or "*".
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Return register values parsed from the "reg" command.

        With a register name, returns that register's integer value;
        without one, returns a dict mapping every reported name to its value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image"; raise TestNotApplicable for unsupported ELFs."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
332
class CannotAccess(Exception):
    """gdb reported that it cannot access memory at self.address."""

    def __init__(self, address):
        # Describe the address for str(); non-integers fall back to repr().
        try:
            where = "0x%x" % address
        except TypeError:
            where = repr(address)
        # Hand the text to Exception so str(e) and tracebacks are not empty.
        Exception.__init__(self, "Cannot access memory at address %s" % where)
        self.address = address
337
class CouldNotFetch(Exception):
    """gdb could not fetch register self.regname (see self.explanation)."""

    def __init__(self, regname, explanation):
        # Hand the text to Exception so str(e) and tracebacks are not empty.
        Exception.__init__(self, 'Could not fetch register "%s"; %s' %
                (regname, explanation))
        self.regname = regname
        self.explanation = explanation
343
# One row of gdb's "info threads" output. Gdb.threads() builds these directly
# from the five groups of its regular expression, so every field is a string
# (or None when its group did not take part in the match).
Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
    'name', 'frame'))
346
class Gdb(object):
    """A single gdb class which can interact with one or more gdb instances."""

    # pylint: disable=too-many-public-methods
    # pylint: disable=too-many-instance-attributes

    def __init__(self, ports,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
            timeout=60, binary=None):
        """Spawn one gdb process per entry in ports.

        Nothing is sent to gdb here; call connect() to attach to the ports.
        Each child logs to its own temporary file (see lognames()).
        timeout is the per-operation timeout in seconds, binary an optional
        file that connect() loads with gdb's "file" command.
        """
        assert ports

        self.ports = ports
        self.cmd = cmd
        self.timeout = timeout
        self.binary = binary

        self.stack = []
        self.harts = {}

        self.logfiles = []
        self.children = []
        for port in ports:
            logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
                    suffix=".log")
            self.logfiles.append(logfile)
            if print_log_names:
                real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
            child = pexpect.spawn(cmd)
            child.logfile = logfile
            child.logfile.write("+ %s\n" % cmd)
            self.children.append(child)
        self.active_child = self.children[0]

    def connect(self):
        """Configure every gdb, attach it to its port and index its harts.

        Fills self.harts, keyed by hart id, with the owning child, the
        Thread record and whether that thread is alone on its gdb.
        """
        for port, child in zip(self.ports, self.children):
            self.select_child(child)
            self.wait()
            self.command("set confirm off")
            self.command("set width 0")
            self.command("set height 0")
            # Force consistency.
            self.command("set print entry-values no")
            self.command("set remotetimeout %d" % self.timeout)
            self.command("target extended-remote localhost:%d" % port)
            if self.binary:
                self.command("file %s" % self.binary)
            threads = self.threads()
            for t in threads:
                hartid = None
                if t.name:
                    m = re.search(r"Hart (\d+)", t.name)
                    if m:
                        hartid = int(m.group(1))
                if hartid is None:
                    # No "Hart N" in the thread name: take the next free id.
                    if self.harts:
                        hartid = max(self.harts) + 1
                    else:
                        hartid = 0
                # solo: True iff this is the only thread on this child
                self.harts[hartid] = {'child': child,
                        'thread': t,
                        'solo': len(threads) == 1}

    def __del__(self):
        # Just let go of the pexpect children; each one is left to its own
        # destructor. (Looping with "del child" only unbound the loop
        # variable and released nothing.)
        self.children = []
        self.active_child = None
        self.harts = {}

    def one_hart_per_gdb(self):
        """Return True when every known hart is alone on its gdb."""
        return all(h['solo'] for h in self.harts.values())

    def lognames(self):
        """Return the names of the per-gdb log files."""
        return [logfile.name for logfile in self.logfiles]

    def select_child(self, child):
        """Make child the gdb that subsequent commands are sent to."""
        self.active_child = child

    def select_hart(self, hart):
        """Select the gdb (and, if shared, the thread) that owns hart."""
        h = self.harts[hart.id]
        self.select_child(h['child'])
        if not h['solo']:
            output = self.command("thread %s" % h['thread'].id, ops=5)
            assert "Unknown" not in output

    def push_state(self):
        """Save the active gdb on the state stack."""
        self.stack.append({
            'active_child': self.active_child
        })

    def pop_state(self):
        """Restore the active gdb saved by the matching push_state()."""
        state = self.stack.pop()
        self.active_child = state['active_child']

    def wait(self):
        """Wait for prompt."""
        self.active_child.expect(r"\(gdb\)")

    def command(self, command, ops=1):
        """ops is the estimated number of operations gdb will have to perform
        to perform this command. It is used to compute a timeout based on
        self.timeout."""
        timeout = ops * self.timeout
        self.active_child.sendline(command)
        self.active_child.expect("\n", timeout=timeout)
        self.active_child.expect(r"\(gdb\)", timeout=timeout)
        return self.active_child.before.strip()

    def global_command(self, command):
        """Execute this command on every gdb that we control."""
        with PrivateState(self):
            for child in self.children:
                self.select_child(child)
                self.command(command)

    def c(self, wait=True, *args, **kwargs):
        """
        Dumb c command.
        In RTOS mode, gdb will resume all harts.
        In multi-gdb mode, this command will just go to the current gdb, so
        will only resume one hart.

        The second parameter is historically called "async" (append "&" so
        gdb resumes in the background). That word is reserved from Python
        3.7 on, so it is taken from *args / **kwargs here; c(True, True)
        and c(async=True) behave as before, and c(**{"async": True}) works
        where the keyword can no longer be written out.

        Returns gdb's output when wait is true, otherwise None.
        """
        # pylint: disable=keyword-arg-before-vararg
        if len(args) > 1:
            raise TypeError("c() takes at most 2 arguments (%d given)" %
                    (len(args) + 1))
        if args and "async" in kwargs:
            raise TypeError("c() got multiple values for argument 'async'")
        background = args[0] if args else kwargs.pop("async", False)
        if kwargs:
            raise TypeError("c() got an unexpected keyword argument %r" %
                    sorted(kwargs)[0])

        resume = "c%s" % ("&" if background else "")
        ops = 10
        if wait:
            output = self.command(resume, ops=ops)
            assert "Continuing" in output
            return output
        self.active_child.sendline(resume)
        self.active_child.expect("Continuing", timeout=ops * self.timeout)
        return None

    def c_all(self, wait=True):
        """
        Resume every hart.

        This function works fine when using multiple gdb sessions, but the
        caller must be careful when using it nonetheless. gdb's behavior is to
        not set breakpoints until just before the hart is resumed, and then
        clears them as soon as the hart halts. That means that you can't set
        one software breakpoint, and expect multiple harts to hit it. It's
        possible that the first hart completes set/run/halt/clear before the
        second hart even gets to resume, so it will never hit the breakpoint.
        """
        with PrivateState(self):
            for child in self.children:
                child.sendline("c")
                child.expect("Continuing")

            if wait:
                for child in self.children:
                    child.expect(r"\(gdb\)")

    def interrupt(self):
        """Send Ctrl-C to the active gdb and return what it printed."""
        self.active_child.send("\003")
        self.active_child.expect(r"\(gdb\)", timeout=6000)
        return self.active_child.before.strip()

    def interrupt_all(self):
        """Interrupt every gdb; the last one is left as the active gdb."""
        for child in self.children:
            self.select_child(child)
            self.interrupt()

    @staticmethod
    def _raise_if_cannot_access(output):
        """Raise CannotAccess if output carries gdb's memory access error."""
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))

    def x(self, address, size='w'):
        """Examine memory with "x/<size> <address>" and return the integer."""
        output = self.command("x/%s %s" % (size, address))
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p_raw(self, obj):
        """Print obj and return the text after the last "=", unparsed."""
        output = self.command("p %s" % obj)
        self._raise_if_cannot_access(output)
        return output.split('=')[-1].strip()

    def parse_string(self, text):
        """Convert gdb's printed value into Python data.

        "{a, b}" becomes a list (split on ", "), a double-quoted value
        becomes the string between the quotes and anything else is parsed
        as an integer with int(text, 0).
        """
        text = text.strip()
        if text.startswith("{") and text.endswith("}"):
            inner = text[1:-1]
            return [self.parse_string(t) for t in inner.split(", ")]
        elif text.startswith('"') and text.endswith('"'):
            return text[1:-1]
        else:
            return int(text, 0)

    def p(self, obj, fmt="/x"):
        """Print obj with format fmt and return the parsed value.

        Raises:
            CannotAccess: gdb could not read the memory involved.
            CouldNotFetch: gdb could not fetch a register.
        """
        output = self.command("p%s %s" % (fmt, obj))
        self._raise_if_cannot_access(output)
        m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
        if m:
            raise CouldNotFetch(m.group(1), m.group(2))
        rhs = output.split('=')[-1]
        return self.parse_string(rhs)

    def p_string(self, obj):
        """Print obj and return the second shell-style token after "="."""
        output = self.command("p %s" % obj)
        value = shlex.split(output.split('=')[-1].strip())[1]
        return value

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        output = self.command("stepi", ops=10)
        return output

    def load(self):
        """Load the program, then check it with "compare-sections"."""
        output = self.command("load", ops=1000)
        assert "failed" not in output
        assert "Transfer rate" in output
        output = self.command("compare-sections", ops=1000)
        assert "MIS" not in output

    def b(self, location):
        """Set a breakpoint at location and return gdb's output."""
        output = self.command("b %s" % location, ops=5)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at location and return gdb's output."""
        output = self.command("hbreak %s" % location, ops=5)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output

    def threads(self):
        """Return the Thread records parsed from "info threads"."""
        output = self.command("info threads", ops=100)
        threads = []
        for line in output.splitlines():
            m = re.match(
                    r"[\s\*]*(\d+)\s*"
                    r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
                    r"\s*(.*)", line)
            if m:
                threads.append(Thread(*m.groups()))
        assert threads
        return threads

    def thread(self, thread):
        """Switch gdb to thread and return gdb's output."""
        return self.command("thread %s" % thread.id)

    def where(self):
        """Return the innermost frame as printed by "where 1"."""
        return self.command("where 1")
592
class PrivateState(object):
    """Context manager that brackets a block with gdb state save/restore.

    Entering calls gdb.push_state(); leaving, whether normally or through an
    exception, calls gdb.pop_state(). Exceptions are not suppressed.
    """

    def __init__(self, gdb):
        self.gdb = gdb

    def __enter__(self):
        # Nothing is handed to an "as" target.
        self.gdb.push_state()
        return None

    def __exit__(self, _exc_type, _exc_value, _exc_tb):
        self.gdb.pop_state()
        return None
602
def run_all_tests(module, target, parsed):
    """Collect the tests defined in module, run them and print a summary.

    A test is any class in module with a "test" attribute whose name
    contains one of parsed.test (or every such class when parsed.test is
    empty). ExamineTarget is queued first, once, if some hart has no misa
    value and parsed.misaval does not supply one. parsed.gdb becomes the
    module-wide gdb_cmd.

    Returns what print_results() returns for the collected results.
    """
    global gdb_cmd  # pylint: disable=global-statement

    try:
        os.makedirs(parsed.logs)
    except OSError:
        # There's a race where multiple instances of the test program might
        # decide to create the logs directory at the same time. Only that
        # case is harmless; any other failure leaves no directory behind and
        # is reported here rather than as a confusing error later.
        if not os.path.isdir(parsed.logs):
            raise

    overall_start = time.time()

    gdb_cmd = parsed.gdb

    todo = []
    examine_added = False
    for hart in target.harts:
        if parsed.misaval:
            hart.misa = int(parsed.misaval, 16)
            print("Using $misa from command line: 0x%x" % hart.misa)
        elif hart.misa:
            print("Using $misa from hart definition: 0x%x" % hart.misa)
        elif not examine_added:
            todo.append(("ExamineTarget", ExamineTarget, None))
            examine_added = True

    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition, None))

    results, count = run_tests(parsed, target, todo)

    header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
            dash=':')

    return print_results(results)
640
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run every (name, definition, hart) entry of todo.

    Each test gets its own log file under parsed.logs; sys.stdout points at
    that file while the test runs. With parsed.print_failures the log of a
    test whose result is not in good_results is echoed to stdout, and with
    parsed.fail_fast the first such result ends the run.

    Returns:
        (results, count): results maps each result string to a list of
        (test name, log file name) pairs; count is the number of tests run.
    """
    global real_stdout  # pylint: disable=global-statement
    results = {}
    count = 0

    for name, definition, hart in todo:
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        log_fd = open(log_name, 'w')
        try:
            print("[%s] Starting > %s" % (name, log_name))
            instance = definition(target, hart)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
                log_fd.write("Logfile: %s\n" % log_name)
                log_fd.write("Reproduce: %s %s %s\n" % (sys.argv[0],
                    parsed.target, name))
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
        finally:
            # Closing also flushes, so the log is complete before it may be
            # read back below, and no handle is leaked per test.
            log_fd.close()
        print("[%s] %s in %.2fs" % (name, result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as failed_log:
                sys.stdout.write(failed_log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
679
def print_results(results):
    """Print a summary of results and return the exit status.

    results maps a result string to a list of (test name, log name) pairs.
    A count is printed for every result; for results outside good_results
    the individual tests are listed as well. Returns 1 if any such result is
    present, otherwise 0.
    """
    exit_status = 0
    for outcome, entries in results.items():
        print("%d tests returned %s" % (len(entries), outcome))
        if outcome in good_results:
            continue
        exit_status = 1
        for test_name, log_path in entries:
            print(" %s > %s" % (test_name, log_path))

    return exit_status
690
def add_test_run_options(parser):
    """Register the test runner's command line options on parser."""
    # (flags, keyword arguments) in the order they are added to the parser.
    option_table = (
        (("--logs",),
         dict(default="logs",
              help="Store logs in the specified directory.")),
        (("--fail-fast", "-f"),
         dict(action="store_true",
              help="Exit as soon as any test fails.")),
        (("--print-failures",),
         dict(action="store_true",
              help="When a test fails, print the log file to stdout.")),
        (("--print-log-names", "--pln"),
         dict(action="store_true",
              help="Print names of temporary log files as soon as they are "
                   "created.")),
        (("test",),
         dict(nargs='*',
              help="Run only tests that are named here.")),
        (("--gdb",),
         dict(help="The command to use to start gdb.")),
        (("--misaval",),
         dict(help="Don't run ExamineTarget, just assume the misa value "
                   "which is specified.")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
708
def header(title, dash='-', length=78):
    """Print a separator line that is length characters wide.

    With a title the line reads "<dashes>[ title ]<dashes>", the dashes
    split as evenly as possible with any odd one on the right. Without a
    title it is a plain run of dash characters. A title too long to leave
    room for dashes is printed with none.
    """
    if title:
        dashes = dash * (length - 4 - len(title))
        # Floor division: "/" yields a float under true division, and a
        # float cannot be used as a slice index.
        middle = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
    else:
        print(dash * length)
717
def print_log(path):
    """Print the file at path to stdout under a header naming it.

    A blank line follows the file contents.
    """
    header(path)
    # The with block closes the handle even if writing to stdout fails.
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    print("")
723
class BaseTest(object):
    """Common life cycle for a test: compile, start target/server, run.

    Subclasses provide test() and may set compile_args (a tuple of
    arguments for target.compile()) and override setup(), postMortem() and
    early_applicable().
    """

    # Cache of compiled programs shared by all tests, keyed by compile_args.
    compiled = {}

    def __init__(self, target, hart=None):
        """Remember target and pick the hart to test.

        An explicitly passed hart is used as is; otherwise one of
        target.harts is chosen at random.
        """
        self.target = target
        if hart:
            self.hart = hart
        else:
            # A leftover debugging override used to replace this choice
            # with target.harts[-1].
            self.hart = random.choice(target.harts)
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook run after classSetup() and before test(); does nothing."""
        pass

    def compile(self):
        """Compile compile_args (once per distinct value) into self.binary.

        self.binary is None when the test defines no compile_args.
        """
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                BaseTest.compiled[compile_args] = \
                        self.target.compile(self.hart, *compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile the program, then create the target and its server.

        The log names of both are collected in self.logs. If the server
        cannot be started, the logs gathered so far are printed and the
        exception is re-raised.
        """
        self.compile()
        self.target_process = self.target.create()
        if self.target_process:
            self.logs.append(self.target_process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Release the server and the target process."""
        del self.server
        del self.target_process

    def postMortem(self):
        """Hook run after a failed or crashed test; does nothing."""
        pass

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "fail" (TestFailed was raised),
        "exception" (anything else was raised), the truthy value returned
        by test(), or "pass" when test() returned a false value.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
                header("Message")
                print(e.message)
            else:
                result = "exception"
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            try:
                self.postMortem()
            except Exception as post_error:  # pylint: disable=broad-except
                header("postMortem Exception")
                print(post_error)
                traceback.print_exc(file=sys.stdout)
            return result

        finally:
            # Runs on every path above, including the early return.
            for log in self.logs:
                print_log(log)
            header("End of logs")
            self.classTeardown()

        if not result:
            result = 'pass'
        return result
827
gdb_cmd = None
class GdbTest(BaseTest):
    """Base class for tests that drive the target through gdb."""

    def __init__(self, target, hart=None):
        BaseTest.__init__(self, target, hart=hart)
        self.gdb = None

    def classSetup(self):
        """Do the base setup, then start gdb and connect it to the server."""
        BaseTest.classSetup(self)

        # Pass a gdb command only when one is configured, so that Gdb's own
        # default applies otherwise.
        positional = [self.server.gdb_ports]
        if gdb_cmd:
            positional.append(gdb_cmd)
        self.gdb = Gdb(*positional, timeout=self.target.timeout_sec,
                binary=self.binary)

        self.logs += self.gdb.lognames()
        self.gdb.connect()

        remote_timeout = "set remotetimeout %d" % self.target.timeout_sec
        self.gdb.global_command(remote_timeout)

        for setup_command in self.target.gdb_setup:
            self.gdb.command(setup_command)

        self.gdb.select_hart(self.hart)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def postMortem(self):
        """Dump disassembly, registers and threads after a failure."""
        if not self.gdb:
            return
        self.gdb.interrupt()
        diagnostics = (
            ("disassemble", 20),
            ("info registers all", 100),
            ("flush regs", 1),
            ("info threads", 100),
        )
        for text, ops in diagnostics:
            self.gdb.command(text, ops=ops)

    def classTeardown(self):
        del self.gdb
        BaseTest.classTeardown(self)

    def parkOtherHarts(self):
        """Park harts besides the currently selected one in loop_forever()."""
        # Park all harts that we're not using in a safe place.
        unused = [hart for hart in self.target.harts if hart != self.hart]
        for hart in unused:
            self.gdb.select_hart(hart)
            self.gdb.p("$pc=loop_forever")

        self.gdb.select_hart(self.hart)
880
class GdbSingleHartTest(GdbTest):
    """GdbTest whose setup also parks every hart but the selected one."""

    def classSetup(self):
        # Regular gdb setup first, then parkOtherHarts() moves the other
        # harts out of the way.
        GdbTest.classSetup(self)
        self.parkOtherHarts()
885
886 class ExamineTarget(GdbTest):
887 def test(self):
888 for hart in self.target.harts:
889 self.gdb.select_hart(hart)
890
891 hart.misa = self.gdb.p("$misa")
892
893 txt = "RV"
894 misa_xlen = 0
895 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
896 misa_xlen = 32
897 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
898 misa_xlen = 64
899 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
900 misa_xlen = 128
901 else:
902 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
903 self.hart.misa)
904
905 if misa_xlen != hart.xlen:
906 raise TestFailed("MISA reported XLEN of %d but we were "\
907 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
908
909 txt += ("%d" % misa_xlen)
910
911 for i in range(26):
912 if hart.misa & (1<<i):
913 txt += chr(i + ord('A'))
914 print txt,
915
class TestFailed(Exception):
    """Raised by a test (or an assert* helper) to report a failure."""

    def __init__(self, message):
        # Hand the message to Exception so str(e) and tracebacks show it.
        Exception.__init__(self, message)
        self.message = message
920
class TestNotApplicable(Exception):
    """Raised when a test finds out that it does not apply to the target."""

    def __init__(self, message):
        # Hand the message to Exception so str(e) and tracebacks show it.
        Exception.__init__(self, message)
        self.message = message
925
def assertEqual(a, b):
    """Raise TestFailed when a != b."""
    if a != b:
        complaint = "%r != %r" % (a, b)
        raise TestFailed(complaint)
929
def assertNotEqual(a, b):
    """Raise TestFailed when a == b."""
    if a == b:
        complaint = "%r == %r" % (a, b)
        raise TestFailed(complaint)
933
def assertIn(a, b):
    """Raise TestFailed when a is not contained in b."""
    if a not in b:
        complaint = "%r not in %r" % (a, b)
        raise TestFailed(complaint)
937
def assertNotIn(a, b):
    """Raise TestFailed when a is contained in b."""
    if a in b:
        complaint = "%r in %r" % (a, b)
        raise TestFailed(complaint)
941
def assertGreater(a, b):
    """Raise TestFailed unless a > b."""
    if not a > b:
        complaint = "%r not greater than %r" % (a, b)
        raise TestFailed(complaint)
945
def assertLess(a, b):
    """Raise TestFailed unless a < b."""
    if not a < b:
        complaint = "%r not less than %r" % (a, b)
        raise TestFailed(complaint)
949
def assertTrue(a):
    """Raise TestFailed when a is false."""
    if not a:
        # Wrap a in a one-element tuple: "%" would otherwise unpack a tuple
        # argument, so assertTrue(()) raised TypeError instead of TestFailed.
        raise TestFailed("%r is not True" % (a,))
953
def assertRegexpMatches(text, regexp):
    """Raise TestFailed unless regexp is found somewhere in text."""
    found = re.search(regexp, text)
    if not found:
        complaint = "can't find %r in %r" % (regexp, text)
        raise TestFailed(complaint)