flush spike log after initial write.
[riscv-tests.git] / debug / testlib.py
1 import os.path
2 import pexpect
3 import shlex
4 import subprocess
5 import tempfile
6 import testlib
7 import unittest
8
9 # Note that gdb comes with its own testsuite. I was unable to figure out how to
10 # run that testsuite against the spike simulator.
11
def find_file(path):
    """Locate *path* in the working directory or next to this module.

    The current working directory is tried first, then the directory that
    contains this source file.

    Args:
        path: File name (or relative path) to look for.

    Returns:
        The first candidate path that exists, or None if neither location
        contains *path*.
    """
    # Use this module's own __file__ rather than importing the module by name:
    # a self-import fails (or resolves to a different copy) when the file is
    # run as a script or loaded under another module name.
    for directory in (os.getcwd(), os.path.dirname(__file__)):
        fullpath = os.path.join(directory, path)
        if os.path.exists(fullpath):
            return fullpath
    return None
18
def compile(args, xlen=32):
    """Compile a single .c file into a binary.

    Args:
        args: Compiler arguments. args[0] determines the output name. Every
            entry that find_file() can locate is replaced by the located
            path; anything else (e.g. a compiler flag) is passed unchanged.
        xlen: Selects the $RISCV/bin/riscv<xlen>-unknown-elf-gcc compiler.

    Returns:
        The output path: args[0] with its extension removed.

    Raises:
        AssertionError: If the compiler cannot be started or exits with a
            non-zero status.
    """
    dst = os.path.splitext(args[0])[0]
    cc = os.path.expandvars("$RISCV/bin/riscv%d-unknown-elf-gcc" % xlen)
    cmd = [cc, "-g", "-o", dst]
    cmd.extend(find_file(arg) or arg for arg in args)
    # Run the argument list directly instead of a shell string, so paths
    # containing spaces or shell metacharacters reach the compiler intact.
    try:
        result = subprocess.call(cmd)
    except OSError:
        # A missing/unrunnable compiler is reported the same way as a failed
        # compilation, as it was when the command went through the shell.
        result = -1
    assert result == 0, "%r failed" % " ".join(cmd)
    return dst
34
def unused_port():
    """Return a TCP port number that was free when this function ran.

    A socket is bound to port 0 so the OS picks a free port; the socket is
    closed again before returning, so the port is not reserved for the
    caller and could be taken by another process in the meantime.

    Returns:
        The port number chosen by the OS.
    """
    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
    import socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(("", 0))
        return s.getsockname()[1]
    finally:
        # Close even when bind()/getsockname() raises, so no socket leaks.
        s.close()
43
class Spike(object):
    """Handle to a spike simulator child process.

    Attributes:
        port: Port passed to spike via --gdb-port, or None when spike was
            launched with with_gdb false.
        process: The subprocess.Popen object for the launched command.
    """

    def __init__(self, cmd, binary=None, halted=False, with_gdb=True, timeout=None,
            xlen=64):
        """Launch spike running pk; its output is written to spike.log.

        Args:
            cmd: Command line used to start spike (split with shlex). A
                false value means plain "spike".
            binary: Optional program appended after "pk".
            halted: If true, pass -H to spike.
            with_gdb: If true, pick a free port and pass it as --gdb-port.
            timeout: If true-ish, run the whole command under the external
                "timeout" utility with this value as its argument.
            xlen: 32 adds "--isa RV32"; any other value adds nothing.
        """
        # Always define the attribute so reading it without gdb support gives
        # None instead of raising AttributeError.
        self.port = None

        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["spike"]
        if xlen == 32:
            cmd += ["--isa", "RV32"]

        if timeout:
            cmd = ["timeout", str(timeout)] + cmd

        if halted:
            cmd.append('-H')
        if with_gdb:
            self.port = unused_port()
            cmd += ['--gdb-port', str(self.port)]
        cmd.append('pk')
        if binary:
            cmd.append(binary)

        # The child receives its own copy of the log file descriptor, so our
        # handle can be closed as soon as the process has been started.
        with open("spike.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            # Flush so the command line lands in the log before any output
            # the child writes to the same file.
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)

    def __del__(self):
        """Kill the spike process (if one was started) and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best-effort cleanup; e.g. the process is already gone.
            pass

    def wait(self, *args, **kwargs):
        """Wait for spike to exit; arguments and result are those of Popen.wait."""
        return self.process.wait(*args, **kwargs)
81
class Openocd(object):
    """Handle to an openocd child process.

    Attributes:
        process: The subprocess.Popen object for the launched command.
        port: Fixed to 3333.
    """

    def __init__(self, cmd=None, config=None, debug=False):
        """Launch openocd; its output is written to openocd.log.

        Args:
            cmd: Command line used to start openocd (split with shlex). A
                false value means plain "openocd".
            config: Optional configuration file passed with -f. It is looked
                up with find_file(); if that finds nothing the name is passed
                to openocd unchanged.
            debug: If true, pass -d to openocd.
        """
        if cmd:
            cmd = shlex.split(cmd)
        else:
            cmd = ["openocd"]
        if config:
            # find_file() returns None for an unknown file; fall back to the
            # name as given so the command line never contains None.
            cmd += ["-f", find_file(config) or config]
        if debug:
            cmd.append("-d")

        # The child receives its own copy of the log file descriptor, so our
        # handle can be closed as soon as the process has been started.
        with open("openocd.log", "w") as logfile:
            logfile.write("+ %s\n" % " ".join(cmd))
            # Flush so the command line lands in the log before any output
            # the child writes to the same file.
            logfile.flush()
            self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                    stdout=logfile, stderr=logfile)
        # TODO: Pick a random port
        self.port = 3333

    def __del__(self):
        """Kill the openocd process (if one was started) and reap it."""
        # __init__ may have failed before self.process was assigned.
        process = getattr(self, "process", None)
        if process is None:
            return
        try:
            process.kill()
            process.wait()
        except OSError:
            # Best-effort cleanup; e.g. the process is already gone.
            pass
105
class Gdb(object):
    """Drives an interactive gdb session through pexpect.

    Attributes:
        child: The pexpect child running gdb; everything pexpect logs for it
            goes to gdb.log.
    """

    # Regular expression matching the gdb prompt. Raw string so the
    # backslashes reach the regex engine without invalid-escape warnings.
    _PROMPT = r"\(gdb\)"

    # NOTE: the default path is computed once, when this class body is
    # executed, so $RISCV must be set before the module is loaded.
    def __init__(self,
            path=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):
        """Start gdb and apply settings that make its output predictable.

        Args:
            path: Command used to start gdb.
        """
        self.child = pexpect.spawn(path)
        # open() instead of the Python 2-only file() builtin.
        self.child.logfile = open("gdb.log", "w")
        self.child.logfile.write("+ %s\n" % path)
        self.wait()
        self.command("set confirm off")
        self.command("set width 0")
        self.command("set height 0")
        # Force consistency.
        self.command("set print entry-values no")

    def wait(self):
        """Wait for prompt."""
        self.child.expect(self._PROMPT)

    def command(self, command, timeout=-1):
        """Send one command line and return what gdb printed in response.

        Args:
            command: Text sent to gdb, followed by a newline.
            timeout: Passed to both pexpect expect() calls.

        Returns:
            The text seen between the first newline after sending (presumably
            the end of the echoed command) and the next prompt, stripped.
        """
        self.child.sendline(command)
        self.child.expect("\n", timeout=timeout)
        self.child.expect(self._PROMPT, timeout=timeout)
        return self.child.before.strip()

    def c(self, wait=True):
        """Continue execution.

        Args:
            wait: If true, block until the next prompt and return gdb's
                output. Otherwise only wait for "Continuing" and return None,
                leaving the target running.
        """
        if wait:
            output = self.command("c")
            assert "Continuing" in output
            return output
        else:
            self.child.sendline("c")
            self.child.expect("Continuing")

    def interrupt(self):
        """Send Ctrl-C to gdb and return the output seen before the prompt."""
        self.child.send("\003")
        self.child.expect(self._PROMPT)
        return self.child.before.strip()

    def x(self, address, size='w'):
        """Run "x/<size> <address>" and return the value as an int.

        The value is the text after the first ':' in gdb's output, parsed
        with int(..., 0), so it must be a single integer literal.
        """
        output = self.command("x/%s %s" % (size, address))
        value = int(output.split(':')[1].strip(), 0)
        return value

    def p(self, obj):
        """Run "p/x <obj>" and return the value after the last '=' as an int."""
        output = self.command("p/x %s" % obj)
        value = int(output.split('=')[-1].strip(), 0)
        return value

    def stepi(self):
        """Run "stepi"; fail if the output contains "Cannot"."""
        output = self.command("stepi")
        assert "Cannot" not in output
        return output

    def load(self):
        """Run "load" with a timeout of 60 and check that it succeeded."""
        output = self.command("load", timeout=60)
        assert "failed" not in output
        assert "Transfer rate" in output

    def b(self, location):
        """Set a breakpoint at *location* and return gdb's output."""
        output = self.command("b %s" % location)
        assert "not defined" not in output
        assert "Breakpoint" in output
        return output

    def hbreak(self, location):
        """Set a hardware breakpoint at *location* and return gdb's output."""
        output = self.command("hbreak %s" % location)
        assert "not defined" not in output
        assert "Hardware assisted breakpoint" in output
        return output