remove copy of output from use of module
[ieee754fpu.git] / src / add / test_buf_pipe.py
1 from nmigen import Module, Signal, Mux
2 from nmigen.hdl.rec import Record
3 from nmigen.compat.sim import run_simulation
4 from nmigen.cli import verilog, rtlil
5
6 from example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd
7 from example_buf_pipe import ExampleCombPipe, CombPipe, ExampleStageCls
8 from example_buf_pipe import PrevControl, NextControl, BufferedPipeline
9 from example_buf_pipe import StageChain
10
11 from random import randint
12
13
def check_o_n_valid(dut, val):
    """Sample ``dut.n.o_valid`` and assert that it equals *val*.

    This is a generator: it yields the signal to the simulator, receives the
    sampled value back, and raises ``AssertionError`` on a mismatch.  Call it
    from a test bench with ``yield from``.
    """
    sampled = yield dut.n.o_valid
    assert sampled == val
17
18
def testbench(dut):
    """Directed stimulus generator for a single pipeline stage.

    Drives ``dut.p`` (input side) and ``dut.n`` (output side) through a
    fixed sequence and checks ``dut.n.o_valid`` at selected points via
    check_o_n_valid.  Bare ``yield`` statements hand control back to the
    simulator (presumably one clock each -- confirm against the simulator
    in use).
    """
    prev, nxt = dut.p, dut.n

    # start with both handshake inputs low and let the design settle
    yield nxt.i_ready.eq(0)
    yield prev.o_ready.eq(0)
    yield
    yield

    # downstream is ready; present the first valid word
    yield nxt.i_ready.eq(1)
    yield prev.i_data.eq(5)
    yield prev.i_valid.eq(1)
    yield

    yield prev.i_data.eq(7)
    # the effect of i_valid is not yet visible on the output...
    yield from check_o_n_valid(dut, 0)
    yield
    # ...but it is after this step
    yield from check_o_n_valid(dut, 1)

    yield prev.i_data.eq(2)
    yield
    # enter a "stall": the next stage is no longer ready
    yield nxt.i_ready.eq(0)
    yield prev.i_data.eq(9)
    yield
    # stop sending valid data while stalled
    yield prev.i_valid.eq(0)
    yield prev.i_data.eq(12)
    yield
    # release the stall
    yield prev.i_data.eq(32)
    yield nxt.i_ready.eq(1)
    yield

    # output stays valid for two more checks while the buffer drains,
    # then drops once everything has been emitted
    for expected_valid in (1, 1, 0):
        yield from check_o_n_valid(dut, expected_valid)
        yield
53
54
def testbench2(dut):
    """Directed stimulus generator for the two-stage ExampleBufPipe2.

    Same shape as testbench, but ``dut.n.o_valid`` is expected to rise one
    check later, and ``dut.p.o_ready`` is never driven here.
    """
    prev, nxt = dut.p, dut.n

    # start with the downstream side not ready and let the design settle
    yield nxt.i_ready.eq(0)
    yield
    yield

    # downstream is ready; present the first valid word
    yield nxt.i_ready.eq(1)
    yield prev.i_data.eq(5)
    yield prev.i_valid.eq(1)
    yield

    yield prev.i_data.eq(7)
    # effect of i_valid is delayed: output still not valid at either check
    yield from check_o_n_valid(dut, 0)
    yield
    yield from check_o_n_valid(dut, 0)

    yield prev.i_data.eq(2)
    yield
    # now the effect of i_valid is visible at the output
    yield from check_o_n_valid(dut, 1)
    # enter a "stall": the next stage is no longer ready
    yield nxt.i_ready.eq(0)
    yield prev.i_data.eq(9)
    yield
    # stop sending valid data while stalled
    yield prev.i_valid.eq(0)
    yield prev.i_data.eq(12)
    yield
    # release the stall
    yield prev.i_data.eq(32)
    yield nxt.i_ready.eq(1)
    yield

    # output stays valid for three more checks while the buffers drain,
    # then drops once everything has been emitted
    for expected_valid in (1, 1, 1, 0):
        yield from check_o_n_valid(dut, expected_valid)
        yield

    # two extra idle steps at the end of the run
    yield
    yield
94
95
class Test3:
    """Randomised sender/receiver pair for a pipeline with a plain data input.

    ``send`` and ``rcv`` are generator methods meant to be run together by
    the simulator.  ``self.i`` counts items handed to the input side and
    ``self.o`` counts items checked on the output side; both loops stop once
    every item has been checked.
    """

    def __init__(self, dut, resultfn):
        """Store the device under test and the result checker.

        dut      -- object exposing ``p`` (input) and ``n`` (output) sides
        resultfn -- called as ``resultfn(o_data, expected, i, o)`` per result
        """
        self.dut = dut
        self.resultfn = resultfn
        # deterministic payload: 1, 2, ..., num_tests
        self.data = [n + 1 for n in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Feed ``self.data`` into ``dut.p`` with random gaps."""
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # send_range == 0 means "always send"; otherwise skip at random
                send = send_range == 0 or randint(0, send_range) != 0
                input_ready = yield self.dut.p.o_ready
                if input_ready:
                    if send and self.i != len(self.data):
                        yield self.dut.p.i_valid.eq(1)
                        yield self.dut.p.i_data.eq(self.data[self.i])
                        self.i += 1
                    else:
                        yield self.dut.p.i_valid.eq(0)
                # whether or not the input was ready, hand back to the simulator
                yield

    def rcv(self):
        """Randomly toggle ``dut.n.i_ready`` and check each transferred item."""
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # non-zero draw => downstream accepts data on this step
                accept = randint(0, stall_range) != 0
                yield self.dut.n.i_ready.eq(accept)
                yield
                o_n_valid = yield self.dut.n.o_valid
                i_n_ready = yield self.dut.n.i_ready
                if not (o_n_valid and i_n_ready):
                    # no transfer took place on this step
                    continue
                o_data = yield self.dut.n.o_data
                self.resultfn(o_data, self.data[self.o], self.i, self.o)
                self.o += 1
                if self.o == len(self.data):
                    break
143
144 def test3_resultfn(o_data, expected, i, o):
145 assert o_data == expected + 1, \
146 "%d-%d data %x not match %x\n" \
147 % (i, o, o_data, expected)
148
def data_dict():
    """Return ``num_tests`` dicts with random 'src1' and 'src2' values.

    The upper bound was originally spelled ``1<<16-1``, which Python parses
    as ``1 << (16 - 1)``; that value (32768, inclusive) is kept unchanged.
    """
    upper = 1 << 15
    return [{'src1': randint(0, upper),
             'src2': randint(0, upper)}
            for _ in range(num_tests)]
155
156
class Test5:
    """Randomised sender/receiver pair for pipelines with structured input.

    Like Test3, but input items are applied through ``dut.set_input`` and the
    output may be a Record, in which case it is read field by field into a
    dict before being passed to the result checker.

    ``send`` and ``rcv`` are generator methods meant to be run together by
    the simulator.  ``self.i`` counts items sent, ``self.o`` items checked.
    """

    def __init__(self, dut, resultfn, data=None):
        """Store the device under test, the checker and the test data.

        dut      -- object exposing ``p``, ``n`` and ``set_input``
        resultfn -- called as ``resultfn(o_data, expected, i, o)`` per result
        data     -- sequence of input items; when None, ``num_tests`` random
                    2-tuples are generated instead
        """
        self.dut = dut
        self.resultfn = resultfn
        if data is not None:
            # honour whatever the caller supplied, including an empty
            # sequence (a plain truthiness test would replace it with
            # random data)
            self.data = data
        else:
            # the original bound ``1<<16-1`` parses as ``1 << (16 - 1)``;
            # the resulting value is kept as-is
            upper = 1 << 15
            self.data = [(randint(0, upper), randint(0, upper))
                         for _ in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Feed ``self.data`` into the input side with random gaps."""
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # send_range == 0 means "always send"; otherwise skip at random
                if send_range == 0:
                    send = True
                else:
                    send = randint(0, send_range) != 0
                o_p_ready = yield self.dut.p.o_ready
                if not o_p_ready:
                    # input side not ready: try again on the next step
                    yield
                    continue
                if send and self.i != len(self.data):
                    yield self.dut.p.i_valid.eq(1)
                    # set_input returns the statements that load one item
                    for stmt in self.dut.set_input(self.data[self.i]):
                        yield stmt
                    self.i += 1
                else:
                    yield self.dut.p.i_valid.eq(0)
                yield

    def rcv(self):
        """Randomly toggle ``dut.n.i_ready`` and check each transferred item."""
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # non-zero draw => downstream accepts data on this step
                accept = randint(0, stall_range) != 0
                yield self.dut.n.i_ready.eq(accept)
                yield
                o_n_valid = yield self.dut.n.o_valid
                i_n_ready = yield self.dut.n.i_ready
                if not o_n_valid or not i_n_ready:
                    # no transfer took place on this step
                    continue
                out = self.dut.n.o_data
                if isinstance(out, Record):
                    # read each Record field separately into a plain dict
                    o_data = {}
                    for key, field in out.fields.items():
                        o_data[key] = yield field
                else:
                    o_data = yield out
                self.resultfn(o_data, self.data[self.o], self.i, self.o)
                self.o += 1
                if self.o == len(self.data):
                    break
213
214 def test5_resultfn(o_data, expected, i, o):
215 res = expected[0] + expected[1]
216 assert o_data == res, \
217 "%d-%d data %x not match %s\n" \
218 % (i, o, o_data, repr(expected))
219
def testbench4(dut):
    """Single-generator randomised test expecting ``input + 2`` at the output.

    Sends 1..num_tests through ``dut`` while randomly toggling
    ``dut.n.i_ready`` and randomly skipping sends, and asserts on every
    transferred output word.  Stops after the last word has been checked.
    """
    data = list(range(1, num_tests + 1))
    i = 0  # next item to send
    o = 0  # next item expected at the output
    while True:
        # draw order matters for reproducibility: readiness first, then send
        accept = randint(0, 3) != 0
        send = randint(0, 5) != 0
        yield dut.n.i_ready.eq(accept)
        o_p_ready = yield dut.p.o_ready
        if o_p_ready:
            if send and i != len(data):
                yield dut.p.i_valid.eq(1)
                yield dut.p.i_data.eq(data[i])
                i += 1
            else:
                yield dut.p.i_valid.eq(0)
        yield
        o_n_valid = yield dut.n.o_valid
        i_n_ready = yield dut.n.i_ready
        if not (o_n_valid and i_n_ready):
            # nothing was transferred on this step
            continue
        o_data = yield dut.n.o_data
        assert o_data == data[o] + 2, (
            "%d-%d data %x not match %x\n" % (i, o, o_data, data[o])
        )
        o += 1
        if o == len(data):
            break
249
250
class ExampleBufPipe2:
    """Two ExampleBufPipe stages connected back to back.

        module input --> pipe1 --> pipe2 --> module output

    The valid/ready/data signals between pipe1 and pipe2 are joined with
    ``connect_to_next``; the module's own ``p`` side feeds pipe1 and its
    ``n`` side is driven from pipe2.
    """

    def __init__(self):
        self.pipe1 = ExampleBufPipe()
        self.pipe2 = ExampleBufPipe()

        # input side: data arriving from the PREVIOUS stage
        self.p = PrevControl()
        self.p.i_data = Signal(32)

        # output side: data leaving for the NEXT stage
        self.n = NextControl()
        self.n.o_data = Signal(32)

    def elaborate(self, platform):
        """Build the module: register both pipes and wire them together."""
        m = Module()
        first, second = self.pipe1, self.pipe2
        m.submodules.pipe1 = first
        m.submodules.pipe2 = second

        # pipe1 -> pipe2: inter-pipe valid/ready/data
        m.d.comb += first.connect_to_next(second)

        # this module's input side -> pipe1
        m.d.comb += first.connect_in(self)

        # pipe2 -> this module's output side
        m.d.comb += second.connect_out(self)

        return m
286
287
class ExampleBufPipeChain2(BufferedPipeline):
    """Buffered pipeline whose stage is two ExampleStageCls chained together.

    The two stages are combined with StageChain so that they act as a
    *single* combinatorial stage inside one BufferedPipeline.
    """

    def __init__(self):
        chained = StageChain([ExampleStageCls(), ExampleStageCls()])
        super().__init__(chained)
296
297
def data_chain2():
    """Return ``num_tests`` random integers for the chained-stage test.

    The upper bound was originally spelled ``1<<16-2``, which Python parses
    as ``1 << (16 - 2)``; that value (16384, inclusive) is kept unchanged.
    """
    upper = 1 << 14
    return [randint(0, upper) for _ in range(num_tests)]
303
304
305 def test9_resultfn(o_data, expected, i, o):
306 res = expected + 2
307 assert o_data == res, \
308 "%d-%d data %x not match %s\n" \
309 % (i, o, o_data, repr(expected))
310
311
class SetLessThan:
    """Comparator: ``output`` is 1 when ``src1 < src2``, otherwise 0.

    width  -- bit width of both operands and of ``output``
    signed -- signedness flag passed into the operand Signal shapes
    """

    def __init__(self, width, signed):
        operand_shape = (width, signed)
        self.src1 = Signal(operand_shape)
        self.src2 = Signal(operand_shape)
        self.output = Signal(width)

    def elaborate(self, platform):
        """Return a Module holding the single combinatorial comparison."""
        m = Module()
        is_less = self.src1 < self.src2
        m.d.comb += self.output.eq(Mux(is_less, 1, 0))
        return m
322
323
class LTStage:
    """Pipeline stage that wraps a 16-bit signed SetLessThan by composition.

    ``setup`` adds the comparator as a submodule and wires it up;
    ``process`` then simply returns the signal that ``setup`` created.
    """

    def __init__(self):
        self.slt = SetLessThan(16, True)

    def ispec(self):
        """Input format: a tuple of two 16-bit signals."""
        return (Signal(16), Signal(16))

    def ospec(self):
        """Output format: one 16-bit signal."""
        return Signal(16)

    def setup(self, m, i):
        """Attach the comparator to module *m* and connect input *i* to it."""
        self.o = Signal(16)
        comparator = self.slt
        m.submodules.slt = comparator
        m.d.comb += comparator.src1.eq(i[0])
        m.d.comb += comparator.src2.eq(i[1])
        m.d.comb += self.o.eq(comparator.output)

    def process(self, i):
        """Return the result signal; *i* was already consumed by ``setup``."""
        return self.o
343
344
class LTStageDerived(SetLessThan):
    """Pipeline stage that *is* a 16-bit signed SetLessThan (by inheritance).

    Unlike LTStage, no intermediate output signal is created: ``process``
    returns the comparator's own ``output``.
    """

    def __init__(self):
        super().__init__(16, True)

    def ispec(self):
        """Input format: a tuple of two 16-bit signals."""
        return (Signal(16), Signal(16))

    def ospec(self):
        """Output format: one 16-bit signal."""
        return Signal(16)

    def setup(self, m, i):
        """Register this object as submodule of *m* and connect input *i*."""
        m.submodules.slt = self
        m.d.comb += self.src1.eq(i[0])
        m.d.comb += self.src2.eq(i[1])

    def process(self, i):
        """Return the comparator output; *i* was already wired in ``setup``."""
        return self.output
363
364
class ExampleLTCombPipe(CombPipe):
    """Example combinatorial pipeline built around an LTStage."""

    def __init__(self):
        super().__init__(LTStage())
372
373
class ExampleLTBufferedPipeDerived(CombPipe):
    """Example combinatorial pipeline built around an LTStageDerived.

    Note that, despite "Buffered" in the class name, the base class used
    here is CombPipe.
    """

    def __init__(self):
        super().__init__(LTStageDerived())
381
382
383 def test6_resultfn(o_data, expected, i, o):
384 res = 1 if expected[0] < expected[1] else 0
385 assert o_data == res, \
386 "%d-%d data %x not match %s\n" \
387 % (i, o, o_data, repr(expected))
388
389
class ExampleAddRecordStage:
    """Example stage whose input and output are both Records.

    Each of the two 16-bit fields, 'src1' and 'src2', is incremented by one.
    """

    record_spec = [('src1', 16), ('src2', 16)]

    def ispec(self):
        """Input format: a Record laid out according to ``record_spec``."""
        return Record(self.record_spec)

    def ospec(self):
        """Output format: a Record laid out according to ``record_spec``."""
        return Record(self.record_spec)

    def process(self, i):
        """Return a dict mapping each field name to that field of *i* plus 1."""
        return {field: getattr(i, field) + 1 for field in ('src1', 'src2')}
408
409
class ExampleAddRecordPipe(CombPipe):
    """Example combinatorial pipeline built around ExampleAddRecordStage."""

    def __init__(self):
        super().__init__(ExampleAddRecordStage())
417
418
419 def test7_resultfn(o_data, expected, i, o):
420 res = (expected['src1'] + 1, expected['src2'] + 1)
421 assert o_data['src1'] == res[0] and o_data['src2'] == res[1], \
422 "%d-%d data %s not match %s\n" \
423 % (i, o, repr(o_data), repr(expected))
424
425
class Example2OpClass:
    """Container for two 16-bit operands, ``op1`` and ``op2``.

    Provides an ``eq`` method so that it conforms to the pipeline stage API.
    """

    def __init__(self):
        self.op1 = Signal(16)
        self.op2 = Signal(16)

    def eq(self, i):
        """Return the assignments copying ``i.op1``/``i.op2`` into this object."""
        assign_op1 = self.op1.eq(i.op1)
        assign_op2 = self.op2.eq(i.op2)
        return [assign_op1, assign_op2]
437
438
class ExampleAddClassStage:
    """Example stage whose input is a class instance (Example2OpClass)."""

    def ispec(self):
        """Input format: a fresh Example2OpClass instance."""
        return Example2OpClass()

    def ospec(self):
        """Output format: a 16-bit signal that will hold the sum."""
        return Signal(16)

    def process(self, i):
        """Return the sum of the two operands held by *i*."""
        lhs, rhs = i.op1, i.op2
        return lhs + rhs
458
459
class ExampleBufPipeAddClass(BufferedPipeline):
    """Example buffered pipeline built around ExampleAddClassStage."""

    def __init__(self):
        super().__init__(ExampleAddClassStage())
467
468
class TestInputAdd:
    """Plain-value stand-in for Example2OpClass, used as test input.

    The ``eq`` function reached through ``set_input`` reads ``op1`` and
    ``op2`` from the object it is given, so the simplest conforming input is
    a class with exactly that member layout.
    """

    def __init__(self, op1, op2):
        self.op1, self.op2 = op1, op2
478
479
480 def test8_resultfn(o_data, expected, i, o):
481 res = expected.op1 + expected.op2 # these are a TestInputAdd instance
482 assert o_data == res, \
483 "%d-%d data %x not match %s\n" \
484 % (i, o, o_data, repr(expected))
485
def data_2op():
    """Return ``num_tests`` TestInputAdd items with random operands.

    The upper bound was originally spelled ``1<<16-1``, which Python parses
    as ``1 << (16 - 1)``; that value (32768, inclusive) is kept unchanged.
    """
    upper = 1 << 15
    return [TestInputAdd(randint(0, upper), randint(0, upper))
            for _ in range(num_tests)]
491
492
# Number of items pushed through each randomised test.  Read as a module
# global by Test3, Test5, testbench4 and the data_* helper functions.
num_tests = 100

if __name__ == '__main__':
    # Each section builds a fresh device under test, simulates it (writing a
    # VCD trace) and, for some designs, also writes the RTLIL to a file.

    print("test 1")
    dut = ExampleBufPipe()
    run_simulation(dut, testbench(dut), vcd_name="test_bufpipe.vcd")

    print("test 2")
    dut = ExampleBufPipe2()
    run_simulation(dut, testbench2(dut), vcd_name="test_bufpipe2.vcd")

    print("test 3")
    dut = ExampleBufPipe()
    test = Test3(dut, test3_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe3.vcd")

    print("test 3.5")
    dut = ExampleCombPipe()
    test = Test3(dut, test3_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_combpipe3.vcd")

    print("test 4")
    dut = ExampleBufPipe2()
    run_simulation(dut, testbench4(dut), vcd_name="test_bufpipe4.vcd")

    print("test 5")
    dut = ExampleBufPipeAdd()
    test = Test5(dut, test5_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe5.vcd")

    print("test 6")
    dut = ExampleLTCombPipe()
    test = Test5(dut, test6_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltcomb6.vcd")

    # the input is a tuple of signals, hence list(...) to flatten it
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready] + \
             list(dut.p.i_data) + [dut.n.o_data]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_ltcomb_pipe.il", "w") as f:
        f.write(vl)

    print("test 7")
    dut = ExampleAddRecordPipe()
    data = data_dict()
    test = Test5(dut, test7_resultfn, data=data)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")

    # Record input/output: list the individual fields as ports
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready,
             dut.p.i_data.src1, dut.p.i_data.src2,
             dut.n.o_data.src1, dut.n.o_data.src2]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_recordcomb_pipe.il", "w") as f:
        f.write(vl)

    print("test 8")
    dut = ExampleBufPipeAddClass()
    data = data_2op()
    test = Test5(dut, test8_resultfn, data=data)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe8.vcd")

    print("test 9")
    dut = ExampleBufPipeChain2()
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready] + \
             [dut.p.i_data] + [dut.n.o_data]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_bufpipechain2.il", "w") as f:
        f.write(vl)

    data = data_chain2()
    test = Test5(dut, test9_resultfn, data=data)
    run_simulation(dut, [test.send, test.rcv],
                   vcd_name="test_bufpipechain2.vcd")

    print("test 10")
    dut = ExampleLTBufferedPipeDerived()
    test = Test5(dut, test6_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltbufpipe10.vcd")
    # Build the port list from *this* dut.  Previously the list left over
    # from test 9 was reused, whose signals belong to a different design.
    # The stage has the same input/output layout as the one in test 6.
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready] + \
             list(dut.p.i_data) + [dut.n.o_data]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_ltbufpipe10.il", "w") as f:
        f.write(vl)
576
577