Test debug authentication.
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
16 print_log_names = False
17 real_stdout = sys.stdout
18
19 # Note that gdb comes with its own testsuite. I was unable to figure out how to
20 # run that testsuite against the spike simulator.
21
22 def find_file(path):
23 for directory in (os.getcwd(), os.path.dirname(__file__)):
24 fullpath = os.path.join(directory, path)
25 relpath = os.path.relpath(fullpath)
26 if len(relpath) >= len(fullpath):
27 relpath = fullpath
28 if os.path.exists(relpath):
29 return relpath
30 return None
31
32 def compile(args, xlen=32): # pylint: disable=redefined-builtin
33 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
34 cmd = [cc, "-g"]
35 if xlen == 32:
36 cmd.append("-march=rv32imac")
37 cmd.append("-mabi=ilp32")
38 else:
39 cmd.append("-march=rv64imac")
40 cmd.append("-mabi=lp64")
41 for arg in args:
42 found = find_file(arg)
43 if found:
44 cmd.append(found)
45 else:
46 cmd.append(arg)
47 header("Compile")
48 print "+", " ".join(cmd)
49 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
50 stderr=subprocess.PIPE)
51 stdout, stderr = process.communicate()
52 if process.returncode:
53 print stdout,
54 print stderr,
55 header("")
56 raise Exception("Compile failed!")
57
58 class Spike(object):
59 def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True,
60 isa=None, progbufsize=None):
61 """Launch spike. Return tuple of its process and the port it's running
62 on."""
63 self.process = None
64 self.isa = isa
65 self.progbufsize = progbufsize
66
67 if target.harts:
68 harts = target.harts
69 else:
70 harts = [target]
71
72 cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
73 self.infinite_loop = target.compile(harts[0],
74 "programs/checksum.c", "programs/tiny-malloc.c",
75 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
76 cmd.append(self.infinite_loop)
77 self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
78 suffix=".log")
79 self.logname = self.logfile.name
80 if print_log_names:
81 real_stdout.write("Temporary spike log: %s\n" % self.logname)
82 self.logfile.write("+ %s\n" % " ".join(cmd))
83 self.logfile.flush()
84 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
85 stdout=self.logfile, stderr=self.logfile)
86
87 if with_jtag_gdb:
88 self.port = None
89 for _ in range(30):
90 m = re.search(r"Listening for remote bitbang connection on "
91 r"port (\d+).", open(self.logname).read())
92 if m:
93 self.port = int(m.group(1))
94 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
95 break
96 time.sleep(0.11)
97 if not self.port:
98 print_log(self.logname)
99 raise Exception("Didn't get spike message about bitbang "
100 "connection")
101
102 def command(self, target, harts, halted, timeout, with_jtag_gdb):
103 # pylint: disable=no-self-use
104 if target.sim_cmd:
105 cmd = shlex.split(target.sim_cmd)
106 else:
107 spike = os.path.expandvars("$RISCV/bin/spike")
108 cmd = [spike]
109
110 cmd += ["-p%d" % len(harts)]
111
112 assert len(set(t.xlen for t in harts)) == 1, \
113 "All spike harts must have the same XLEN"
114
115 if self.isa:
116 isa = self.isa
117 else:
118 isa = "RV%dG" % harts[0].xlen
119
120 cmd += ["--isa", isa]
121 cmd += ["--debug-auth"]
122
123 if not self.progbufsize is None:
124 cmd += ["--progsize", str(self.progbufsize)]
125 cmd += ["--debug-sba", "32"]
126
127 assert len(set(t.ram for t in harts)) == 1, \
128 "All spike harts must have the same RAM layout"
129 assert len(set(t.ram_size for t in harts)) == 1, \
130 "All spike harts must have the same RAM layout"
131 cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]
132
133 if timeout:
134 cmd = ["timeout", str(timeout)] + cmd
135
136 if halted:
137 cmd.append('-H')
138 if with_jtag_gdb:
139 cmd += ['--rbb-port', '0']
140 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'
141
142 return cmd
143
144 def __del__(self):
145 if self.process:
146 try:
147 self.process.kill()
148 self.process.wait()
149 except OSError:
150 pass
151
152 def wait(self, *args, **kwargs):
153 return self.process.wait(*args, **kwargs)
154
155 class VcsSim(object):
156 logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
157 logname = logfile.name
158
159 def __init__(self, sim_cmd=None, debug=False, timeout=300):
160 if sim_cmd:
161 cmd = shlex.split(sim_cmd)
162 else:
163 cmd = ["simv"]
164 cmd += ["+jtag_vpi_enable"]
165 if debug:
166 cmd[0] = cmd[0] + "-debug"
167 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
168
169 logfile = open(self.logname, "w")
170 if print_log_names:
171 real_stdout.write("Temporary VCS log: %s\n" % self.logname)
172 logfile.write("+ %s\n" % " ".join(cmd))
173 logfile.flush()
174
175 listenfile = open(self.logname, "r")
176 listenfile.seek(0, 2)
177 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
178 stdout=logfile, stderr=logfile)
179 done = False
180 start = time.time()
181 while not done:
182 # Fail if VCS exits early
183 exit_code = self.process.poll()
184 if exit_code is not None:
185 raise RuntimeError('VCS simulator exited early with status %d'
186 % exit_code)
187
188 line = listenfile.readline()
189 if not line:
190 time.sleep(1)
191 match = re.match(r"^Listening on port (\d+)$", line)
192 if match:
193 done = True
194 self.port = int(match.group(1))
195 os.environ['JTAG_VPI_PORT'] = str(self.port)
196
197 if (time.time() - start) > timeout:
198 raise Exception("Timed out waiting for VCS to listen for JTAG "
199 "vpi")
200
201 def __del__(self):
202 try:
203 self.process.kill()
204 self.process.wait()
205 except OSError:
206 pass
207
208 class Openocd(object):
209 logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
210 logname = logfile.name
211
212 def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
213 self.timeout = timeout
214
215 if server_cmd:
216 cmd = shlex.split(server_cmd)
217 else:
218 openocd = os.path.expandvars("$RISCV/bin/openocd")
219 cmd = [openocd]
220 if debug:
221 cmd.append("-d")
222
223 # This command needs to come before any config scripts on the command
224 # line, since they are executed in order.
225 cmd += [
226 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
227 "--command",
228 "gdb_port 0",
229 # Disable tcl and telnet servers, since they are unused and because
230 # the port numbers will conflict if multiple OpenOCD processes are
231 # running on the same server.
232 "--command",
233 "tcl_port disabled",
234 "--command",
235 "telnet_port disabled",
236 ]
237
238 if config:
239 f = find_file(config)
240 if f is None:
241 print "Unable to read file " + config
242 exit(1)
243
244 cmd += ["-f", f]
245 if debug:
246 cmd.append("-d")
247
248 logfile = open(Openocd.logname, "w")
249 if print_log_names:
250 real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
251 env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
252 env_entries = [key for key in env_entries if key in os.environ]
253 logfile.write("+ %s%s\n" % (
254 "".join("%s=%s " % (key, os.environ[key]) for key in env_entries),
255 " ".join(map(pipes.quote, cmd))))
256 logfile.flush()
257
258 self.gdb_ports = []
259 self.process = self.start(cmd, logfile)
260
261 def start(self, cmd, logfile):
262 process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
263 stdout=logfile, stderr=logfile)
264
265 try:
266 # Wait for OpenOCD to have made it through riscv_examine(). When
267 # using OpenOCD to communicate with a simulator this may take a
268 # long time, and gdb will time out when trying to connect if we
269 # attempt too early.
270 start = time.time()
271 messaged = False
272 fd = open(Openocd.logname, "r")
273 while True:
274 line = fd.readline()
275 if not line:
276 if not process.poll() is None:
277 raise Exception("OpenOCD exited early.")
278 time.sleep(0.1)
279 continue
280
281 m = re.search(r"Listening on port (\d+) for gdb connections",
282 line)
283 if m:
284 self.gdb_ports.append(int(m.group(1)))
285
286 if "telnet server disabled" in line:
287 return process
288
289 if not messaged and time.time() - start > 1:
290 messaged = True
291 print "Waiting for OpenOCD to start..."
292 if (time.time() - start) > self.timeout:
293 raise Exception("Timed out waiting for OpenOCD to "
294 "listen for gdb")
295
296 except Exception:
297 print_log(Openocd.logname)
298 raise
299
300 def __del__(self):
301 try:
302 self.process.kill()
303 self.process.wait()
304 except (OSError, AttributeError):
305 pass
306
307 class OpenocdCli(object):
308 def __init__(self, port=4444):
309 self.child = pexpect.spawn(
310 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
311 self.child.expect("> ")
312
313 def command(self, cmd):
314 self.child.sendline(cmd)
315 self.child.expect(cmd)
316 self.child.expect("\n")
317 self.child.expect("> ")
318 return self.child.before.strip("\t\r\n \0")
319
320 def reg(self, reg=''):
321 output = self.command("reg %s" % reg)
322 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
323 values = {r: int(v, 0) for r, v in matches}
324 if reg:
325 return values[reg]
326 return values
327
328 def load_image(self, image):
329 output = self.command("load_image %s" % image)
330 if 'invalid ELF file, only 32bits files are supported' in output:
331 raise TestNotApplicable(output)
332
333 class CannotAccess(Exception):
334 def __init__(self, address):
335 Exception.__init__(self)
336 self.address = address
337
338 class CouldNotFetch(Exception):
339 def __init__(self, regname, explanation):
340 Exception.__init__(self)
341 self.regname = regname
342 self.explanation = explanation
343
344 Thread = collections.namedtuple('Thread', ('id', 'description', 'target_id',
345 'name', 'frame'))
346
347 class Gdb(object):
348 """A single gdb class which can interact with one or more gdb instances."""
349
350 # pylint: disable=too-many-public-methods
351 # pylint: disable=too-many-instance-attributes
352
353 def __init__(self, ports,
354 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
355 timeout=60, binary=None):
356 assert ports
357
358 self.ports = ports
359 self.cmd = cmd
360 self.timeout = timeout
361 self.binary = binary
362
363 self.stack = []
364 self.harts = {}
365
366 self.logfiles = []
367 self.children = []
368 for port in ports:
369 logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
370 suffix=".log")
371 self.logfiles.append(logfile)
372 if print_log_names:
373 real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
374 child = pexpect.spawn(cmd)
375 child.logfile = logfile
376 child.logfile.write("+ %s\n" % cmd)
377 self.children.append(child)
378 self.active_child = self.children[0]
379
380 def connect(self):
381 for port, child in zip(self.ports, self.children):
382 self.select_child(child)
383 self.wait()
384 self.command("set confirm off")
385 self.command("set width 0")
386 self.command("set height 0")
387 # Force consistency.
388 self.command("set print entry-values no")
389 self.command("set remotetimeout %d" % self.timeout)
390 self.command("target extended-remote localhost:%d" % port)
391 if self.binary:
392 self.command("file %s" % self.binary)
393 threads = self.threads()
394 for t in threads:
395 hartid = None
396 if t.name:
397 m = re.search(r"Hart (\d+)", t.name)
398 if m:
399 hartid = int(m.group(1))
400 if hartid is None:
401 if self.harts:
402 hartid = max(self.harts) + 1
403 else:
404 hartid = 0
405 # solo: True iff this is the only thread on this child
406 self.harts[hartid] = {'child': child,
407 'thread': t,
408 'solo': len(threads) == 1}
409
410 def __del__(self):
411 for child in self.children:
412 del child
413
414 def one_hart_per_gdb(self):
415 return all(h['solo'] for h in self.harts.itervalues())
416
417 def lognames(self):
418 return [logfile.name for logfile in self.logfiles]
419
420 def select_child(self, child):
421 self.active_child = child
422
423 def select_hart(self, hart):
424 h = self.harts[hart.id]
425 self.select_child(h['child'])
426 if not h['solo']:
427 output = self.command("thread %s" % h['thread'].id, timeout=10)
428 assert "Unknown" not in output
429
430 def push_state(self):
431 self.stack.append({
432 'active_child': self.active_child
433 })
434
435 def pop_state(self):
436 state = self.stack.pop()
437 self.active_child = state['active_child']
438
439 def wait(self):
440 """Wait for prompt."""
441 self.active_child.expect(r"\(gdb\)")
442
443 def command(self, command, timeout=6000):
444 """timeout is in seconds"""
445 self.active_child.sendline(command)
446 self.active_child.expect("\n", timeout=timeout)
447 self.active_child.expect(r"\(gdb\)", timeout=timeout)
448 return self.active_child.before.strip()
449
450 def global_command(self, command):
451 """Execute this command on every gdb that we control."""
452 with PrivateState(self):
453 for child in self.children:
454 self.select_child(child)
455 self.command(command)
456
457 def c(self, wait=True, timeout=-1, async=False):
458 """
459 Dumb c command.
460 In RTOS mode, gdb will resume all harts.
461 In multi-gdb mode, this command will just go to the current gdb, so
462 will only resume one hart.
463 """
464 if async:
465 async = "&"
466 else:
467 async = ""
468 if wait:
469 output = self.command("c%s" % async, timeout=timeout)
470 assert "Continuing" in output
471 return output
472 else:
473 self.active_child.sendline("c%s" % async)
474 self.active_child.expect("Continuing")
475
476 def c_all(self):
477 """
478 Resume every hart.
479
480 This function works fine when using multiple gdb sessions, but the
481 caller must be careful when using it nonetheless. gdb's behavior is to
482 not set breakpoints until just before the hart is resumed, and then
483 clears them as soon as the hart halts. That means that you can't set
484 one software breakpoint, and expect multiple harts to hit it. It's
485 possible that the first hart completes set/run/halt/clear before the
486 second hart even gets to resume, so it will never hit the breakpoint.
487 """
488 with PrivateState(self):
489 for child in self.children:
490 child.sendline("c")
491 child.expect("Continuing")
492
493 # Now wait for them all to halt
494 for child in self.children:
495 child.expect(r"\(gdb\)")
496
497 def interrupt(self):
498 self.active_child.send("\003")
499 self.active_child.expect(r"\(gdb\)", timeout=6000)
500 return self.active_child.before.strip()
501
502 def x(self, address, size='w'):
503 output = self.command("x/%s %s" % (size, address))
504 value = int(output.split(':')[1].strip(), 0)
505 return value
506
507 def p_raw(self, obj):
508 output = self.command("p %s" % obj)
509 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
510 if m:
511 raise CannotAccess(int(m.group(1), 0))
512 return output.split('=')[-1].strip()
513
514 def parse_string(self, text):
515 text = text.strip()
516 if text.startswith("{") and text.endswith("}"):
517 inner = text[1:-1]
518 return [self.parse_string(t) for t in inner.split(", ")]
519 elif text.startswith('"') and text.endswith('"'):
520 return text[1:-1]
521 else:
522 return int(text, 0)
523
524 def p(self, obj, fmt="/x"):
525 output = self.command("p%s %s" % (fmt, obj))
526 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
527 if m:
528 raise CannotAccess(int(m.group(1), 0))
529 m = re.search(r"Could not fetch register \"(\w+)\"; (.*)$", output)
530 if m:
531 raise CouldNotFetch(m.group(1), m.group(2))
532 rhs = output.split('=')[-1]
533 return self.parse_string(rhs)
534
535 def p_string(self, obj):
536 output = self.command("p %s" % obj)
537 value = shlex.split(output.split('=')[-1].strip())[1]
538 return value
539
540 def stepi(self):
541 output = self.command("stepi", timeout=60)
542 return output
543
544 def load(self):
545 output = self.command("load", timeout=6000)
546 assert "failed" not in output
547 assert "Transfer rate" in output
548
549 def b(self, location):
550 output = self.command("b %s" % location)
551 assert "not defined" not in output
552 assert "Breakpoint" in output
553 return output
554
555 def hbreak(self, location):
556 output = self.command("hbreak %s" % location)
557 assert "not defined" not in output
558 assert "Hardware assisted breakpoint" in output
559 return output
560
561 def threads(self):
562 output = self.command("info threads")
563 threads = []
564 for line in output.splitlines():
565 m = re.match(
566 r"[\s\*]*(\d+)\s*"
567 r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
568 r"\s*(.*)", line)
569 if m:
570 threads.append(Thread(*m.groups()))
571 assert threads
572 #>>>if not threads:
573 #>>> threads.append(Thread('1', '1', 'Default', '???'))
574 return threads
575
576 def thread(self, thread):
577 return self.command("thread %s" % thread.id)
578
579 def where(self):
580 return self.command("where 1")
581
582 class PrivateState(object):
583 def __init__(self, gdb):
584 self.gdb = gdb
585
586 def __enter__(self):
587 self.gdb.push_state()
588
589 def __exit__(self, _type, _value, _traceback):
590 self.gdb.pop_state()
591
592 def run_all_tests(module, target, parsed):
593 if not os.path.exists(parsed.logs):
594 os.makedirs(parsed.logs)
595
596 overall_start = time.time()
597
598 global gdb_cmd # pylint: disable=global-statement
599 gdb_cmd = parsed.gdb
600
601 todo = []
602 examine_added = False
603 for hart in target.harts:
604 if parsed.misaval:
605 hart.misa = int(parsed.misaval, 16)
606 print "Using $misa from command line: 0x%x" % hart.misa
607 elif hart.misa:
608 print "Using $misa from hart definition: 0x%x" % hart.misa
609 elif not examine_added:
610 todo.append(("ExamineTarget", ExamineTarget, None))
611 examine_added = True
612
613 for name in dir(module):
614 definition = getattr(module, name)
615 if isinstance(definition, type) and hasattr(definition, 'test') and \
616 (not parsed.test or any(test in name for test in parsed.test)):
617 todo.append((name, definition, None))
618
619 results, count = run_tests(parsed, target, todo)
620
621 header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
622 dash=':')
623
624 return print_results(results)
625
626 good_results = set(('pass', 'not_applicable'))
627 def run_tests(parsed, target, todo):
628 results = {}
629 count = 0
630
631 for name, definition, hart in todo:
632 log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
633 (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
634 log_fd = open(log_name, 'w')
635 print "[%s] Starting > %s" % (name, log_name)
636 instance = definition(target, hart)
637 sys.stdout.flush()
638 log_fd.write("Test: %s\n" % name)
639 log_fd.write("Target: %s\n" % type(target).__name__)
640 start = time.time()
641 global real_stdout # pylint: disable=global-statement
642 real_stdout = sys.stdout
643 sys.stdout = log_fd
644 try:
645 result = instance.run()
646 log_fd.write("Result: %s\n" % result)
647 log_fd.write("Logfile: %s\n" % log_name)
648 finally:
649 sys.stdout = real_stdout
650 log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
651 log_fd.flush()
652 print "[%s] %s in %.2fs" % (name, result, time.time() - start)
653 if result not in good_results and parsed.print_failures:
654 sys.stdout.write(open(log_name).read())
655 sys.stdout.flush()
656 results.setdefault(result, []).append((name, log_name))
657 count += 1
658 if result not in good_results and parsed.fail_fast:
659 break
660
661 return results, count
662
663 def print_results(results):
664 result = 0
665 for key, value in results.iteritems():
666 print "%d tests returned %s" % (len(value), key)
667 if key not in good_results:
668 result = 1
669 for name, log_name in value:
670 print " %s > %s" % (name, log_name)
671
672 return result
673
674 def add_test_run_options(parser):
675 parser.add_argument("--logs", default="logs",
676 help="Store logs in the specified directory.")
677 parser.add_argument("--fail-fast", "-f", action="store_true",
678 help="Exit as soon as any test fails.")
679 parser.add_argument("--print-failures", action="store_true",
680 help="When a test fails, print the log file to stdout.")
681 parser.add_argument("--print-log-names", "--pln", action="store_true",
682 help="Print names of temporary log files as soon as they are "
683 "created.")
684 parser.add_argument("test", nargs='*',
685 help="Run only tests that are named here.")
686 parser.add_argument("--gdb",
687 help="The command to use to start gdb.")
688 parser.add_argument("--misaval",
689 help="Don't run ExamineTarget, just assume the misa value which is "
690 "specified.")
691
692 def header(title, dash='-', length=78):
693 if title:
694 dashes = dash * (length - 4 - len(title))
695 before = dashes[:len(dashes)/2]
696 after = dashes[len(dashes)/2:]
697 print "%s[ %s ]%s" % (before, title, after)
698 else:
699 print dash * length
700
701 def print_log(path):
702 header(path)
703 for l in open(path, "r"):
704 sys.stdout.write(l)
705 print
706
707 class BaseTest(object):
708 compiled = {}
709
710 def __init__(self, target, hart=None):
711 self.target = target
712 if hart:
713 self.hart = hart
714 else:
715 self.hart = random.choice(target.harts)
716 self.hart = target.harts[-1] #<<<
717 self.server = None
718 self.target_process = None
719 self.binary = None
720 self.start = 0
721 self.logs = []
722
723 def early_applicable(self):
724 """Return a false value if the test has determined it cannot run
725 without ever needing to talk to the target or server."""
726 # pylint: disable=no-self-use
727 return True
728
729 def setup(self):
730 pass
731
732 def compile(self):
733 compile_args = getattr(self, 'compile_args', None)
734 if compile_args:
735 if compile_args not in BaseTest.compiled:
736 BaseTest.compiled[compile_args] = \
737 self.target.compile(self.hart, *compile_args)
738 self.binary = BaseTest.compiled.get(compile_args)
739
740 def classSetup(self):
741 self.compile()
742 self.target_process = self.target.create()
743 if self.target_process:
744 self.logs.append(self.target_process.logname)
745 try:
746 self.server = self.target.server()
747 self.logs.append(self.server.logname)
748 except Exception:
749 for log in self.logs:
750 print_log(log)
751 raise
752
753 def classTeardown(self):
754 del self.server
755 del self.target_process
756
757 def postMortem(self):
758 pass
759
760 def run(self):
761 """
762 If compile_args is set, compile a program and set self.binary.
763
764 Call setup().
765
766 Then call test() and return the result, displaying relevant information
767 if an exception is raised.
768 """
769
770 sys.stdout.flush()
771
772 if not self.early_applicable():
773 return "not_applicable"
774
775 self.start = time.time()
776
777 try:
778 self.classSetup()
779 self.setup()
780 result = self.test() # pylint: disable=no-member
781 except TestNotApplicable:
782 result = "not_applicable"
783 except Exception as e: # pylint: disable=broad-except
784 if isinstance(e, TestFailed):
785 result = "fail"
786 else:
787 result = "exception"
788 if isinstance(e, TestFailed):
789 header("Message")
790 print e.message
791 header("Traceback")
792 traceback.print_exc(file=sys.stdout)
793 try:
794 self.postMortem()
795 except Exception as e: # pylint: disable=broad-except
796 header("postMortem Exception")
797 print e
798 traceback.print_exc(file=sys.stdout)
799 return result
800
801 finally:
802 for log in self.logs:
803 print_log(log)
804 header("End of logs")
805 self.classTeardown()
806
807 if not result:
808 result = 'pass'
809 return result
810
811 gdb_cmd = None
812 class GdbTest(BaseTest):
813 def __init__(self, target, hart=None):
814 BaseTest.__init__(self, target, hart=hart)
815 self.gdb = None
816
817 def classSetup(self):
818 BaseTest.classSetup(self)
819
820 if gdb_cmd:
821 self.gdb = Gdb(self.server.gdb_ports, gdb_cmd,
822 timeout=self.target.timeout_sec, binary=self.binary)
823 else:
824 self.gdb = Gdb(self.server.gdb_ports,
825 timeout=self.target.timeout_sec, binary=self.binary)
826
827 self.logs += self.gdb.lognames()
828 self.gdb.connect()
829
830 self.gdb.global_command("set remotetimeout %d" %
831 self.target.timeout_sec)
832
833 for cmd in self.target.gdb_setup:
834 self.gdb.command(cmd)
835
836 self.gdb.select_hart(self.hart)
837
838 # FIXME: OpenOCD doesn't handle PRIV now
839 #self.gdb.p("$priv=3")
840
841 def postMortem(self):
842 if not self.gdb:
843 return
844 self.gdb.interrupt()
845 self.gdb.command("disassemble")
846 self.gdb.command("info registers all", timeout=10)
847
848 def classTeardown(self):
849 del self.gdb
850 BaseTest.classTeardown(self)
851
852 class GdbSingleHartTest(GdbTest):
853 def classSetup(self):
854 GdbTest.classSetup(self)
855
856 for hart in self.target.harts:
857 # Park all harts that we're not using in a safe place.
858 if hart != self.hart:
859 self.gdb.select_hart(hart)
860 self.gdb.p("$pc=loop_forever")
861 self.gdb.select_hart(self.hart)
862
863 class ExamineTarget(GdbTest):
864 def test(self):
865 for hart in self.target.harts:
866 self.gdb.select_hart(hart)
867
868 hart.misa = self.gdb.p("$misa")
869
870 txt = "RV"
871 misa_xlen = 0
872 if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
873 misa_xlen = 32
874 elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
875 misa_xlen = 64
876 elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
877 misa_xlen = 128
878 else:
879 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
880 self.hart.misa)
881
882 if misa_xlen != hart.xlen:
883 raise TestFailed("MISA reported XLEN of %d but we were "\
884 "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))
885
886 txt += ("%d" % misa_xlen)
887
888 for i in range(26):
889 if hart.misa & (1<<i):
890 txt += chr(i + ord('A'))
891 print txt,
892
893 class TestFailed(Exception):
894 def __init__(self, message):
895 Exception.__init__(self)
896 self.message = message
897
898 class TestNotApplicable(Exception):
899 def __init__(self, message):
900 Exception.__init__(self)
901 self.message = message
902
903 def assertEqual(a, b):
904 if a != b:
905 raise TestFailed("%r != %r" % (a, b))
906
907 def assertNotEqual(a, b):
908 if a == b:
909 raise TestFailed("%r == %r" % (a, b))
910
911 def assertIn(a, b):
912 if a not in b:
913 raise TestFailed("%r not in %r" % (a, b))
914
915 def assertNotIn(a, b):
916 if a in b:
917 raise TestFailed("%r in %r" % (a, b))
918
919 def assertGreater(a, b):
920 if not a > b:
921 raise TestFailed("%r not greater than %r" % (a, b))
922
923 def assertLess(a, b):
924 if not a < b:
925 raise TestFailed("%r not less than %r" % (a, b))
926
927 def assertTrue(a):
928 if not a:
929 raise TestFailed("%r is not True" % a)
930
931 def assertRegexpMatches(text, regexp):
932 if not re.search(regexp, text):
933 raise TestFailed("can't find %r in %r" % (regexp, text))