608046533c06e7e0e21290fdf64ade95a84f43a7
[ieee754fpu.git] / src / add / test_buf_pipe.py
1 from nmigen import Module, Signal, Mux
2 from nmigen.compat.sim import run_simulation
3 from example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd
4 from example_buf_pipe import ExampleCombPipe, CombPipe
5 from example_buf_pipe import PrevControl, NextControl
6 from random import randint
7
8
def check_o_n_valid(dut, val):
    """Simulation sub-process: assert that ``dut.n.o_valid`` equals *val*.

    This is a generator meant to be driven with ``yield from`` inside a
    testbench.  It yields the ``o_valid`` signal of the DUT's "next"
    side, and compares whatever value is sent back into the generator
    with *val*.

    Raises AssertionError when the two differ.
    """
    observed = yield dut.n.o_valid
    assert observed == val
12
13
def testbench(dut):
    """Directed valid/ready handshake test for a single pipeline stage.

    Generator for the simulator: every ``yield <signal>.eq(...)`` queues
    an assignment, every bare ``yield`` advances one clock, and the
    ``check_o_n_valid`` calls assert the state of ``dut.n.o_valid`` at
    that point.  No reset pulse is applied.
    """
    prev = dut.p    # side facing the previous stage (inputs)
    nxt = dut.n     # side facing the next stage (outputs)

    # idle for two clocks with the downstream side not ready
    yield nxt.i_ready.eq(0)
    # NOTE(review): o_ready looks like a DUT output being driven from the
    # testbench; testbench2 has the equivalent line disabled -- confirm.
    yield prev.o_ready.eq(0)
    yield
    yield

    # downstream becomes ready and the first word (5) is presented
    yield nxt.i_ready.eq(1)
    yield prev.data.eq(5)
    yield prev.i_valid.eq(1)
    yield

    yield prev.data.eq(7)
    yield from check_o_n_valid(dut, 0)  # effects of i_p_valid delayed
    yield
    yield from check_o_n_valid(dut, 1)  # ok *now* i_p_valid effect is felt

    yield prev.data.eq(2)
    yield
    # begin going into "stall" (next stage says it is NOT ready)
    yield nxt.i_ready.eq(0)
    yield prev.data.eq(9)
    yield
    # stop offering new data while stalled
    yield prev.i_valid.eq(0)
    yield prev.data.eq(12)
    yield
    # release the stall
    yield prev.data.eq(32)
    yield nxt.i_ready.eq(1)
    yield
    yield from check_o_n_valid(dut, 1)  # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 1)  # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 0)  # buffer outputted, *now* we're done.
    yield
48
49
def testbench2(dut):
    """Directed valid/ready handshake test with one extra clock of latency.

    Same stimulus as the single-stage directed test, but ``o_valid`` is
    expected to rise one clock later and to stay high for one more clock
    while draining.  Generator for the simulator: ``yield <sig>.eq(...)``
    queues an assignment and a bare ``yield`` advances one clock.  No
    reset pulse is applied and ``p.o_ready`` is left undriven.
    """
    prev = dut.p    # side facing the previous stage (inputs)
    nxt = dut.n     # side facing the next stage (outputs)

    # idle for two clocks with the downstream side not ready
    yield nxt.i_ready.eq(0)
    yield
    yield

    # downstream becomes ready and the first word (5) is presented
    yield nxt.i_ready.eq(1)
    yield prev.data.eq(5)
    yield prev.i_valid.eq(1)
    yield

    yield prev.data.eq(7)
    yield from check_o_n_valid(dut, 0)  # effects of i_p_valid delayed 2 clocks
    yield
    yield from check_o_n_valid(dut, 0)  # effects of i_p_valid delayed 2 clocks

    yield prev.data.eq(2)
    yield
    yield from check_o_n_valid(dut, 1)  # ok *now* i_p_valid effect is felt
    # begin going into "stall" (next stage says it is NOT ready)
    yield nxt.i_ready.eq(0)
    yield prev.data.eq(9)
    yield
    # stop offering new data while stalled
    yield prev.i_valid.eq(0)
    yield prev.data.eq(12)
    yield
    # release the stall
    yield prev.data.eq(32)
    yield nxt.i_ready.eq(1)
    yield
    yield from check_o_n_valid(dut, 1)  # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 1)  # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 1)  # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 0)  # buffer outputted, *now* we're done.
    # a few trailing clocks
    yield
    yield
    yield
89
90
class Test3:
    """Randomised sender/receiver pair for a single-word pipeline.

    ``send`` and ``rcv`` are two generator processes meant to run
    concurrently in the simulator.  They share the counters ``i`` (words
    handed to the DUT) and ``o`` (words checked at the output); both
    loops end once ``o`` reaches the number of test words.
    """

    def __init__(self, dut, resultfn):
        """Remember the DUT and checker; build the words 1..num_tests.

        *resultfn* is called as ``resultfn(o_data, expected, i, o)`` for
        every word accepted at the output.
        """
        self.dut = dut
        self.resultfn = resultfn
        # sequential (non-random) payload: 1, 2, ..., num_tests
        self.data = [idx + 1 for idx in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Input-side process: offer words with random gaps."""
        total = len(self.data)
        while self.o != total:
            # send_range 0 means "always send" for this burst; larger
            # values make idle cycles possible
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                send = True if send_range == 0 \
                    else randint(0, send_range) != 0
                can_accept = yield self.dut.p.o_ready
                if not can_accept:
                    # DUT not ready: leave the inputs as they are
                    yield
                    continue
                if send and self.i != total:
                    yield self.dut.p.i_valid.eq(1)
                    yield self.dut.p.data.eq(self.data[self.i])
                    self.i += 1
                else:
                    yield self.dut.p.i_valid.eq(0)
                yield

    def rcv(self):
        """Output-side process: toggle ready at random, check each word."""
        total = len(self.data)
        while self.o != total:
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # True drives i_ready high, i.e. the receiver accepts
                ready = randint(0, stall_range) != 0
                yield self.dut.n.i_ready.eq(ready)
                yield
                o_n_valid = yield self.dut.n.o_valid
                i_n_ready = yield self.dut.n.i_ready
                if not (o_n_valid and i_n_ready):
                    continue    # no transfer on this clock
                o_data = yield self.dut.n.data
                self.resultfn(o_data, self.data[self.o], self.i, self.o)
                self.o += 1
                if self.o == total:
                    break
138
139 def test3_resultfn(o_data, expected, i, o):
140 assert o_data == expected + 1, \
141 "%d-%d data %x not match %x\n" \
142 % (i, o, o_data, expected)
143
class Test5:
    """Randomised sender/receiver pair for a two-operand pipeline.

    Like the single-word variant, but each test item is a pair of random
    integers that is applied through ``dut.set_input``.  ``send`` and
    ``rcv`` are generator processes meant to run concurrently; both end
    once ``o`` (items checked) reaches the number of test items.
    """

    def __init__(self, dut, resultfn):
        """Remember the DUT and checker; draw num_tests random pairs.

        *resultfn* is called as ``resultfn(o_data, expected, i, o)`` for
        every item accepted at the output, with *expected* being the
        input pair.
        """
        self.dut = dut
        self.resultfn = resultfn
        # NOTE: the upper bound was written "1<<16-1", which Python
        # parses as 1 << (16-1); operands are therefore in 0..32768
        # inclusive.  Value kept as-is, just spelled explicitly.
        upper = 1 << 15
        self.data = [(randint(0, upper), randint(0, upper))
                     for _ in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Input-side process: offer operand pairs with random gaps."""
        total = len(self.data)
        while self.o != total:
            # send_range 0 means "always send" for this burst
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                send = True if send_range == 0 \
                    else randint(0, send_range) != 0
                can_accept = yield self.dut.p.o_ready
                if not can_accept:
                    # DUT not ready: leave the inputs as they are
                    yield
                    continue
                if send and self.i != total:
                    yield self.dut.p.i_valid.eq(1)
                    # set_input supplies the statements that load the pair
                    for stmt in self.dut.set_input(self.data[self.i]):
                        yield stmt
                    self.i += 1
                else:
                    yield self.dut.p.i_valid.eq(0)
                yield

    def rcv(self):
        """Output-side process: toggle ready at random, check each item."""
        total = len(self.data)
        while self.o != total:
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # True drives i_ready high, i.e. the receiver accepts
                ready = randint(0, stall_range) != 0
                yield self.dut.n.i_ready.eq(ready)
                yield
                o_n_valid = yield self.dut.n.o_valid
                i_n_ready = yield self.dut.n.i_ready
                if not (o_n_valid and i_n_ready):
                    continue    # no transfer on this clock
                o_data = yield self.dut.n.data
                self.resultfn(o_data, self.data[self.o], self.i, self.o)
                self.o += 1
                if self.o == total:
                    break
191
192 def test5_resultfn(o_data, expected, i, o):
193 res = expected[0] + expected[1]
194 assert o_data == res, \
195 "%d-%d data %x not match %s\n" \
196 % (i, o, o_data, repr(expected))
197
def testbench4(dut):
    """Single-process randomised test: every word must come out as word+2.

    Feeds the words 1..num_tests into ``dut.p`` with random idle cycles
    while toggling ``dut.n.i_ready`` at random, and checks every word
    accepted at the output.  Generator for the simulator: ``yield
    <sig>.eq(...)`` queues an assignment, a bare ``yield`` advances one
    clock, ``yield <sig>`` reads a signal.

    Raises AssertionError on the first mismatching output word.  The
    message shows the value actually compared against (input + 2).
    Returns immediately when there are no words to send.
    """
    data = list(range(1, num_tests + 1))
    sent = 0        # words handed to the DUT
    received = 0    # words checked at the output
    # looping on the counter (rather than "while True") also covers an
    # empty data list, which would otherwise never terminate cleanly
    while received != len(data):
        # True drives i_ready high: the receiver accepts on 3 of 4 clocks
        ready = randint(0, 3) != 0
        send = randint(0, 5) != 0
        yield dut.n.i_ready.eq(ready)
        o_p_ready = yield dut.p.o_ready
        if o_p_ready:
            if send and sent != len(data):
                yield dut.p.i_valid.eq(1)
                yield dut.p.data.eq(data[sent])
                sent += 1
            else:
                yield dut.p.i_valid.eq(0)
        yield
        o_n_valid = yield dut.n.o_valid
        i_n_ready = yield dut.n.i_ready
        if o_n_valid and i_n_ready:
            o_data = yield dut.n.data
            # presumably +1 per chained stage of the DUT; verify against
            # example_buf_pipe
            want = data[received] + 2
            assert o_data == want, "%d-%d data %x not match %x\n" \
                % (sent, received, o_data, want)
            received += 1
227
228
class ExampleBufPipe2:
    """ Two ExampleBufPipe stages chained back to back.

        connect these:  ------|---------------|
                              v               v
        i_p_valid  >>in  pipe1 o_n_valid out>> i_p_valid  >>in  pipe2
        o_p_ready <<out  pipe1 i_n_ready <<in  o_p_ready <<out  pipe2
        p_data     >>in  pipe1 o_data    out>> p_data     >>in  pipe2

        The module's own "p" side is wired to pipe1 and its "n" side is
        wired to pipe2.
    """

    def __init__(self):
        # the two chained stages
        self.pipe1 = ExampleBufPipe()
        self.pipe2 = ExampleBufPipe()

        # input side: 32-bit data arriving from the PREVIOUS stage
        self.p = PrevControl()
        self.p.data = Signal(32)

        # output side: 32-bit data leaving for the NEXT stage
        self.n = NextControl()
        self.n.data = Signal(32)

    def elaborate(self, platform):
        """Build the module: register both stages and wire them up."""
        m = Module()
        first, second = self.pipe1, self.pipe2
        m.submodules.pipe1 = first
        m.submodules.pipe2 = second

        # pipe1 -> pipe2: inter-stage valid/ready/data
        m.d.comb += first.connect_to_next(second)

        # module inputs -> pipe1 (left-hand side of the chain)
        m.d.comb += first.connect_in(self)

        # pipe2 -> module outputs (right-hand side of the chain)
        m.d.comb += second.connect_out(self)

        return m
264
class SetLessThan:
    """Combinatorial "set if less than": output is 1 when src1 < src2.

    width  -- bit width of both operands and of the output
    signed -- signedness flag passed into the operands' shape
    """

    def __init__(self, width, signed):
        # both operands share the same (width, signed) shape
        self.src1 = Signal((width, signed))
        self.src2 = Signal((width, signed))
        self.output = Signal(width)

    def elaborate(self, platform):
        """Build the module: a single comparison feeding a 1/0 mux."""
        m = Module()
        is_less = self.src1 < self.src2
        m.d.comb += self.output.eq(Mux(is_less, 1, 0))
        return m
275
276
class LTStage:
    """Pipeline stage wrapping a signed 16-bit SetLessThan.

    Offers the ispec / ospec / setup / process methods that CombPipe is
    given a stage object for.
    """

    def __init__(self):
        # signed 16-bit comparator
        self.slt = SetLessThan(16, True)

    def ispec(self):
        """Input format: a pair of 16-bit signals."""
        return (Signal(16), Signal(16))

    def ospec(self):
        """Output format: one 16-bit signal."""
        return Signal(16)

    def setup(self, m, i):
        """Add the comparator to *m* and wire the pair *i* into it.

        The comparator result is latched combinatorially into ``self.o``,
        which ``process`` hands back.
        """
        self.o = Signal(16)
        m.submodules.slt = self.slt
        comparator = self.slt
        m.d.comb += comparator.src1.eq(i[0])
        m.d.comb += comparator.src2.eq(i[1])
        m.d.comb += self.o.eq(comparator.output)

    def process(self, i):
        """Return the result signal created by ``setup`` (*i* is unused)."""
        return self.o
296
297
class ExampleLTCombPipe(CombPipe):
    """ an example of how to use the combinatorial pipeline:
        a CombPipe built around a freshly created LTStage.
    """

    def __init__(self):
        CombPipe.__init__(self, LTStage())
305
306
307 def test6_resultfn(o_data, expected, i, o):
308 res = 1 if expected[0] < expected[1] else 0
309 assert o_data == res, \
310 "%d-%d data %x not match %s\n" \
311 % (i, o, o_data, repr(expected))
312
313
# number of words / operand pairs used by the randomised tests
num_tests = 1000

if __name__ == '__main__':

    def _random_traffic(driver_cls, resultfn):
        """Return a builder producing [send, rcv] processes for a DUT."""
        def build(dut):
            driver = driver_cls(dut, resultfn)
            return [driver.send, driver.rcv]
        return build

    # (label, DUT constructor, process builder, VCD output file)
    _runs = [
        ("test 1", ExampleBufPipe, testbench,
         "test_bufpipe.vcd"),
        ("test 2", ExampleBufPipe2, testbench2,
         "test_bufpipe2.vcd"),
        ("test 3", ExampleBufPipe, _random_traffic(Test3, test3_resultfn),
         "test_bufpipe3.vcd"),
        ("test 3.5", ExampleCombPipe, _random_traffic(Test3, test3_resultfn),
         "test_combpipe3.vcd"),
        ("test 4", ExampleBufPipe2, testbench4,
         "test_bufpipe4.vcd"),
        ("test 5", ExampleBufPipeAdd, _random_traffic(Test5, test5_resultfn),
         "test_bufpipe5.vcd"),
        ("test 6", ExampleLTCombPipe, _random_traffic(Test5, test6_resultfn),
         "test_ltcomb6.vcd"),
    ]

    # each run gets a fresh DUT; the processes are built after the DUT
    for label, make_dut, make_procs, vcd_file in _runs:
        print(label)
        dut = make_dut()
        run_simulation(dut, make_procs(dut), vcd_name=vcd_file)
348