Test resuming from a trigger.
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
# When true, helpers announce each temporary log file as soon as it is created.
print_log_names = False
# The stdout to use for such announcements; run_tests() rebinds this to the
# stream that was current just before a test's output is redirected to its log.
real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
def find_file(path):
    """Locate path under the current directory or this module's directory.

    Each candidate is spelled relative to the current directory when that
    spelling is strictly shorter than the joined one.  Returns the first
    candidate that exists, or None when neither location contains the file.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    for root in search_roots:
        joined = os.path.join(root, path)
        relative = os.path.relpath(joined)
        candidate = relative if len(relative) < len(joined) else joined
        if os.path.exists(candidate):
            return candidate
    return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
class Spike(object):
    """A running spike simulator process, logging to a temporary file."""

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
            isa=None):
        """Launch spike running the infinite-loop program.

        target supplies the harts (or is itself the single hart when
        target.harts is empty), the optional sim_cmd and the compile() used
        to build the program.  When with_jtag_gdb is set, the log is polled
        for spike's remote-bitbang port, which is stored in self.port and
        exported as REMOTE_BITBANG_PORT.

        Raises Exception when spike never reports the bitbang port.
        """
        self.process = None
        self.isa = isa

        if target.harts:
            harts = target.harts
        else:
            harts = [target]

        cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
        self.infinite_loop = target.compile(harts[0],
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)
        self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
                suffix=".log")
        self.logname = self.logfile.name
        if print_log_names:
            real_stdout.write("Temporary spike log: %s\n" % self.logname)
        self.logfile.write("+ %s\n" % " ".join(cmd))
        self.logfile.flush()
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=self.logfile, stderr=self.logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                # Re-read the whole log on each attempt, closing the handle
                # every time instead of leaking one per poll.
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                        r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                print_log(self.logname)
                raise Exception("Didn't get spike message about bitbang "
                        "connection")

    def command(self, target, harts, halted, timeout, with_jtag_gdb):
        """Build the spike command line as a list of arguments.

        Uses target.sim_cmd when set, otherwise $RISCV/bin/spike.  All harts
        must agree on xlen, ram and ram_size.  A truthy timeout wraps the
        command in timeout(1).  As a side effect, with_jtag_gdb sets
        REMOTE_BITBANG_HOST in the environment.
        """
        # pylint: disable=no-self-use
        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]

        cmd += ["-p%d" % len(harts)]

        assert len(set(t.xlen for t in harts)) == 1, \
                "All spike harts must have the same XLEN"

        if self.isa:
            isa = self.isa
        else:
            isa = "RV%dG" % harts[0].xlen

        cmd += ["--isa", isa]

        assert len(set(t.ram for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        assert len(set(t.ram_size for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'

        return cmd

    def __del__(self):
        # Best-effort cleanup; the process may already be gone.
        if self.process:
            try:
                self.process.kill()
                self.process.wait()
            except OSError:
                pass

    def wait(self, *args, **kwargs):
        """Wait for the spike process; arguments go to Popen.wait()."""
        return self.process.wait(*args, **kwargs)
148
class VcsSim(object):
    """A running VCS simulation with the JTAG VPI interface enabled."""

    # One temporary log shared by the class; instances reopen it by name.
    logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
    logname = logfile.name

    def __init__(self, sim_cmd=None, debug=False, timeout=300):
        """Start the simulator and wait until it reports its JTAG VPI port.

        sim_cmd overrides the default "simv" command.  debug switches to the
        "-debug" binary and requests a VPD dump.  The port is stored in
        self.port and exported as JTAG_VPI_PORT.

        Raises RuntimeError when the simulator exits before listening, and
        Exception when more than timeout seconds pass.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        try:
            if print_log_names:
                real_stdout.write("Temporary VCS log: %s\n" % self.logname)
            logfile.write("+ %s\n" % " ".join(cmd))
            logfile.flush()

            with open(self.logname, "r") as listenfile:
                # Skip what is already in the log; only new output matters.
                listenfile.seek(0, 2)
                self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                        stdout=logfile, stderr=logfile)
                done = False
                start = time.time()
                while not done:
                    # Fail if VCS exits early
                    exit_code = self.process.poll()
                    if exit_code is not None:
                        raise RuntimeError('VCS simulator exited early with '
                                'status %d' % exit_code)

                    line = listenfile.readline()
                    if not line:
                        time.sleep(1)
                    match = re.match(r"^Listening on port (\d+)$", line)
                    if match:
                        done = True
                        self.port = int(match.group(1))
                        os.environ['JTAG_VPI_PORT'] = str(self.port)

                    if (time.time() - start) > timeout:
                        raise Exception("Timed out waiting for VCS to listen "
                                "for JTAG vpi")
        finally:
            # The child keeps its own copy of the descriptor.
            logfile.close()

    def __del__(self):
        # self.process is missing when Popen itself failed, so tolerate
        # AttributeError as well as an already-dead process.
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            pass
201
class Openocd(object):
    """A running OpenOCD server, logging to a class-wide temporary file."""

    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Build the OpenOCD command line and start the server.

        server_cmd overrides $RISCV/bin/openocd.  config is located with
        find_file(); the process exits with status 1 when it cannot be
        found.  The gdb ports OpenOCD reports are collected in
        self.gdb_ports.  timeout bounds, in seconds, how long start() waits
        for the server to come up.
        """
        self.timeout = timeout

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]
            if debug:
                cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        # NOTE(review): with the default command and debug set, "-d" ends up
        # on the command line twice; confirm whether that is intended.
        if debug:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        try:
            if print_log_names:
                real_stdout.write("Temporary OpenOCD log: %s\n" %
                        Openocd.logname)
            env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
            env_entries = [key for key in env_entries if key in os.environ]
            logfile.write("+ %s%s\n" % (
                "".join("%s=%s " % (key, os.environ[key])
                        for key in env_entries),
                " ".join(map(pipes.quote, cmd))))
            logfile.flush()

            self.gdb_ports = []
            self.process = self.start(cmd, logfile)
        finally:
            # The child keeps its own copy of the descriptor.
            logfile.close()

    def start(self, cmd, logfile):
        """Spawn cmd and wait until OpenOCD is ready for gdb.

        Returns the Popen object once the log shows "telnet server
        disabled".  Raises Exception when the server exits early or when
        self.timeout seconds pass; in both cases the log is printed and a
        still-running child is killed.
        """
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=logfile, stderr=logfile)

        try:
            # Wait for OpenOCD to have made it through riscv_examine(). When
            # using OpenOCD to communicate with a simulator this may take a
            # long time, and gdb will time out when trying to connect if we
            # attempt too early.
            start = time.time()
            messaged = False
            with open(Openocd.logname, "r") as fd:
                while True:
                    line = fd.readline()
                    if line:
                        m = re.search(
                                r"Listening on port (\d+) for gdb connections",
                                line)
                        if m:
                            self.gdb_ports.append(int(m.group(1)))

                        if "telnet server disabled" in line:
                            return process
                    else:
                        if process.poll() is not None:
                            raise Exception("OpenOCD exited early.")
                        time.sleep(0.1)

                    # Checked on every pass, including quiet ones, so a
                    # server that hangs without output still times out.
                    elapsed = time.time() - start
                    if not messaged and elapsed > 1:
                        messaged = True
                        print("Waiting for OpenOCD to start...")
                    if elapsed > self.timeout:
                        raise Exception("Timed out waiting for OpenOCD to "
                                "listen for gdb")

        except Exception:
            # self.process is never assigned when start() fails, so __del__
            # could not clean this child up later.
            self._reap(process)
            print_log(Openocd.logname)
            raise

    @staticmethod
    def _reap(process):
        """Kill and collect a server that never finished starting."""
        if process.poll() is None:
            try:
                process.kill()
                process.wait()
            except OSError:
                pass

    def __del__(self):
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            pass
300
class OpenocdCli(object):
    """Drive OpenOCD's telnet command line through a pexpect child."""

    def __init__(self, port=4444):
        """Connect to the telnet server on localhost and wait for a prompt.

        The session is also copied to openocd-cli.log by tee.
        """
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send cmd and return the output printed before the next prompt."""
        self.child.sendline(cmd)
        # Match the echoed command literally: expect() would compile it as a
        # regular expression, so characters such as "(", "+" or "$" in a
        # command would change what is matched.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Read registers with OpenOCD's "reg" command.

        Returns the integer value of reg when one is named (KeyError if it
        is not in the output), otherwise a dict of every name/value pair
        found in the output.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Load image; raise TestNotApplicable if OpenOCD rejects the ELF."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
326
class CannotAccess(Exception):
    """Raised when gdb reports that a memory address cannot be accessed.

    The offending address is available as the address attribute.
    """
    def __init__(self, address):
        # Give the base class a message so that str() and tracebacks are not
        # blank; fall back to repr() for a non-integer address.
        try:
            where = "0x%x" % address
        except TypeError:
            where = repr(address)
        Exception.__init__(self, "Cannot access memory at address %s" % where)
        self.address = address
331
class CouldNotFetch(Exception):
    """Raised when gdb reports that it could not fetch a register.

    regname is the register's name and explanation is gdb's reason.
    """
    def __init__(self, regname, explanation):
        # Give the base class a message so that str() and tracebacks are not
        # blank.
        Exception.__init__(self, 'Could not fetch register "%s"; %s' %
                (regname, explanation))
        self.regname = regname
        self.explanation = explanation
337
# One row of gdb's "info threads" output, in the order Gdb.threads() captures
# the fields.
_THREAD_FIELDS = ('id', 'description', 'target_id', 'name', 'frame')
Thread = collections.namedtuple('Thread', _THREAD_FIELDS)
340
341 class Gdb(object):
342 """A single gdb class which can interact with one or more gdb instances."""
343
344 # pylint: disable=too-many-public-methods
345 # pylint: disable=too-many-instance-attributes
346
347 def __init__(self, ports,
348 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
349 timeout=60, binary=None):
350 assert ports
351
352 self.ports = ports
353 self.cmd = cmd
354 self.timeout = timeout
355 self.binary = binary
356
357 self.stack = []
358 self.harts = {}
359
360 self.logfiles = []
361 self.children = []
362 for port in ports:
363 logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
364 suffix=".log")
365 self.logfiles.append(logfile)
366 if print_log_names:
367 real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
368 child = pexpect.spawn(cmd)
369 child.logfile = logfile
370 child.logfile.write("+ %s\n" % cmd)
371 self.children.append(child)
372 self.active_child = self.children[0]
373
374 def connect(self):
375 for port, child in zip(self.ports, self.children):
376 self.select_child(child)
377 self.wait()
378 self.command("set confirm off")
379 self.command("set width 0")
380 self.command("set height 0")
381 # Force consistency.
382 self.command("set print entry-values no")
383 self.command("set remotetimeout %d" % self.timeout)
384 self.command("target extended-remote localhost:%d" % port)
385 if self.binary:
386 self.command("file %s" % self.binary)
387 threads = self.threads()
388 for t in threads:
389 hartid = None
390 if t.name:
391 m = re.search(r"Hart (\d+)", t.name)
392 if m:
393 hartid = int(m.group(1))
394 if hartid is None:
395 if self.harts:
396 hartid = max(self.harts) + 1
397 else:
398 hartid = 0
399 # solo: True iff this is the only thread on this child
400 self.harts[hartid] = {'child': child,
401 'thread': t,
402 'solo': len(threads) == 1}
403
404 def __del__(self):
405 for child in self.children:
406 del child
407
408 def one_hart_per_gdb(self):
409 return all(h['solo'] for h in self.harts.itervalues())
410
411 def lognames(self):
412 return [logfile.name for logfile in self.logfiles]
413
414 def select_child(self, child):
415 self.active_child = child
416
417 def select_hart(self, hart):
418 h = self.harts[hart.id]
419 self.select_child(h['child'])
420 if not h['solo']:
421 output = self.command("thread %s" % h['thread'].id, timeout=10)
422 assert "Unknown" not in output
423
424 def push_state(self):
425 self.stack.append({
426 'active_child': self.active_child
427 })
428
429 def pop_state(self):
430 state = self.stack.pop()
431 self.active_child = state['active_child']
432
433 def wait(self):
434 """Wait for prompt."""
435 self.active_child.expect(r"\(gdb\)")
436
437 def command(self, command, timeout=6000):
438 """timeout is in seconds"""
439 self.active_child.sendline(command)
440 self.active_child.expect("\n", timeout=timeout)
441 self.active_child.expect(r"\(gdb\)", timeout=timeout)
442 return self.active_child.before.strip()
443
444 def global_command(self, command):
445 """Execute this command on every gdb that we control."""
446 with PrivateState(self):
447 for child in self.children:
448 self.select_child(child)
449 self.command(command)
450
451 def c(self, wait=True, timeout=-1, async=False):
452 """
453 Dumb c command.
454 In RTOS mode, gdb will resume all harts.
455 In multi-gdb mode, this command will just go to the current gdb, so
456 will only resume one hart.
457 """
458 if async:
459 async = "&"
460 else:
461 async = ""
462 if wait:
463 output = self.command("c%s" % async, timeout=timeout)
464 assert "Continuing" in output
465 return output
466 else:
467 self.active_child.sendline("c%s" % async)
468 self.active_child.expect("Continuing")
469
470 def c_all(self):
471 """
472 Resume every hart.
473
474 This function works fine when using multiple gdb sessions, but the
475 caller must be careful when using it nonetheless. gdb's behavior is to
476 not set breakpoints until just before the hart is resumed, and then
477 clears them as soon as the hart halts. That means that you can't set
478 one software breakpoint, and expect multiple harts to hit it. It's
479 possible that the first hart completes set/run/halt/clear before the
480 second hart even gets to resume, so it will never hit the breakpoint.
481 """
482 with PrivateState(self):
483 for child in self.children:
484 child.sendline("c")
485 child.expect("Continuing")
486
487 # Now wait for them all to halt
488 for child in self.children:
489 child.expect(r"\(gdb\)")
490
491 def interrupt(self):
492 self.active_child.send("\003")
493 self.active_child.expect(r"\(gdb\)", timeout=6000)
494 return self.active_child.before.strip()
495
496 def x(self, address, size='w'):
497 output = self.command("x/%s %s" % (size, address))
498 value = int(output.split(':')[1].strip(), 0)
499 return value
500
501 def p_raw(self, obj):
502 output = self.command("p %s" % obj)
503 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
504 if m:
505 raise CannotAccess(int(m.group(1), 0))
506 return output.split('=')[-1].strip()
507
508 def parse_string(self, text):
509 text = text.strip()
510 if text.startswith("{") and text.endswith("}"):
511 inner = text[1:-1]
512 return [self.parse_string(t) for t in inner.split(", ")]
513 elif text.startswith('"') and text.endswith('"'):
514 return text[1:-1]
515 else:
516 return int(text, 0)
517
518 def p(self, obj, fmt="/x"):
519 output = self.command("p%s %s" % (fmt, obj))
520 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
521 if m:
522 raise CannotAccess(int(m.group(1), 0))
523 m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
524 if m:
525 raise CouldNotFetch(m.group(1), m.group(2))
526 rhs = output.split('=')[-1]
527 return self.parse_string(rhs)
528
529 def p_string(self, obj):
530 output = self.command("p %s" % obj)
531 value = shlex.split(output.split('=')[-1].strip())[1]
532 return value
533
534 def stepi(self):
535 output = self.command("stepi", timeout=60)
536 return output
537
538 def load(self):
539 output = self.command("load", timeout=6000)
540 assert "failed" not in output
541 assert "Transfer rate" in output
542
543 def b(self, location):
544 output = self.command("b %s" % location)
545 assert "not defined" not in output
546 assert "Breakpoint" in output
547 return output
548
549 def hbreak(self, location):
550 output = self.command("hbreak %s" % location)
551 assert "not defined" not in output
552 assert "Hardware assisted breakpoint" in output
553 return output
554
555 def threads(self):
556 output = self.command("info threads")
557 threads = []
558 for line in output.splitlines():
559 m = re.match(
560 r"[\s\*]*(\d+)\s*"
561 r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
562 r"\s*(.*)", line)
563 if m:
564 threads.append(Thread(*m.groups()))
565 assert threads
566 #>>>if not threads:
567 #>>> threads.append(Thread('1', '1', 'Default', '???'))
568 return threads
569
570 def thread(self, thread):
571 return self.command("thread %s" % thread.id)
572
573 def where(self):
574 return self.command("where 1")
575
class PrivateState(object):
    """Context manager that saves a Gdb's state on entry and restores it on
    exit, so the body may switch gdb instances freely."""

    def __init__(self, gdb):
        self.gdb = gdb

    def __enter__(self):
        # Nothing is handed to the "with ... as" target.
        self.gdb.push_state()
        return None

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Returning None lets any exception from the body propagate.
        self.gdb.pop_state()
        return None
585
def run_all_tests(module, target, parsed):
    """Run every matching test class in module against target.

    A test class is any type in module with a "test" attribute whose name
    contains one of parsed.test (or any such class when parsed.test is
    empty).  ExamineTarget is queued first, once, when some hart has no
    misa value and none was given on the command line.

    Returns the value of print_results() for the collected results.
    """
    global gdb_cmd  # pylint: disable=global-statement

    if not os.path.exists(parsed.logs):
        os.makedirs(parsed.logs)

    overall_start = time.time()
    gdb_cmd = parsed.gdb

    todo = []
    examine_added = False
    for hart in target.harts:
        if parsed.misaval:
            hart.misa = int(parsed.misaval, 16)
            print("Using $misa from command line: 0x%x" % hart.misa)
        elif hart.misa:
            print("Using $misa from hart definition: 0x%x" % hart.misa)
        elif not examine_added:
            todo.append(("ExamineTarget", ExamineTarget, None))
            examine_added = True

    for name in dir(module):
        definition = getattr(module, name)
        if not isinstance(definition, type):
            continue
        if not hasattr(definition, 'test'):
            continue
        if parsed.test and not any(test in name for test in parsed.test):
            continue
        todo.append((name, definition, None))

    results, count = run_tests(parsed, target, todo)

    elapsed = time.time() - overall_start
    header("ran %d tests in %.0fs" % (count, elapsed), dash=':')

    return print_results(results)
619
# Results that do not count as failures.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run each (name, definition, hart) entry of todo against target.

    Every test gets its own log file under parsed.logs, and stdout is
    redirected into that log while the test runs.  parsed.print_failures
    echoes the log of a test whose result is not in good_results, and
    parsed.fail_fast stops after the first such test.

    Returns (results, count): results maps each result string to a list of
    (name, log_name) pairs and count is the number of tests that ran.
    """
    global real_stdout  # pylint: disable=global-statement

    results = {}
    count = 0

    for name, definition, hart in todo:
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        # The with block closes the log even when constructing or running
        # the test raises, rather than leaking one handle per test.
        with open(log_name, 'w') as log_fd:
            print("[%s] Starting > %s" % (name, log_name))
            instance = definition(target, hart)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
                log_fd.flush()
        print("[%s] %s in %.2fs" % (name, result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as finished_log:
                sys.stdout.write(finished_log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
655
def print_results(results):
    """Print a per-result summary and return the exit status.

    results maps a result string to a list of (name, log_name) pairs.  The
    individual tests are listed only for results outside good_results.
    Returns 1 when any such result is present, otherwise 0.
    """
    exit_status = 0
    for outcome, entries in results.items():
        print("%d tests returned %s" % (len(entries), outcome))
        if outcome in good_results:
            continue
        exit_status = 1
        for test_name, log_path in entries:
            print(" %s > %s" % (test_name, log_path))

    return exit_status
666
def add_test_run_options(parser):
    """Register the command-line options shared by all test runners on
    parser, which must provide argparse's add_argument()."""
    add = parser.add_argument

    # Logging and reporting.
    add("--logs", default="logs",
            help="Store logs in the specified directory.")
    add("--fail-fast", "-f", action="store_true",
            help="Exit as soon as any test fails.")
    add("--print-failures", action="store_true",
            help="When a test fails, print the log file to stdout.")
    add("--print-log-names", "--pln", action="store_true",
            help="Print names of temporary log files as soon as they are "
            "created.")

    # Test selection and environment.
    add("test", nargs='*',
            help="Run only tests that are named here.")
    add("--gdb",
            help="The command to use to start gdb.")
    add("--misaval",
            help="Don't run ExamineTarget, just assume the misa value which is "
            "specified.")
684
def header(title, dash='-', length=78):
    """Print a separator line length characters wide.

    With a title, the line is "[ title ]" framed by dash characters, the
    extra one going to the right when the count is odd.  Without a title
    the whole line is dash characters.
    """
    if title:
        dashes = dash * (length - 4 - len(title))
        # Floor division keeps the slice index an integer even when "/"
        # means true division.
        middle = len(dashes) // 2
        before = dashes[:middle]
        after = dashes[middle:]
        print("%s[ %s ]%s" % (before, title, after))
    else:
        print(dash * length)
693
def print_log(path):
    """Print a header naming path, then the file's contents, then a blank
    line."""
    header(path)
    # Close the file when done instead of leaving it to the garbage
    # collector.
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    print("")
699
class BaseTest(object):
    """Common life cycle for a test: compile, start the target and its
    server, run test(), then print the logs and tear everything down."""

    # Binaries already built, keyed by compile_args and shared by all tests.
    compiled = {}

    def __init__(self, target, hart=None):
        self.target = target
        if hart:
            self.hart = hart
        else:
            self.hart = random.choice(target.harts)
            # NOTE(review): the random pick is immediately replaced by the
            # last hart; the "#<<<" tag looks like leftover debugging --
            # confirm whether this override should stay.
            self.hart = target.harts[-1] #<<<
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook that run() calls after classSetup(); does nothing here."""
        pass

    def compile(self):
        """Build the program named by the optional compile_args attribute,
        at most once per distinct compile_args, and store it in self.binary
        (None when there is nothing to compile)."""
        wanted = getattr(self, 'compile_args', None)
        cache = BaseTest.compiled
        if wanted and wanted not in cache:
            cache[wanted] = self.target.compile(self.hart, *wanted)
        self.binary = cache.get(wanted)

    def classSetup(self):
        """Compile, create the target process and start its server,
        recording their log names.  If the server fails to start, the logs
        collected so far are printed and the exception is re-raised."""
        self.compile()
        self.target_process = self.target.create()
        if self.target_process:
            self.logs.append(self.target_process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def postMortem(self):
        """Hook that run() calls after a failure; does nothing here."""
        pass

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        The result is "not_applicable", "fail" (TestFailed was raised),
        "exception" (anything else was raised), whatever true value test()
        returned, or 'pass'.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
                header("Message")
                print(e.message)
            else:
                result = "exception"
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            try:
                self.postMortem()
            except Exception as e:  # pylint: disable=broad-except
                header("postMortem Exception")
                print(e)
                traceback.print_exc(file=sys.stdout)
            return result

        finally:
            # Runs on every path above, including the early return.
            for log in self.logs:
                print_log(log)
            header("End of logs")
            self.classTeardown()

        return result or 'pass'
803
# Command used to start gdb; run_all_tests() sets it from the command line.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that talks to the target through gdb."""

    def __init__(self, target, hart=None):
        BaseTest.__init__(self, target, hart=hart)
        self.gdb = None

    def classSetup(self):
        """Start gdb on top of the base setup, connect it to the server's
        gdb ports, apply the target's gdb_setup commands and select this
        test's hart."""
        BaseTest.classSetup(self)

        # Only pass a command when one was configured, so that Gdb's own
        # default applies otherwise.
        gdb_args = [self.server.gdb_ports]
        if gdb_cmd:
            gdb_args.append(gdb_cmd)
        self.gdb = Gdb(*gdb_args, timeout=self.target.timeout_sec,
                binary=self.binary)

        self.logs += self.gdb.lognames()
        self.gdb.connect()

        self.gdb.global_command("set remotetimeout %d" %
            self.target.timeout_sec)

        for cmd in self.target.gdb_setup:
            self.gdb.command(cmd)

        self.gdb.select_hart(self.hart)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def postMortem(self):
        """After a failure, interrupt gdb and record a disassembly and the
        registers in its log."""
        if self.gdb:
            self.gdb.interrupt()
            self.gdb.command("disassemble")
            self.gdb.command("info registers all", timeout=10)

    def classTeardown(self):
        """Drop gdb first, then the server and the target process."""
        del self.gdb
        BaseTest.classTeardown(self)
844
class GdbSingleHartTest(GdbTest):
    """A gdb test that uses one hart and parks all the others."""

    def classSetup(self):
        GdbTest.classSetup(self)

        # Park all harts that we're not using in a safe place.
        for other in self.target.harts:
            if other == self.hart:
                continue
            self.gdb.select_hart(other)
            self.gdb.p("$pc=loop_forever")
        # Leave the test's own hart selected.
        self.gdb.select_hart(self.hart)
855
856 class ExamineTarget(GdbTest):
857 def test(self):
858 for hart in self.target.harts:
859 self.gdb.select_hart(hart)
860
861 hart.misa = self.gdb.p("$misa")
862
863 txt = "RV"
864 misa_xlen = 0
865 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
866 misa_xlen = 32
867 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
868 misa_xlen = 64
869 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
870 misa_xlen = 128
871 else:
872 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
873 self.hart.misa)
874
875 if misa_xlen != hart.xlen:
876 raise TestFailed("MISA reported XLEN of %d but we were "\
877 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
878
879 txt += ("%d" % misa_xlen)
880
881 for i in range(26):
882 if hart.misa & (1<<i):
883 txt += chr(i + ord('A'))
884 print txt,
885
class TestFailed(Exception):
    """Raised by a test to report a failure; message says what went wrong."""
    def __init__(self, message):
        # Pass the message to the base class too, so that str() and
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
890
class TestNotApplicable(Exception):
    """Raised by a test that cannot run on the current target; message says
    why."""
    def __init__(self, message):
        # Pass the message to the base class too, so that str() and
        # tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message
895
def assertEqual(a, b):
    """Raise TestFailed, showing both values, when a != b holds."""
    differs = a != b
    if differs:
        raise TestFailed("%r != %r" % (a, b))
899
def assertNotEqual(a, b):
    """Raise TestFailed, showing both values, when a == b holds."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
903
def assertIn(a, b):
    """Raise TestFailed, showing both values, when a is not contained in b."""
    if a in b:
        return
    raise TestFailed("%r not in %r" % (a, b))
907
def assertNotIn(a, b):
    """Raise TestFailed, showing both values, when a is contained in b."""
    present = a in b
    if present:
        raise TestFailed("%r in %r" % (a, b))
911
def assertGreater(a, b):
    """Raise TestFailed, showing both values, unless a > b holds."""
    if a > b:
        return
    raise TestFailed("%r not greater than %r" % (a, b))
915
def assertLess(a, b):
    """Raise TestFailed, showing both values, unless a < b holds."""
    if a < b:
        return
    raise TestFailed("%r not less than %r" % (a, b))
919
def assertTrue(a):
    """Raise TestFailed, showing the value, when a is false."""
    if not a:
        # Wrap a in a one-element tuple: "%r" % a would treat a tuple as the
        # whole argument list and fail with TypeError for a value like ().
        raise TestFailed("%r is not True" % (a,))
923
def assertRegexpMatches(text, regexp):
    """Raise TestFailed, showing both arguments, when regexp matches nowhere
    in text."""
    found = re.search(regexp, text)
    if found:
        return
    raise TestFailed("can't find %r in %r" % (regexp, text))