ce8aecaafbf3369e718e93b100f4276747207224
[riscv-tests.git] / debug / testlib.py
1 import collections
2 import os
3 import os.path
4 import random
5 import re
6 import shlex
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import traceback
12 import pipes
13
14 import pexpect
15
# When True, the simulator/server/gdb wrappers below write the path of each
# temporary log file to real_stdout as soon as the file is created.
print_log_names = False
# The stdout stream captured at import time. run_tests() re-captures it just
# before redirecting sys.stdout into a per-test log file.
real_stdout = sys.stdout

# Note that gdb comes with its own testsuite. I was unable to figure out how to
# run that testsuite against the spike simulator.
21
def find_file(path):
    """Locate path under the working directory or next to this module.

    The current working directory is tried first, then the directory that
    contains this file. A hit is returned in its relative spelling when that
    is strictly shorter than the joined path, otherwise as the joined path.
    Returns None when the file exists in neither place.
    """
    search_roots = (os.getcwd(), os.path.dirname(__file__))
    for root in search_roots:
        joined = os.path.join(root, path)
        relative = os.path.relpath(joined)
        # Prefer whichever spelling is shorter; ties go to the joined path.
        candidate = relative if len(relative) < len(joined) else joined
        if os.path.exists(candidate):
            return candidate
    return None
31
def compile(args, xlen=32): # pylint: disable=redefined-builtin
    """Run the RISC-V cross compiler on args.

    Args:
        args: compiler arguments. Any argument that find_file() can locate is
            replaced by the located path; everything else is passed through
            unchanged.
        xlen: 32 selects -march=rv32imac/-mabi=ilp32; any other value selects
            -march=rv64imac/-mabi=lp64.

    Raises:
        Exception: the compiler exited with a non-zero status. Its stdout and
            stderr are echoed first.
    """
    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
    cmd = [cc, "-g"]
    if xlen == 32:
        cmd += ["-march=rv32imac", "-mabi=ilp32"]
    else:
        cmd += ["-march=rv64imac", "-mabi=lp64"]
    cmd += [find_file(arg) or arg for arg in args]

    header("Compile")
    print("+ " + " ".join(cmd))
    # Text mode keeps the captured output a str on both Python 2 and 3, so it
    # can be written straight to sys.stdout.
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)
    stdout, stderr = process.communicate()
    if process.returncode:
        sys.stdout.write(stdout)
        sys.stdout.write(stderr)
        header("")
        raise Exception("Compile failed!")
57
class Spike(object):
    """A spike simulator child process running the infinite_loop program.

    Attributes:
        process: the subprocess.Popen handle (None until spike is launched).
        infinite_loop: whatever target.compile() returned for the program
            that spike is started with.
        logfile / logname: temporary file receiving spike's stdout and stderr.
        port: remote bitbang port reported by spike (only set when
            with_jtag_gdb is true).
    """

    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):
        """Compile the infinite_loop program and launch spike on it.

        When with_jtag_gdb is true, spike's log is polled (30 tries, 0.11s
        apart) for the remote bitbang port; the port is stored in self.port
        and exported as REMOTE_BITBANG_PORT.

        Raises:
            Exception: spike never announced its bitbang port.
        """
        # Set first so __del__ is safe even if anything below raises.
        self.process = None

        harts = target.harts if target.harts else [target]

        cmd = self.command(target, harts, halted, timeout, with_jtag_gdb)
        self.infinite_loop = target.compile(harts[0],
                "programs/checksum.c", "programs/tiny-malloc.c",
                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")
        cmd.append(self.infinite_loop)
        self.logfile = tempfile.NamedTemporaryFile(prefix="spike-",
                suffix=".log")
        self.logname = self.logfile.name
        if print_log_names:
            real_stdout.write("Temporary spike log: %s\n" % self.logname)
        self.logfile.write("+ %s\n" % " ".join(cmd))
        self.logfile.flush()
        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=self.logfile, stderr=self.logfile)

        if with_jtag_gdb:
            self.port = None
            for _ in range(30):
                # Re-read the whole log each time, closing the handle
                # promptly instead of leaking one descriptor per poll.
                with open(self.logname) as log:
                    contents = log.read()
                m = re.search(r"Listening for remote bitbang connection on "
                        r"port (\d+).", contents)
                if m:
                    self.port = int(m.group(1))
                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)
                    break
                time.sleep(0.11)
            if not self.port:
                print_log(self.logname)
                raise Exception("Didn't get spike message about bitbang "
                        "connection")

    def command(self, target, harts, halted, timeout, with_jtag_gdb):
        """Build the spike command line (without the program to run).

        Uses target.sim_cmd when set, otherwise $RISCV/bin/spike. All harts
        must agree on xlen, ram and ram_size. A truthy timeout wraps the
        command in timeout(1). When with_jtag_gdb is true this also sets
        REMOTE_BITBANG_HOST in the environment.
        """
        # pylint: disable=no-self-use
        if target.sim_cmd:
            cmd = shlex.split(target.sim_cmd)
        else:
            spike = os.path.expandvars("$RISCV/bin/spike")
            cmd = [spike]

        cmd += ["-p%d" % len(harts)]

        assert len(set(t.xlen for t in harts)) == 1, \
                "All spike harts must have the same XLEN"

        if harts[0].xlen == 32:
            cmd += ["--isa", "RV32G"]
        else:
            cmd += ["--isa", "RV64G"]

        assert len(set(t.ram for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        assert len(set(t.ram_size for t in harts)) == 1, \
                "All spike harts must have the same RAM layout"
        cmd += ["-m0x%x:0x%x" % (harts[0].ram, harts[0].ram_size)]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_jtag_gdb:
            cmd += ['--rbb-port', '0']
            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'

        return cmd

    def __del__(self):
        """Kill spike, ignoring the case where it is already gone."""
        if self.process:
            try:
                self.process.kill()
                self.process.wait()
            except OSError:
                pass

    def wait(self, *args, **kwargs):
        """Forward to Popen.wait() on the spike process."""
        return self.process.wait(*args, **kwargs)
144
class VcsSim(object):
    """A VCS simulator child process with JTAG VPI enabled.

    The log file is a class attribute, so it is created once at import time
    and shared by every instance.
    """
    logfile = tempfile.NamedTemporaryFile(prefix='simv', suffix='.log')
    logname = logfile.name

    def __init__(self, sim_cmd=None, debug=False, timeout=300):
        """Launch the simulator and wait for it to announce its JTAG port.

        Args:
            sim_cmd: command line to run instead of the default "simv".
            debug: run the "-debug" flavour of the binary and dump a VPD
                file.
            timeout: seconds to wait for the "Listening on port" line.

        The port is stored in self.port and exported as JTAG_VPI_PORT.

        Raises:
            RuntimeError: the simulator exited before announcing a port.
            Exception: no port was announced within timeout seconds.
        """
        if sim_cmd:
            cmd = shlex.split(sim_cmd)
        else:
            cmd = ["simv"]
        cmd += ["+jtag_vpi_enable"]
        if debug:
            cmd[0] = cmd[0] + "-debug"
            cmd += ["+vcdplusfile=output/gdbserver.vpd"]

        logfile = open(self.logname, "w")
        if print_log_names:
            real_stdout.write("Temporary VCS log: %s\n" % self.logname)
        logfile.write("+ %s\n" % " ".join(cmd))
        logfile.flush()

        # The reader is only needed while waiting for the port, so it is
        # closed as soon as the wait ends (normally or by exception).
        with open(self.logname, "r") as listenfile:
            # Start at the end of the file: skip the command line just written.
            listenfile.seek(0, 2)
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
            done = False
            start = time.time()
            while not done:
                # Fail if VCS exits early
                exit_code = self.process.poll()
                if exit_code is not None:
                    raise RuntimeError(
                            'VCS simulator exited early with status %d'
                            % exit_code)

                line = listenfile.readline()
                if not line:
                    time.sleep(1)
                match = re.match(r"^Listening on port (\d+)$", line)
                if match:
                    done = True
                    self.port = int(match.group(1))
                    os.environ['JTAG_VPI_PORT'] = str(self.port)

                if (time.time() - start) > timeout:
                    raise Exception(
                            "Timed out waiting for VCS to listen for JTAG "
                            "vpi")

    def __del__(self):
        """Kill the simulator; tolerate it being gone or never started."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            # AttributeError: __init__ failed before self.process was set.
            pass
197
class Openocd(object):
    """An OpenOCD server child process.

    The log file is a class attribute, so it is created once at import time
    and shared by every instance.

    Attributes:
        timeout: seconds to wait for OpenOCD to finish starting.
        gdb_ports: gdb server ports OpenOCD announced while starting.
        process: the subprocess.Popen handle of the running server.
    """
    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')
    logname = logfile.name

    def __init__(self, server_cmd=None, config=None, debug=False, timeout=60):
        """Build the OpenOCD command line and start the server.

        Args:
            server_cmd: command line to use instead of $RISCV/bin/openocd.
            config: config script, located with find_file(). The process
                exits with status 1 if it cannot be found.
            debug: pass -d to OpenOCD.
            timeout: seconds start() waits for OpenOCD to come up.
        """
        self.timeout = timeout

        if server_cmd:
            cmd = shlex.split(server_cmd)
        else:
            openocd = os.path.expandvars("$RISCV/bin/openocd")
            cmd = [openocd]
            if debug:
                cmd.append("-d")

        # This command needs to come before any config scripts on the command
        # line, since they are executed in order.
        cmd += [
            # Tell OpenOCD to bind gdb to an unused, ephemeral port.
            "--command",
            "gdb_port 0",
            # Disable tcl and telnet servers, since they are unused and because
            # the port numbers will conflict if multiple OpenOCD processes are
            # running on the same server.
            "--command",
            "tcl_port disabled",
            "--command",
            "telnet_port disabled",
        ]

        if config:
            f = find_file(config)
            if f is None:
                print("Unable to read file " + config)
                sys.exit(1)

            cmd += ["-f", f]
        # Only add -d here if the default-binary branch above did not
        # already add it; there is no point passing the flag twice.
        if debug and "-d" not in cmd:
            cmd.append("-d")

        logfile = open(Openocd.logname, "w")
        if print_log_names:
            real_stdout.write("Temporary OpenOCD log: %s\n" % Openocd.logname)
        env_entries = ("REMOTE_BITBANG_HOST", "REMOTE_BITBANG_PORT")
        env_entries = [key for key in env_entries if key in os.environ]
        logfile.write("+ %s%s\n" % (
            "".join("%s=%s " % (key, os.environ[key]) for key in env_entries),
            " ".join(map(pipes.quote, cmd))))
        logfile.flush()

        self.gdb_ports = []
        self.process = self.start(cmd, logfile)

    def start(self, cmd, logfile):
        """Spawn cmd and block until OpenOCD is ready for gdb.

        Args:
            cmd: argument list to execute.
            logfile: open file receiving the child's stdout and stderr; it
                must be the file named by Openocd.logname, which is what
                gets scanned for progress messages.

        Returns:
            The subprocess.Popen handle.

        Raises:
            Exception: OpenOCD exited, or did not finish starting within
                self.timeout seconds. The child is killed and the log is
                printed before the exception propagates.
        """
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                stdout=logfile, stderr=logfile)

        try:
            with open(Openocd.logname, "r") as fd:
                self._wait_until_started(process, fd)
            return process
        except Exception:
            # self.process is never assigned on this path, so __del__ cannot
            # clean up; kill the child here or it would be orphaned.
            self._kill(process)
            print_log(Openocd.logname)
            raise

    def _wait_until_started(self, process, fd):
        """Scan fd until the "telnet server disabled" line shows up.

        Wait for OpenOCD to have made it through riscv_examine(). When using
        OpenOCD to communicate with a simulator this may take a long time,
        and gdb will time out when trying to connect if we attempt too early.
        Every announced gdb port is appended to self.gdb_ports.
        """
        start = time.time()
        messaged = False
        while True:
            line = fd.readline()
            if not line and process.poll() is not None:
                # The child may have written its last lines between the read
                # above and the poll; look once more before giving up.
                line = fd.readline()
                if not line:
                    raise Exception("OpenOCD exited early.")

            if line:
                m = re.search(r"Listening on port (\d+) for gdb connections",
                        line)
                if m:
                    self.gdb_ports.append(int(m.group(1)))

                if "telnet server disabled" in line:
                    return

            # These checks run on every pass, including idle ones, so a
            # silent OpenOCD still hits the timeout.
            elapsed = time.time() - start
            if not messaged and elapsed > 1:
                messaged = True
                print("Waiting for OpenOCD to start...")
            if elapsed > self.timeout:
                raise Exception("Timed out waiting for OpenOCD to "
                        "listen for gdb")

            if not line:
                time.sleep(0.1)

    @staticmethod
    def _kill(process):
        """Kill process and reap it, ignoring the case where it is gone."""
        try:
            process.kill()
            process.wait()
        except OSError:
            pass

    def __del__(self):
        """Kill the server; tolerate it being gone or never started."""
        try:
            self.process.kill()
            self.process.wait()
        except (OSError, AttributeError):
            pass
296
class OpenocdCli(object):
    """Drives OpenOCD's telnet command line through pexpect."""

    def __init__(self, port=4444):
        """Connect to OpenOCD's telnet port on localhost and await a prompt.

        The session is also tee'd into openocd-cli.log in the working
        directory.
        """
        self.child = pexpect.spawn(
                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
        self.child.expect("> ")

    def command(self, cmd):
        """Send cmd and return the text printed before the next prompt."""
        self.child.sendline(cmd)
        # Consume the echoed command literally; treating it as a regular
        # expression would mis-match commands that contain metacharacters.
        self.child.expect_exact(cmd)
        self.child.expect("\n")
        self.child.expect("> ")
        return self.child.before.strip("\t\r\n \0")

    def reg(self, reg=''):
        """Read registers with the "reg" command.

        Returns the integer value of reg when a name is given, otherwise a
        dict mapping every reported register name to its value.
        """
        output = self.command("reg %s" % reg)
        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
        values = {r: int(v, 0) for r, v in matches}
        if reg:
            return values[reg]
        return values

    def load_image(self, image):
        """Run "load_image"; raise TestNotApplicable if the ELF is rejected
        because only 32-bit files are supported."""
        output = self.command("load_image %s" % image)
        if 'invalid ELF file, only 32bits files are supported' in output:
            raise TestNotApplicable(output)
322
class CannotAccess(Exception):
    """Signals that gdb could not access memory at a given address.

    The offending address is available as the address attribute; no message
    is passed to the Exception base class.
    """

    def __init__(self, address):
        super(CannotAccess, self).__init__()
        self.address = address
327
# One row of gdb's "info threads" listing, as captured by Gdb.threads().
Thread = collections.namedtuple(
    'Thread',
    ['id', 'description', 'target_id', 'name', 'frame'])
330
class Gdb(object):
    """A single gdb class which can interact with one or more gdb instances.

    One gdb child process is spawned per port. Commands go to the "active"
    child, which select_child()/select_hart() change and which
    push_state()/pop_state() save and restore.
    """

    # pylint: disable=too-many-public-methods
    # pylint: disable=too-many-instance-attributes

    def __init__(self, ports,
            cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb"),
            timeout=60, binary=None):
        """Spawn one gdb per port (does not connect; see connect()).

        Args:
            ports: non-empty sequence of gdb server ports.
            cmd: gdb command line to spawn.
            timeout: value later used for gdb's "set remotetimeout".
            binary: optional file to load with gdb's "file" command.
        """
        assert ports

        self.ports = ports
        self.cmd = cmd
        self.timeout = timeout
        self.binary = binary

        self.stack = []
        # hart id -> (gdb child, Thread)
        self.harts = {}

        self.logfiles = []
        self.children = []
        for port in ports:
            logfile = tempfile.NamedTemporaryFile(prefix="gdb@%d-" % port,
                    suffix=".log")
            self.logfiles.append(logfile)
            if print_log_names:
                real_stdout.write("Temporary gdb log: %s\n" % logfile.name)
            child = pexpect.spawn(cmd)
            child.logfile = logfile
            child.logfile.write("+ %s\n" % cmd)
            self.children.append(child)
        self.active_child = self.children[0]

    def connect(self):
        """Configure every gdb, attach it to its port, and map its threads
        to hart ids in self.harts."""
        for port, child in zip(self.ports, self.children):
            self.select_child(child)
            self.wait()
            self.command("set confirm off")
            self.command("set width 0")
            self.command("set height 0")
            # Force consistency.
            self.command("set print entry-values no")
            self.command("set remotetimeout %d" % self.timeout)
            self.command("target extended-remote localhost:%d" % port)
            if self.binary:
                self.command("file %s" % self.binary)
            threads = self.threads()
            for t in threads:
                hartid = None
                if t.name:
                    m = re.search(r"Hart (\d+)", t.name)
                    if m:
                        hartid = int(m.group(1))
                if hartid is None:
                    # No "Hart N" in the thread name: hand out the next id.
                    if self.harts:
                        hartid = max(self.harts) + 1
                    else:
                        hartid = 0
                self.harts[hartid] = (child, t)

    def __del__(self):
        """Close every gdb child (best effort)."""
        # getattr: __init__ may have failed before self.children existed.
        for child in getattr(self, "children", ()):
            try:
                child.close()
            except Exception: # pylint: disable=broad-except
                # Nothing useful can be done about a failure in a destructor.
                pass

    def lognames(self):
        """Return the path of each gdb child's log file."""
        return [logfile.name for logfile in self.logfiles]

    def select_child(self, child):
        """Make child the gdb that subsequent commands are sent to."""
        self.active_child = child

    def select_hart(self, hart):
        """Switch to the gdb child and thread that serve hart (by hart.id)."""
        child, thread = self.harts[hart.id]
        self.select_child(child)
        output = self.command("thread %s" % thread.id)
        assert "Unknown" not in output

    def push_state(self):
        """Save the currently active child on the state stack."""
        self.stack.append({
            'active_child': self.active_child
        })

    def pop_state(self):
        """Restore the active child saved by the matching push_state()."""
        state = self.stack.pop()
        self.active_child = state['active_child']

    def wait(self):
        """Wait for prompt."""
        self.active_child.expect(r"\(gdb\)")

    def command(self, command, timeout=6000):
        """Send command to the active gdb and return its output, stripped.

        timeout is in seconds"""
        self.active_child.sendline(command)
        self.active_child.expect("\n", timeout=timeout)
        self.active_child.expect(r"\(gdb\)", timeout=timeout)
        return self.active_child.before.strip()

    def global_command(self, command):
        """Execute this command on every gdb that we control."""
        with PrivateState(self):
            for child in self.children:
                self.select_child(child)
                self.command(command)

    def c(self, wait=True, timeout=-1, asynchronous=False, **kwargs):
        """
        Dumb c command.
        In RTOS mode, gdb will resume all harts.
        In multi-gdb mode, this command will just go to the current gdb, so
        will only resume one hart.

        With wait true, returns gdb's output once the prompt comes back;
        otherwise returns as soon as gdb prints "Continuing". A true
        asynchronous sends "c&" instead of "c".
        """
        # The third parameter used to be spelled "async", which is a reserved
        # word from Python 3.7 on. Positional callers are unaffected; callers
        # that still pass async=... by keyword are honoured through kwargs.
        asynchronous = kwargs.pop("async", asynchronous)
        if kwargs:
            raise TypeError("c() got an unexpected keyword argument %r" %
                    sorted(kwargs)[0])

        suffix = "&" if asynchronous else ""
        if wait:
            output = self.command("c%s" % suffix, timeout=timeout)
            assert "Continuing" in output
            return output
        else:
            self.active_child.sendline("c%s" % suffix)
            self.active_child.expect("Continuing")

    def c_all(self):
        """
        Resume every hart.

        This function works fine when using multiple gdb sessions, but the
        caller must be careful when using it nonetheless. gdb's behavior is to
        not set breakpoints until just before the hart is resumed, and then
        clears them as soon as the hart halts. That means that you can't set
        one software breakpoint, and expect multiple harts to hit it. It's
        possible that the first hart completes set/run/halt/clear before the
        second hart even gets to resume, so it will never hit the breakpoint.
        """
        with PrivateState(self):
            for child in self.children:
                child.sendline("c")
                child.expect("Continuing")

            # Now wait for them all to halt
            for child in self.children:
                child.expect(r"\(gdb\)")

    def interrupt(self):
        """Send Ctrl-C to the active gdb and return the output that precedes
        the next prompt."""
        self.active_child.send("\003")
        self.active_child.expect(r"\(gdb\)", timeout=6000)
        return self.active_child.before.strip()

    def x(self, address, size='w'):
        """Examine memory with "x/<size> <address>" and return the value as
        an int."""
        output = self.command("x/%s %s" % (size, address))
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p_raw(self, obj):
        """Print obj and return the text after the last '=' unparsed.

        Raises CannotAccess if gdb reports an inaccessible address."""
        output = self.command("p %s" % obj)
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))
        return output.split('=')[-1].strip()

    @staticmethod
    def _split_elements(inner):
        """Split inner on ", " separators that are outside any nested
        braces and outside any double-quoted string."""
        elements = []
        depth = 0
        quoted = False
        start = 0
        i = 0
        while i < len(inner):
            ch = inner[i]
            if quoted:
                if ch == "\\":
                    i += 1      # Skip the escaped character.
                elif ch == '"':
                    quoted = False
            elif ch == '"':
                quoted = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
            elif depth == 0 and inner.startswith(", ", i):
                elements.append(inner[start:i])
                i += 2
                start = i
                continue
            i += 1
        elements.append(inner[start:])
        return elements

    def parse_string(self, text):
        """Convert a value printed by gdb into a Python object.

        "{a, b}" becomes a list (nested braces give nested lists), a
        double-quoted string becomes its contents, and anything else is
        parsed as an integer with automatic base detection.
        """
        text = text.strip()
        if text.startswith("{") and text.endswith("}"):
            inner = text[1:-1]
            return [self.parse_string(t) for t in self._split_elements(inner)]
        elif text.startswith('"') and text.endswith('"'):
            return text[1:-1]
        else:
            return int(text, 0)

    def p(self, obj, fmt="/x"):
        """Print obj with the given format and return the parsed value.

        Raises CannotAccess if gdb reports an inaccessible address."""
        output = self.command("p%s %s" % (fmt, obj))
        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
        if m:
            raise CannotAccess(int(m.group(1), 0))
        rhs = output.split('=')[-1]
        return self.parse_string(rhs)

    def p_string(self, obj):
        """Print obj and return the second shell-style token after the last
        '=' in gdb's reply."""
        output = self.command("p %s" % obj)
        value = shlex.split(output.split('=')[-1].strip())[1]
        return value

    def stepi(self):
        """Execute one instruction and return gdb's output."""
        output = self.command("stepi", timeout=60)
        return output

    def load(self):
        """Run gdb's "load" and assert that it reports a transfer rate and
        no failure."""
        output = self.command("load", timeout=6000)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at location and return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at location and return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output

    def threads(self):
        """Return the rows of "info threads" as a non-empty list of Thread
        tuples (all fields are strings or None)."""
        output = self.command("info threads")
        threads = []
        for line in output.splitlines():
            m = re.match(
                    r"[\s\*]*(\d+)\s*"
                    r"(Remote target|Thread (\d+)\s*\(Name: ([^\)]+))"
                    r"\s*(.*)", line)
            if m:
                threads.append(Thread(*m.groups()))
        assert threads
        return threads

    def thread(self, thread):
        """Switch the active gdb to thread (by thread.id)."""
        return self.command("thread %s" % thread.id)

    def where(self):
        """Return the innermost frame of the backtrace ("where 1")."""
        return self.command("where 1")
555
class PrivateState(object):
    """Context manager that brackets a block with gdb.push_state() and
    gdb.pop_state(), so state changed inside the block is restored on exit.
    """

    def __init__(self, gdb):
        # The object whose state is saved on entry and restored on exit.
        self.gdb = gdb

    def __enter__(self):
        # Nothing is bound by "with ... as"; only the side effect matters.
        self.gdb.push_state()

    def __exit__(self, _type, _value, _traceback):
        # Restores unconditionally and lets any exception propagate.
        self.gdb.pop_state()
565
def run_all_tests(module, target, parsed):
    """Collect the tests defined in module, run them, and report.

    Args:
        module: module whose classes that have a "test" attribute are tests.
        target: target description; its harts are inspected for misa.
        parsed: parsed command line (see add_test_run_options()).

    ExamineTarget is queued first (once) when some hart has no misa value
    from either the command line or its own definition. When parsed.test is
    non-empty, only classes whose name contains one of those strings run.

    Returns:
        The value of print_results(): 0 if every result was good, else 1.
    """
    global gdb_cmd # pylint: disable=global-statement

    if not os.path.exists(parsed.logs):
        os.makedirs(parsed.logs)

    overall_start = time.time()

    gdb_cmd = parsed.gdb

    todo = []
    examine_added = False
    for hart in target.harts:
        if parsed.misaval:
            hart.misa = int(parsed.misaval, 16)
            print("Using $misa from command line: 0x%x" % hart.misa)
        elif hart.misa:
            print("Using $misa from hart definition: 0x%x" % hart.misa)
        elif not examine_added:
            todo.append(("ExamineTarget", ExamineTarget, None))
            examine_added = True

    for name in dir(module):
        definition = getattr(module, name)
        if isinstance(definition, type) and hasattr(definition, 'test') and \
                (not parsed.test or any(test in name for test in parsed.test)):
            todo.append((name, definition, None))

    results, count = run_tests(parsed, target, todo)

    header("ran %d tests in %.0fs" % (count, time.time() - overall_start),
            dash=':')

    return print_results(results)
599
# Result strings that do not count as a failure.
good_results = set(('pass', 'not_applicable'))
def run_tests(parsed, target, todo):
    """Run each queued test with stdout redirected into its own log file.

    Args:
        parsed: parsed command line; logs, print_failures and fail_fast are
            read.
        target: passed to each test class, and its type name is used in the
            log file name.
        todo: iterable of (name, test class, hart) tuples.

    Returns:
        (results, count): results maps each result string to a list of
        (test name, log path) pairs; count is the number of tests run.
    """
    global real_stdout # pylint: disable=global-statement

    results = {}
    count = 0

    for name, definition, hart in todo:
        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %
                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))
        # The with-block guarantees the log is flushed and closed even when
        # constructing or running the test raises.
        with open(log_name, 'w') as log_fd:
            print("[%s] Starting > %s" % (name, log_name))
            instance = definition(target, hart)
            sys.stdout.flush()
            log_fd.write("Test: %s\n" % name)
            log_fd.write("Target: %s\n" % type(target).__name__)
            start = time.time()
            real_stdout = sys.stdout
            sys.stdout = log_fd
            try:
                result = instance.run()
                log_fd.write("Result: %s\n" % result)
            finally:
                sys.stdout = real_stdout
                log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))
                log_fd.flush()
        print("[%s] %s in %.2fs" % (name, result, time.time() - start))
        if result not in good_results and parsed.print_failures:
            with open(log_name) as log:
                sys.stdout.write(log.read())
        sys.stdout.flush()
        results.setdefault(result, []).append((name, log_name))
        count += 1
        if result not in good_results and parsed.fail_fast:
            break

    return results, count
635
def print_results(results):
    """Print a summary of results and return an exit status.

    Args:
        results: dict mapping a result string to a list of
            (test name, log path) pairs, as built by run_tests().

    Returns:
        0 when every result string is in good_results, otherwise 1. The
        tests behind each bad result are listed with their log files.
    """
    result = 0
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for key, value in results.items():
        print("%d tests returned %s" % (len(value), key))
        if key not in good_results:
            result = 1
            for name, log_name in value:
                print("   %s > %s" % (name, log_name))

    return result
646
def add_test_run_options(parser):
    """Register the test-runner command line options on parser.

    parser is expected to provide argparse's add_argument() interface. The
    options are added in the order listed below.
    """
    options = (
        (("--logs",),
         dict(default="logs",
              help="Store logs in the specified directory.")),
        (("--fail-fast", "-f"),
         dict(action="store_true",
              help="Exit as soon as any test fails.")),
        (("--print-failures",),
         dict(action="store_true",
              help="When a test fails, print the log file to stdout.")),
        (("--print-log-names", "--pln"),
         dict(action="store_true",
              help="Print names of temporary log files as soon as they are "
                   "created.")),
        (("test",),
         dict(nargs='*',
              help="Run only tests that are named here.")),
        (("--gdb",),
         dict(help="The command to use to start gdb.")),
        (("--misaval",),
         dict(help="Don't run ExamineTarget, just assume the misa value "
                   "which is specified.")),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
664
def header(title, dash='-', length=78):
    """Print a separator line, optionally with a centred title.

    Args:
        title: text shown as "[ title ]" in the middle of the line; a falsy
            title prints a plain line of dash characters.
        dash: the fill character.
        length: total width of the line.

    With an odd amount of padding the extra dash goes after the title.
    """
    if title:
        dashes = dash * (length - 4 - len(title))
        # Floor division: a slice index must be an int on Python 3 as well.
        middle = len(dashes) // 2
        print("%s[ %s ]%s" % (dashes[:middle], title, dashes[middle:]))
    else:
        print(dash * length)
673
def print_log(path):
    """Print the file at path to stdout, preceded by a header line naming
    it and followed by one blank line."""
    header(path)
    with open(path, "r") as log:
        for line in log:
            sys.stdout.write(line)
    # An empty print() call emits the newline on Python 2 and 3 alike; a bare
    # "print" is a no-op expression on Python 3.
    print("")
679
class BaseTest(object):
    """Base class of every test: compiles the program, brings the target
    and server up, runs test(), and tears everything down again.

    Subclasses provide test() and may set compile_args (a tuple of
    arguments for target.compile()).
    """
    # Cache of compiled binaries, keyed by compile_args and shared by all
    # tests.
    compiled = {}

    def __init__(self, target, hart=None):
        """Remember target and pick the hart to test.

        An explicitly passed hart is used as is; otherwise one of
        target.harts is chosen at random.
        """
        self.target = target
        if hart:
            self.hart = hart
        else:
            # A leftover debugging line used to overwrite this choice with
            # target.harts[-1], which made the random pick dead code.
            self.hart = random.choice(target.harts)
        self.server = None
        self.target_process = None
        self.binary = None
        self.start = 0
        self.logs = []

    def early_applicable(self):
        """Return a false value if the test has determined it cannot run
        without ever needing to talk to the target or server."""
        # pylint: disable=no-self-use
        return True

    def setup(self):
        """Hook run after classSetup() and before test(); no-op here."""
        pass

    def compile(self):
        """Compile compile_args (if the test defines any) for self.hart and
        store the result in self.binary, reusing the shared cache."""
        compile_args = getattr(self, 'compile_args', None)
        if compile_args:
            if compile_args not in BaseTest.compiled:
                BaseTest.compiled[compile_args] = \
                        self.target.compile(self.hart, *compile_args)
        self.binary = BaseTest.compiled.get(compile_args)

    def classSetup(self):
        """Compile, create the target process, and start the server.

        Log file names are collected in self.logs. If the server fails to
        start, the logs gathered so far are printed before re-raising.
        """
        self.compile()
        self.target_process = self.target.create()
        if self.target_process:
            self.logs.append(self.target_process.logname)
        try:
            self.server = self.target.server()
            self.logs.append(self.server.logname)
        except Exception:
            for log in self.logs:
                print_log(log)
            raise

    def classTeardown(self):
        """Drop the references to the server and the target process."""
        del self.server
        del self.target_process

    def postMortem(self):
        """Hook run after a failed test to gather diagnostics; no-op here."""
        pass

    def run(self):
        """
        If compile_args is set, compile a program and set self.binary.

        Call setup().

        Then call test() and return the result, displaying relevant information
        if an exception is raised.

        Returns "not_applicable", "fail" (TestFailed was raised),
        "exception" (anything else was raised), the truthy value returned
        by test(), or "pass" when test() returned a false value.
        """

        sys.stdout.flush()

        if not self.early_applicable():
            return "not_applicable"

        self.start = time.time()

        try:
            self.classSetup()
            self.setup()
            result = self.test()    # pylint: disable=no-member
        except TestNotApplicable:
            result = "not_applicable"
        except Exception as e: # pylint: disable=broad-except
            if isinstance(e, TestFailed):
                result = "fail"
                header("Message")
                print(e.message)
            else:
                result = "exception"
            header("Traceback")
            traceback.print_exc(file=sys.stdout)
            try:
                self.postMortem()
            except Exception as e:  # pylint: disable=broad-except
                header("postMortem Exception")
                print(e)
                traceback.print_exc(file=sys.stdout)
            return result

        finally:
            # Runs on every path, including the early return above.
            for log in self.logs:
                print_log(log)
            header("End of logs")
            self.classTeardown()

        if not result:
            result = 'pass'
        return result
783
# gdb command line override; run_all_tests() sets it from the command line.
gdb_cmd = None
class GdbTest(BaseTest):
    """A test that talks to the target through gdb (available as self.gdb)."""

    def __init__(self, target, hart=None):
        super(GdbTest, self).__init__(target, hart=hart)
        self.gdb = None

    def classSetup(self):
        """On top of the base setup: spawn gdb, connect it, apply the
        target's gdb_setup commands, and select this test's hart."""
        BaseTest.classSetup(self)

        # Only pass a command when one was configured, so Gdb's own default
        # applies otherwise.
        gdb_args = [self.server.gdb_ports]
        if gdb_cmd:
            gdb_args.append(gdb_cmd)
        self.gdb = Gdb(*gdb_args, timeout=self.target.timeout_sec,
                       binary=self.binary)

        self.logs.extend(self.gdb.lognames())
        self.gdb.connect()

        remote_timeout = "set remotetimeout %d" % self.target.timeout_sec
        self.gdb.global_command(remote_timeout)

        for setup_command in self.target.gdb_setup:
            self.gdb.command(setup_command)

        self.gdb.select_hart(self.hart)

        # FIXME: OpenOCD doesn't handle PRIV now
        #self.gdb.p("$priv=3")

    def postMortem(self):
        """Interrupt the target and dump all registers, if gdb exists."""
        if self.gdb:
            self.gdb.interrupt()
            self.gdb.command("info registers all", timeout=10)

    def classTeardown(self):
        """Drop gdb first, then run the base teardown."""
        del self.gdb
        BaseTest.classTeardown(self)
823
class GdbSingleHartTest(GdbTest):
    """A gdb test that exercises one hart while the others sit idle."""

    def classSetup(self):
        """After the regular gdb setup, point every other hart's pc at
        loop_forever, then re-select the hart under test."""
        GdbTest.classSetup(self)

        # Park all harts that we're not using in a safe place.
        unused = (h for h in self.target.harts if h != self.hart)
        for parked in unused:
            self.gdb.select_hart(parked)
            self.gdb.p("$pc=loop_forever")

        self.gdb.select_hart(self.hart)
834
class ExamineTarget(GdbTest):
    """Reads $misa from every hart, records it on the hart, and checks that
    the XLEN it encodes matches the hart's configured xlen."""

    def test(self):
        """Examine each hart and print its ISA string (e.g. "RV32ACIM").

        Raises:
            TestFailed: $misa encodes no recognizable XLEN, or one that
                differs from hart.xlen.
        """
        for hart in self.target.harts:
            self.gdb.select_hart(hart)

            hart.misa = self.gdb.p("$misa")

            txt = "RV"
            misa_xlen = 0
            # The top two bits of misa hold 1, 2 or 3 for a 32-, 64- or
            # 128-bit register respectively, so test each possible width.
            if ((hart.misa & 0xFFFFFFFF) >> 30) == 1:
                misa_xlen = 32
            elif ((hart.misa & 0xFFFFFFFFFFFFFFFF) >> 62) == 2:
                misa_xlen = 64
            elif ((hart.misa & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) >> 126) == 3:
                misa_xlen = 128
            else:
                # Report the hart that was just examined, which is not
                # necessarily self.hart.
                raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %
                        hart.misa)

            if misa_xlen != hart.xlen:
                raise TestFailed("MISA reported XLEN of %d but we were "\
                        "expecting XLEN of %d\n" % (misa_xlen, hart.xlen))

            txt += ("%d" % misa_xlen)

            # Bits 0-25 stand for extensions 'A' through 'Z'.
            for i in range(26):
                if hart.misa & (1<<i):
                    txt += chr(i + ord('A'))
            # Space-separated, no newline, so all harts share one line.
            sys.stdout.write(txt + " ")
864
class TestFailed(Exception):
    """Raised by a test (or an assert* helper) to report a failure.

    The text is available as the message attribute and, because it is also
    handed to Exception, through str() and in tracebacks.
    """

    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message
869
class TestNotApplicable(Exception):
    """Raised by a test to report that it cannot run on this target.

    The text is available as the message attribute and, because it is also
    handed to Exception, through str() and in tracebacks.
    """

    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message
874
def assertEqual(a, b):
    """Raise TestFailed if a != b."""
    differs = a != b
    if differs:
        raise TestFailed("%r != %r" % (a, b))
878
def assertNotEqual(a, b):
    """Raise TestFailed if a == b."""
    same = a == b
    if same:
        raise TestFailed("%r == %r" % (a, b))
882
def assertIn(a, b):
    """Raise TestFailed if a is not contained in b."""
    missing = a not in b
    if missing:
        raise TestFailed("%r not in %r" % (a, b))
886
def assertNotIn(a, b):
    """Raise TestFailed if a is contained in b."""
    present = a in b
    if present:
        raise TestFailed("%r in %r" % (a, b))
890
def assertGreater(a, b):
    """Raise TestFailed unless a > b."""
    is_greater = a > b
    if not is_greater:
        raise TestFailed("%r not greater than %r" % (a, b))
894
def assertLess(a, b):
    """Raise TestFailed unless a < b."""
    is_less = a < b
    if not is_less:
        raise TestFailed("%r not less than %r" % (a, b))
898
def assertTrue(a):
    """Raise TestFailed if a is falsy."""
    if not a:
        # Wrap a in a one-element tuple: formatting with a bare tuple (such
        # as the falsy empty tuple) would raise TypeError instead of
        # TestFailed.
        raise TestFailed("%r is not True" % (a,))
902
def assertRegexpMatches(text, regexp):
    """Raise TestFailed if regexp matches nowhere in text."""
    found = re.search(regexp, text)
    if found is None:
        raise TestFailed("can't find %r in %r" % (regexp, text))