debug: checkpoint trying to get 64 bit programs to compile as well.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import re
3 import shlex
4 import subprocess
5 import sys
6 import time
7 import traceback
8
9 import pexpect
10
11 # Note that gdb comes with its own testsuite. I was unable to figure out how to
12 # run that testsuite against the spike simulator.
13
14 def find_file(path):
15 for directory in (os.getcwd(), os.path.dirname(__file__)):
16 fullpath = os.path.join(directory, path)
17 if os.path.exists(fullpath):
18 return fullpath
19 return None
20
21 def compile(args, xlen=32): # pylint: disable=redefined-builtin
22 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")
23 cmd = [cc, "-g"]
24 if (xlen == 32):
25 cmd.append("-march=rv32imac")
26 cmd.append("-mabi=ilp32")
27 else:
28 cmd.append("-march=rv64imac")
29 cmd.append("-mabi=lp64")
30 for arg in args:
31 found = find_file(arg)
32 if found:
33 cmd.append(found)
34 else:
35 cmd.append(arg)
36 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
37 stderr=subprocess.PIPE)
38 stdout, stderr = process.communicate()
39 if process.returncode:
40 print
41 header("Compile failed")
42 print "+", " ".join(cmd)
43 print stdout,
44 print stderr,
45 header("")
46 raise Exception("Compile failed!")
47
48 def unused_port():
49 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
50 import socket
51 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
52 s.bind(("", 0))
53 port = s.getsockname()[1]
54 s.close()
55 return port
56
57 class Spike(object):
58 logname = "spike.log"
59
60 def __init__(self, cmd, binary=None, halted=False, with_gdb=True,
61 timeout=None, xlen=64):
62 """Launch spike. Return tuple of its process and the port it's running
63 on."""
64 if cmd:
65 cmd = shlex.split(cmd)
66 else:
67 cmd = ["spike"]
68 if xlen == 32:
69 cmd += ["--isa", "RV32"]
70
71 if timeout:
72 cmd = ["timeout", str(timeout)] + cmd
73
74 if halted:
75 cmd.append('-H')
76 if with_gdb:
77 self.port = unused_port()
78 cmd += ['--gdb-port', str(self.port)]
79 cmd.append("-m32")
80 cmd.append('pk')
81 if binary:
82 cmd.append(binary)
83 logfile = open(self.logname, "w")
84 logfile.write("+ %s\n" % " ".join(cmd))
85 logfile.flush()
86 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
87 stdout=logfile, stderr=logfile)
88
89 def __del__(self):
90 try:
91 self.process.kill()
92 self.process.wait()
93 except OSError:
94 pass
95
96 def wait(self, *args, **kwargs):
97 return self.process.wait(*args, **kwargs)
98
99 class VcsSim(object):
100 def __init__(self, simv=None, debug=False):
101 if simv:
102 cmd = shlex.split(simv)
103 else:
104 cmd = ["simv"]
105 cmd += ["+jtag_vpi_enable"]
106 if debug:
107 cmd[0] = cmd[0] + "-debug"
108 cmd += ["+vcdplusfile=output/gdbserver.vpd"]
109 logfile = open("simv.log", "w")
110 logfile.write("+ %s\n" % " ".join(cmd))
111 logfile.flush()
112 listenfile = open("simv.log", "r")
113 listenfile.seek(0, 2)
114 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
115 stdout=logfile, stderr=logfile)
116 done = False
117 while not done:
118 line = listenfile.readline()
119 if not line:
120 time.sleep(1)
121 match = re.match(r"^Listening on port (\d+)$", line)
122 if match:
123 done = True
124 self.port = int(match.group(1))
125 os.environ['JTAG_VPI_PORT'] = str(self.port)
126
127 def __del__(self):
128 try:
129 self.process.kill()
130 self.process.wait()
131 except OSError:
132 pass
133
134 class Openocd(object):
135 logname = "openocd.log"
136
137 def __init__(self, cmd=None, config=None, debug=False):
138 if cmd:
139 cmd = shlex.split(cmd)
140 else:
141 cmd = ["openocd"]
142 if config:
143 cmd += ["-f", find_file(config)]
144 if debug:
145 cmd.append("-d")
146
147 # This command needs to come before any config scripts on the command
148 # line, since they are executed in order.
149 cmd[1:1] = [
150 # Tell OpenOCD to bind gdb to an unused, ephemeral port.
151 "--command",
152 "gdb_port 0",
153 # Disable tcl and telnet servers, since they are unused and because
154 # the port numbers will conflict if multiple OpenOCD processes are
155 # running on the same server.
156 "--command",
157 "tcl_port disabled",
158 "--command",
159 "telnet_port disabled",
160 ]
161
162 logfile = open(Openocd.logname, "w")
163 logfile.write("+ %s\n" % " ".join(cmd))
164 logfile.flush()
165 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
166 stdout=logfile, stderr=logfile)
167
168 # Wait for OpenOCD to have made it through riscv_examine(). When using
169 # OpenOCD to communicate with a simulator this may take a long time,
170 # and gdb will time out when trying to connect if we attempt too early.
171 start = time.time()
172 messaged = False
173 while True:
174 log = open(Openocd.logname).read()
175 if "OK GO NOW" in log:
176 break
177 if not self.process.poll() is None:
178 raise Exception(
179 "OpenOCD exited before completing riscv_examine()")
180 if not messaged and time.time() - start > 1:
181 messaged = True
182 print "Waiting for OpenOCD to examine RISCV core..."
183
184 self.port = self._get_gdb_server_port()
185
186 def _get_gdb_server_port(self):
187 """Get port that OpenOCD's gdb server is listening on."""
188 MAX_ATTEMPTS = 50
189 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')
190 for _ in range(MAX_ATTEMPTS):
191 with open(os.devnull, 'w') as devnull:
192 try:
193 output = subprocess.check_output([
194 'lsof',
195 '-a', # Take the AND of the following selectors
196 '-p{}'.format(self.process.pid), # Filter on PID
197 '-iTCP', # Filter only TCP sockets
198 ], stderr=devnull)
199 except subprocess.CalledProcessError:
200 output = ""
201 matches = list(PORT_REGEX.finditer(output))
202 matches = [m for m in matches
203 if m.group('port') not in ('6666', '4444')]
204 if len(matches) > 1:
205 print output
206 raise Exception(
207 "OpenOCD listening on multiple ports. Cannot uniquely "
208 "identify gdb server port.")
209 elif matches:
210 [match] = matches
211 return int(match.group('port'))
212 time.sleep(1)
213 raise Exception("Timed out waiting for gdb server to obtain port.")
214
215 def __del__(self):
216 try:
217 self.process.kill()
218 self.process.wait()
219 except OSError:
220 pass
221
222 class OpenocdCli(object):
223 def __init__(self, port=4444):
224 self.child = pexpect.spawn(
225 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)
226 self.child.expect("> ")
227
228 def command(self, cmd):
229 self.child.sendline(cmd)
230 self.child.expect(cmd)
231 self.child.expect("\n")
232 self.child.expect("> ")
233 return self.child.before.strip("\t\r\n \0")
234
235 def reg(self, reg=''):
236 output = self.command("reg %s" % reg)
237 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)
238 values = {r: int(v, 0) for r, v in matches}
239 if reg:
240 return values[reg]
241 return values
242
243 def load_image(self, image):
244 output = self.command("load_image %s" % image)
245 if 'invalid ELF file, only 32bits files are supported' in output:
246 raise TestNotApplicable(output)
247
248 class CannotAccess(Exception):
249 def __init__(self, address):
250 Exception.__init__(self)
251 self.address = address
252
253 class Gdb(object):
254 def __init__(self,
255 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
256 self.child = pexpect.spawn(cmd)
257 self.child.logfile = open("gdb.log", "w")
258 self.child.logfile.write("+ %s\n" % cmd)
259 self.wait()
260 self.command("set confirm off")
261 self.command("set width 0")
262 self.command("set height 0")
263 # Force consistency.
264 self.command("set print entry-values no")
265
266 def wait(self):
267 """Wait for prompt."""
268 self.child.expect(r"\(gdb\)")
269
270 def command(self, command, timeout=6000):
271 self.child.sendline(command)
272 self.child.expect("\n", timeout=timeout)
273 self.child.expect(r"\(gdb\)", timeout=timeout)
274 return self.child.before.strip()
275
276 def c(self, wait=True):
277 if wait:
278 output = self.command("c")
279 assert "Continuing" in output
280 return output
281 else:
282 self.child.sendline("c")
283 self.child.expect("Continuing")
284
285 def interrupt(self):
286 self.child.send("\003")
287 self.child.expect(r"\(gdb\)", timeout=6000)
288 return self.child.before.strip()
289
290 def x(self, address, size='w'):
291 output = self.command("x/%s %s" % (size, address))
292 value = int(output.split(':')[1].strip(), 0)
293 return value
294
295 def p_raw(self, obj):
296 output = self.command("p %s" % obj)
297 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
298 if m:
299 raise CannotAccess(int(m.group(1), 0))
300 return output.split('=')[-1].strip()
301
302 def p(self, obj):
303 output = self.command("p/x %s" % obj)
304 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)
305 if m:
306 raise CannotAccess(int(m.group(1), 0))
307 value = int(output.split('=')[-1].strip(), 0)
308 return value
309
310 def p_string(self, obj):
311 output = self.command("p %s" % obj)
312 value = shlex.split(output.split('=')[-1].strip())[1]
313 return value
314
315 def stepi(self):
316 output = self.command("stepi")
317 return output
318
319 def load(self):
320 output = self.command("load", timeout=6000)
321 assert "failed" not in output
322 assert "Transfer rate" in output
323
324 def b(self, location):
325 output = self.command("b %s" % location)
326 assert "not defined" not in output
327 assert "Breakpoint" in output
328 return output
329
330 def hbreak(self, location):
331 output = self.command("hbreak %s" % location)
332 assert "not defined" not in output
333 assert "Hardware assisted breakpoint" in output
334 return output
335
336 def run_all_tests(module, target, parsed):
337 good_results = set(('pass', 'not_applicable'))
338
339 start = time.time()
340
341 results = {}
342 count = 0
343
344 global gdb_cmd # pylint: disable=global-statement
345 gdb_cmd = parsed.gdb
346
347 todo = [("ExamineTarget", ExamineTarget)]
348 for name in dir(module):
349 definition = getattr(module, name)
350 if type(definition) == type and hasattr(definition, 'test') and \
351 (not parsed.test or any(test in name for test in parsed.test)):
352 todo.append((name, definition))
353
354 for name, definition in todo:
355 instance = definition(target)
356 result = instance.run()
357 results.setdefault(result, []).append(name)
358 count += 1
359 if result not in good_results and parsed.fail_fast:
360 break
361
362 header("ran %d tests in %.0fs" % (count, time.time() - start), dash=':')
363
364 result = 0
365 for key, value in results.iteritems():
366 print "%d tests returned %s" % (len(value), key)
367 if key not in good_results:
368 result = 1
369 for test in value:
370 print " ", test
371
372 return result
373
374 def add_test_run_options(parser):
375 parser.add_argument("--fail-fast", "-f", action="store_true",
376 help="Exit as soon as any test fails.")
377 parser.add_argument("test", nargs='*',
378 help="Run only tests that are named here.")
379 parser.add_argument("--gdb",
380 help="The command to use to start gdb.")
381
382 def header(title, dash='-'):
383 if title:
384 dashes = dash * (36 - len(title))
385 before = dashes[:len(dashes)/2]
386 after = dashes[len(dashes)/2:]
387 print "%s[ %s ]%s" % (before, title, after)
388 else:
389 print dash * 40
390
391 def print_log(path):
392 header(path)
393 lines = open(path, "r").readlines()
394 if len(lines) > 1000:
395 for l in lines[:500]:
396 sys.stdout.write(l)
397 print "..."
398 for l in lines[-500:]:
399 sys.stdout.write(l)
400 else:
401 for l in lines:
402 sys.stdout.write(l)
403
404 class BaseTest(object):
405 compiled = {}
406
407 def __init__(self, target):
408 self.target = target
409 self.server = None
410 self.target_process = None
411 self.binary = None
412 self.start = 0
413 self.logs = []
414
415 def early_applicable(self):
416 """Return a false value if the test has determined it cannot run
417 without ever needing to talk to the target or server."""
418 # pylint: disable=no-self-use
419 return True
420
421 def setup(self):
422 pass
423
424 def compile(self):
425 compile_args = getattr(self, 'compile_args', None)
426 if compile_args:
427 if compile_args not in BaseTest.compiled:
428 # pylint: disable=star-args
429 BaseTest.compiled[compile_args] = \
430 self.target.compile(*compile_args)
431 self.binary = BaseTest.compiled.get(compile_args)
432
433 def classSetup(self):
434 self.compile()
435 self.target_process = self.target.target()
436 self.server = self.target.server()
437 self.logs.append(self.server.logname)
438
439 def classTeardown(self):
440 del self.server
441 del self.target_process
442
443 def run(self):
444 """
445 If compile_args is set, compile a program and set self.binary.
446
447 Call setup().
448
449 Then call test() and return the result, displaying relevant information
450 if an exception is raised.
451 """
452
453 print "Running", type(self).__name__, "...",
454 sys.stdout.flush()
455
456 if not self.early_applicable():
457 print "not_applicable"
458 return "not_applicable"
459
460 self.start = time.time()
461
462 self.classSetup()
463
464 try:
465 self.setup()
466 result = self.test() # pylint: disable=no-member
467 except TestNotApplicable:
468 result = "not_applicable"
469 except Exception as e: # pylint: disable=broad-except
470 if isinstance(e, TestFailed):
471 result = "fail"
472 else:
473 result = "exception"
474 print "%s in %.2fs" % (result, time.time() - self.start)
475 print "=" * 40
476 if isinstance(e, TestFailed):
477 header("Message")
478 print e.message
479 header("Traceback")
480 traceback.print_exc(file=sys.stdout)
481 for log in self.logs:
482 print_log(log)
483 print "/" * 40
484 return result
485
486 finally:
487 self.classTeardown()
488
489 if not result:
490 result = 'pass'
491 print "%s in %.2fs" % (result, time.time() - self.start)
492 return result
493
494 gdb_cmd = None
495 class GdbTest(BaseTest):
496 def __init__(self, target):
497 BaseTest.__init__(self, target)
498 self.gdb = None
499
500 def classSetup(self):
501 BaseTest.classSetup(self)
502 self.logs.append("gdb.log")
503
504 if gdb_cmd:
505 self.gdb = Gdb(gdb_cmd)
506 else:
507 self.gdb = Gdb()
508
509 if self.binary:
510 self.gdb.command("file %s" % self.binary)
511 if self.target:
512 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)
513 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)
514 if self.server.port:
515 self.gdb.command(
516 "target extended-remote localhost:%d" % self.server.port)
517
518 self.gdb.p("$priv=3")
519
520 def classTeardown(self):
521 del self.gdb
522 BaseTest.classTeardown(self)
523
524 class ExamineTarget(GdbTest):
525 def test(self):
526 self.target.misa = self.gdb.p("$misa")
527
528 txt = "RV"
529 if (self.target.misa >> 30) == 1:
530 txt += "32"
531 elif (self.target.misa >> 62) == 2:
532 txt += "64"
533 elif (self.target.misa >> 126) == 3:
534 txt += "128"
535 else:
536 txt += "??"
537
538 for i in range(26):
539 if self.target.misa & (1<<i):
540 txt += chr(i + ord('A'))
541 print txt,
542
543 class TestFailed(Exception):
544 def __init__(self, message):
545 Exception.__init__(self)
546 self.message = message
547
548 class TestNotApplicable(Exception):
549 def __init__(self, message):
550 Exception.__init__(self)
551 self.message = message
552
553 def assertEqual(a, b):
554 if a != b:
555 raise TestFailed("%r != %r" % (a, b))
556
557 def assertNotEqual(a, b):
558 if a == b:
559 raise TestFailed("%r == %r" % (a, b))
560
561 def assertIn(a, b):
562 if a not in b:
563 raise TestFailed("%r not in %r" % (a, b))
564
565 def assertNotIn(a, b):
566 if a in b:
567 raise TestFailed("%r in %r" % (a, b))
568
569 def assertGreater(a, b):
570 if not a > b:
571 raise TestFailed("%r not greater than %r" % (a, b))
572
573 def assertLess(a, b):
574 if not a < b:
575 raise TestFailed("%r not less than %r" % (a, b))
576
577 def assertTrue(a):
578 if not a:
579 raise TestFailed("%r is not True" % a)
580
581 def assertRegexpMatches(text, regexp):
582 if not re.search(regexp, text):
583 raise TestFailed("can't find %r in %r" % (regexp, text))