refactor the buffered pipeline to a cleaner API with better separation
[ieee754fpu.git] / src / add / test_buf_pipe.py
1 from nmigen import Module, Signal
2 from nmigen.compat.sim import run_simulation
3 from example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd
4 from random import randint
5
6
def check_o_n_valid(dut, val):
    """Assert that ``dut.o.n_valid`` currently reads back as *val*.

    This is a simulation sub-generator: it yields the signal so that the
    simulator sends its current value back, so it must be invoked with
    ``yield from check_o_n_valid(dut, val)`` from inside a test-bench
    process.

    Raises:
        AssertionError: if the value read differs from *val*.
    """
    o_n_valid = yield dut.o.n_valid
    # include both values so a failing run is diagnosable from the log alone
    assert o_n_valid == val, \
        "o.n_valid is %r, expected %r" % (o_n_valid, val)
10
def check_o_n_valid2(dut, val):
    """Assert that ``dut.o_n_valid`` currently reads back as *val*.

    Variant of the check for a DUT that exposes ``o_n_valid`` directly as
    an attribute (rather than under ``dut.o``).  This is a simulation
    sub-generator: it yields the signal so that the simulator sends its
    current value back, so invoke it with
    ``yield from check_o_n_valid2(dut, val)``.

    Raises:
        AssertionError: if the value read differs from *val*.
    """
    o_n_valid = yield dut.o_n_valid
    # include both values so a failing run is diagnosable from the log alone
    assert o_n_valid == val, \
        "o_n_valid is %r, expected %r" % (o_n_valid, val)
14
15
def testbench(dut):
    """Directed test-bench process for a DUT with ``i`` / ``o`` port groups.

    Drives ``dut.i.n_ready``, ``dut.i.p_valid`` and ``dut.i.data`` through
    a fixed sequence (idle, start sending, drop n_ready, stop sending,
    raise n_ready again) and checks ``dut.o.n_valid`` at fixed clock
    offsets.  A bare ``yield`` advances the simulation by one clock.
    """
    port_in = dut.i
    port_out = dut.o

    def expect_n_valid(level):
        # sub-generator that reads o.n_valid and asserts its value
        return check_o_n_valid(dut, level)

    # --- idle for two clocks with n_ready low
    yield port_in.n_ready.eq(0)
    yield port_out.p_ready.eq(0)
    yield
    yield

    # --- raise n_ready and present the first item
    yield port_in.n_ready.eq(1)
    yield port_in.data.eq(5)
    yield port_in.p_valid.eq(1)
    yield

    # --- second item: n_valid is still low here, high one clock later
    yield port_in.data.eq(7)
    yield from expect_n_valid(0)
    yield
    yield from expect_n_valid(1)

    yield port_in.data.eq(2)
    yield

    # --- drive n_ready low: begin going into "stall"
    yield port_in.n_ready.eq(0)
    yield port_in.data.eq(9)
    yield

    # --- stop presenting valid input while n_ready is still low
    yield port_in.p_valid.eq(0)
    yield port_in.data.eq(12)
    yield

    # --- raise n_ready again
    yield port_in.data.eq(32)
    yield port_in.n_ready.eq(1)
    yield

    # --- n_valid stays high for two more checks, then drops
    yield from expect_n_valid(1)
    yield
    yield from expect_n_valid(1)
    yield
    yield from expect_n_valid(0)
    yield
50
51
def testbench2(dut):
    """Directed test-bench process for a DUT with flat ``i_*`` / ``o_*`` ports.

    Same stimulus shape as the single-stage directed test, but drives the
    flat ``dut.i_n_ready`` / ``dut.i_p_valid`` / ``dut.i_data`` signals and
    checks ``dut.o_n_valid`` one clock later at each point.  A bare
    ``yield`` advances the simulation by one clock.
    """

    def expect_n_valid(level):
        # sub-generator that reads o_n_valid and asserts its value
        return check_o_n_valid2(dut, level)

    # --- idle for two clocks with n_ready low
    yield dut.i_n_ready.eq(0)
    yield
    yield

    # --- raise n_ready and present the first item
    yield dut.i_n_ready.eq(1)
    yield dut.i_data.eq(5)
    yield dut.i_p_valid.eq(1)
    yield

    # --- second item: n_valid is expected low at both of these checks
    yield dut.i_data.eq(7)
    yield from expect_n_valid(0)
    yield
    yield from expect_n_valid(0)

    # --- third item: one clock later n_valid is expected high
    yield dut.i_data.eq(2)
    yield
    yield from expect_n_valid(1)

    # --- drive n_ready low: begin going into "stall"
    yield dut.i_n_ready.eq(0)
    yield dut.i_data.eq(9)
    yield

    # --- stop presenting valid input while n_ready is still low
    yield dut.i_p_valid.eq(0)
    yield dut.i_data.eq(12)
    yield

    # --- raise n_ready again
    yield dut.i_data.eq(32)
    yield dut.i_n_ready.eq(1)
    yield

    # --- n_valid stays high for three more checks, then drops
    yield from expect_n_valid(1)
    yield
    yield from expect_n_valid(1)
    yield
    yield from expect_n_valid(1)
    yield
    yield from expect_n_valid(0)

    # --- three trailing clocks
    yield
    yield
    yield
91
92
class Test3:
    """Randomised sender/receiver pair for a DUT with ``i`` / ``o`` ports.

    ``send`` and ``rcv`` are generator methods meant to be run as two
    simulation processes on the same DUT.  ``send`` feeds the values in
    ``self.data`` in order; ``rcv`` checks that every accepted output word
    equals the corresponding input plus one.

    Attributes:
        dut:  the device under test.
        data: list of input values, 1..num_tests.
        i:    index of the next entry of ``data`` to send.
        o:    index of the next entry of ``data`` expected at the output.
    """

    def __init__(self, dut):
        self.dut = dut
        # sequential (non-random) payload: 1, 2, ..., num_tests
        self.data = [n + 1 for n in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Process: present ``data`` items on the input with random gaps.

        Runs until the receiver has checked every item.  Each burst picks
        a random "send_range"; 0 means send on every opportunity, larger
        values insert a random proportion of idle (p_valid low) cycles.
        """
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for j in range(randint(1, 10)):
                if send_range == 0:
                    send = True
                else:
                    send = randint(0, send_range) != 0
                o_p_ready = yield self.dut.o.p_ready
                if not o_p_ready:
                    # DUT reports p_ready low: leave inputs alone this clock
                    yield
                    continue
                if send and self.i != len(self.data):
                    yield self.dut.i.p_valid.eq(1)
                    yield self.dut.i.data.eq(self.data[self.i])
                    self.i += 1
                else:
                    yield self.dut.i.p_valid.eq(0)
                yield

    def rcv(self):
        """Process: randomly toggle ``i.n_ready`` and check accepted outputs.

        An output word is checked only on a clock where both
        ``o.n_valid`` and ``i.n_ready`` read back non-zero.

        Raises:
            AssertionError: if an accepted word is not ``data[o] + 1``.
        """
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for j in range(randint(1, 10)):
                # value driven onto n_ready: non-zero means "ready",
                # zero stalls the output side for this clock
                ready = randint(0, stall_range) != 0
                yield self.dut.i.n_ready.eq(ready)
                yield
                o_n_valid = yield self.dut.o.n_valid
                i_n_ready = yield self.dut.i.n_ready
                if not o_n_valid or not i_n_ready:
                    continue
                o_data = yield self.dut.o.data
                expected = self.data[self.o] + 1
                # report the value actually compared against, not the
                # raw input, so the failure message is self-consistent
                assert o_data == expected, \
                    "%d-%d data %x not match %x\n" \
                    % (self.i, self.o, o_data, expected)
                self.o += 1
                if self.o == len(self.data):
                    # all items checked: leave the burst loop so that the
                    # next iteration does not index past the end of data
                    break
141
142
class Test5:
    """Randomised sender/receiver pair for a two-operand DUT.

    ``self.data`` holds ``num_tests`` pairs of random integers in the
    range 0..(1<<14) inclusive.  ``send`` hands each pair to
    ``dut.set_input`` and ``rcv`` checks that every accepted output word
    equals the sum of the corresponding pair.  ``self.i`` counts pairs
    sent and ``self.o`` counts results checked.
    """

    def __init__(self, dut):
        self.dut = dut
        self.data = [(randint(0, 1<<14), randint(0, 1<<14))
                     for _ in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Process: feed operand pairs to the DUT with random idle gaps."""
        dut = self.dut
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # send_range 0 means "always try to send" in this burst
                want_send = (True if send_range == 0
                             else randint(0, send_range) != 0)
                p_ready = yield dut.o.p_ready
                if not p_ready:
                    # p_ready reads low: leave the inputs alone this clock
                    yield
                    continue
                if self.i == len(self.data) or not want_send:
                    yield dut.i.p_valid.eq(0)
                else:
                    yield dut.i.p_valid.eq(1)
                    # set_input supplies the commands that load one pair
                    for command in dut.set_input(self.data[self.i]):
                        yield command
                    self.i += 1
                yield

    def rcv(self):
        """Process: randomly toggle ``i.n_ready`` and check accepted sums.

        A result is checked only on a clock where both ``o.n_valid`` and
        ``i.n_ready`` read back non-zero.
        """
        dut = self.dut
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                n_ready_level = randint(0, stall_range) != 0
                yield dut.i.n_ready.eq(n_ready_level)
                yield
                valid = yield dut.o.n_valid
                accepted = yield dut.i.n_ready
                if valid and accepted:
                    o_data = yield dut.o.data
                    operand_a, operand_b = self.data[self.o]
                    assert o_data == operand_a + operand_b, \
                        "%d-%d data %x not match %s\n" \
                        % (self.i, self.o, o_data, repr(self.data[self.o]))
                    self.o += 1
                    if self.o == len(self.data):
                        # everything checked: leave the burst loop
                        break
192
193
def testbench4(dut):
    """Randomised single-process test-bench for a DUT with flat ports.

    Sends the values 1..num_tests through ``dut.i_data`` with random idle
    cycles and random ``i_n_ready`` levels, and checks that every accepted
    output word equals the corresponding input plus two.  An output word
    is checked only on a clock where both ``o_n_valid`` and ``i_n_ready``
    read back non-zero.

    Raises:
        AssertionError: if an accepted word is not ``data[o] + 2``.
    """
    # sequential (non-random) payload: 1, 2, ..., num_tests
    data = [n + 1 for n in range(num_tests)]
    if not data:
        # nothing to send or check; without this the loop below could
        # never reach its exit condition
        return
    i = 0  # index of the next item to send
    o = 0  # index of the next item expected at the output
    while True:
        # value driven onto n_ready: non-zero means "ready"
        ready = randint(0, 3) != 0
        send = randint(0, 5) != 0
        yield dut.i_n_ready.eq(ready)
        o_p_ready = yield dut.o_p_ready
        if o_p_ready:
            if send and i != len(data):
                yield dut.i_p_valid.eq(1)
                yield dut.i_data.eq(data[i])
                i += 1
            else:
                yield dut.i_p_valid.eq(0)
        yield
        o_n_valid = yield dut.o_n_valid
        i_n_ready = yield dut.i_n_ready
        if o_n_valid and i_n_ready:
            o_data = yield dut.o_data
            expected = data[o] + 2
            # report the value actually compared against, not the raw
            # input, so the failure message is self-consistent
            assert o_data == expected, "%d-%d data %x not match %x\n" \
                % (i, o, o_data, expected)
            o += 1
            if o == len(data):
                break
223
224
class ExampleBufPipe2:
    """Two ExampleBufPipe instances connected back to back.

    Inter-stage wiring (pipe1 on the left, pipe2 on the right)::

        pipe1.o.n_valid  -->  pipe2.i.p_valid
        pipe1.i.n_ready  <--  pipe2.o.p_ready
        pipe1.o.data     -->  pipe2.i.data

    The module-level ports are wired straight through: ``i_p_valid``,
    ``i_data`` and ``o_p_ready`` belong to pipe1's input side, while
    ``o_n_valid``, ``o_data`` and ``i_n_ready`` belong to pipe2's output
    side.
    """

    def __init__(self):
        self.pipe1 = ExampleBufPipe()
        self.pipe2 = ExampleBufPipe()

        # inputs
        self.i_p_valid = Signal()    # >>in  - from the PREVIOUS stage
        self.i_n_ready = Signal()    # in<<  - from the NEXT stage
        self.i_data = Signal(32)     # >>in  - from the PREVIOUS stage

        # outputs
        self.o_n_valid = Signal()    # out>> - to the NEXT stage
        self.o_p_ready = Signal()    # <<out - to the PREVIOUS stage
        self.o_data = Signal(32)     # out>> - to the NEXT stage

    def elaborate(self, platform):
        """Build the module: register both pipes and wire them together."""
        m = Module()
        first = self.pipe1
        second = self.pipe2
        m.submodules.pipe1 = first
        m.submodules.pipe2 = second

        # inter-pipe valid/ready/data connections
        m.d.comb += [
            second.i.p_valid.eq(first.o.n_valid),
            first.i.n_ready.eq(second.o.p_ready),
            second.i.data.eq(first.o.data),
        ]

        # module ports on the pipe1 (upstream-facing) side
        m.d.comb += [
            first.i.p_valid.eq(self.i_p_valid),
            self.o_p_ready.eq(first.o.p_ready),
            first.i.data.eq(self.i_data),
        ]

        # module ports on the pipe2 (downstream-facing) side
        m.d.comb += [
            self.o_n_valid.eq(second.o.n_valid),
            second.i.n_ready.eq(self.i_n_ready),
            self.o_data.eq(second.o.data),
        ]

        return m
268
# number of items pushed through the DUT by each randomised test
num_tests = 10000

if __name__ == '__main__':
    # 1: directed test, single pipe
    print("test 1")
    dut = ExampleBufPipe()
    run_simulation(dut, testbench(dut), vcd_name="test_bufpipe.vcd")

    # 2: directed test, two chained pipes
    print("test 2")
    dut = ExampleBufPipe2()
    run_simulation(dut, testbench2(dut), vcd_name="test_bufpipe2.vcd")

    # 3: randomised sender + receiver processes, single pipe
    print("test 3")
    dut = ExampleBufPipe()
    test = Test3(dut)
    run_simulation(dut, [test.send, test.rcv],
                   vcd_name="test_bufpipe3.vcd")

    # 4: randomised single-process test, two chained pipes
    print("test 4")
    dut = ExampleBufPipe2()
    run_simulation(dut, testbench4(dut), vcd_name="test_bufpipe4.vcd")

    # 5: randomised sender + receiver processes, adder pipe
    print("test 5")
    dut = ExampleBufPipeAdd()
    test = Test5(dut)
    run_simulation(dut, [test.send, test.rcv],
                   vcd_name="test_bufpipe5.vcd")
293