update docstrings
[ieee754fpu.git] / src / add / test_buf_pipe.py
1 from nmigen import Module, Signal, Mux
2 from nmigen.hdl.rec import Record
3 from nmigen.compat.sim import run_simulation
4 from nmigen.cli import verilog, rtlil
5
6 from example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd
7 from example_buf_pipe import ExamplePipeline, UnbufferedPipeline
8 from example_buf_pipe import ExampleStageCls
9 from example_buf_pipe import PrevControl, NextControl, BufferedPipeline
10 from example_buf_pipe import StageChain
11
12 from random import randint
13
14
def check_o_n_valid(dut, val):
    """Sample ``dut.n.o_valid`` and assert that it equals ``val``.

    Generator helper: it yields the signal object once and expects whatever
    drives the generator (presumably the simulator behind
    ``run_simulation``) to send the sampled value back.  Use it with
    ``yield from`` inside a testbench.
    """
    sampled = yield dut.n.o_valid
    assert sampled == val
18
19
def testbench(dut):
    """Directed valid/ready handshake test for one pipeline stage.

    Generator intended for ``run_simulation``.  It drives ``dut.p`` (the
    input side) and ``dut.n`` (the output side) and, at fixed points, checks
    ``dut.n.o_valid`` through ``check_o_n_valid``.  A bare ``yield`` is
    presumably one simulation clock step -- confirm against the simulator.
    """
    prev, nxt = dut.p, dut.n

    # Start with the output side not ready.  No reset is driven here.
    yield nxt.i_ready.eq(0)
    # NOTE(review): o_ready is named like a DUT output, yet it is driven
    # from the testbench here -- confirm this is intentional.
    yield prev.o_ready.eq(0)
    yield
    yield

    # Output side becomes ready; present the first valid data item.
    yield nxt.i_ready.eq(1)
    yield prev.i_data.eq(5)
    yield prev.i_valid.eq(1)
    yield

    yield prev.i_data.eq(7)
    yield from check_o_n_valid(dut, 0)  # i_valid has not taken effect yet
    yield
    yield from check_o_n_valid(dut, 1)  # now the effect of i_valid is seen

    yield prev.i_data.eq(2)
    yield
    yield nxt.i_ready.eq(0)  # output side stops accepting: stall begins
    yield prev.i_data.eq(9)
    yield
    yield prev.i_valid.eq(0)
    yield prev.i_data.eq(12)
    yield
    yield prev.i_data.eq(32)
    yield nxt.i_ready.eq(1)  # stall released
    yield
    yield from check_o_n_valid(dut, 1)  # buffered data still being output
    yield
    yield from check_o_n_valid(dut, 1)  # buffered data still being output
    yield
    yield from check_o_n_valid(dut, 0)  # everything output: done
    yield
54
55
def testbench2(dut):
    """Directed valid/ready handshake test with one extra cycle of latency.

    Same stimulus shape as ``testbench`` but the ``o_valid`` checks expect
    the effect of ``i_valid`` one check later (the script runs this against
    ``ExampleBufPipe2``, which chains two pipes).  Generator intended for
    ``run_simulation``.
    """
    prev, nxt = dut.p, dut.n

    # Start with the output side not ready.  Neither reset nor p.o_ready
    # is driven in this testbench.
    yield nxt.i_ready.eq(0)
    yield
    yield

    # Output side becomes ready; present the first valid data item.
    yield nxt.i_ready.eq(1)
    yield prev.i_data.eq(5)
    yield prev.i_valid.eq(1)
    yield

    yield prev.i_data.eq(7)
    yield from check_o_n_valid(dut, 0)  # i_valid effect delayed 2 clocks
    yield
    yield from check_o_n_valid(dut, 0)  # i_valid effect delayed 2 clocks

    yield prev.i_data.eq(2)
    yield
    yield from check_o_n_valid(dut, 1)  # now the effect of i_valid is seen
    yield nxt.i_ready.eq(0)  # output side stops accepting: stall begins
    yield prev.i_data.eq(9)
    yield
    yield prev.i_valid.eq(0)
    yield prev.i_data.eq(12)
    yield
    yield prev.i_data.eq(32)
    yield nxt.i_ready.eq(1)  # stall released
    yield
    yield from check_o_n_valid(dut, 1)  # buffered data still being output
    yield
    yield from check_o_n_valid(dut, 1)  # buffered data still being output
    yield
    yield from check_o_n_valid(dut, 1)  # buffered data still being output
    yield
    yield from check_o_n_valid(dut, 0)  # everything output: done
    yield
    yield
    yield
95
96
class Test3:
    """Randomised sender/receiver pair for a pipeline carrying plain values.

    ``send`` and ``rcv`` are generator methods meant to be handed together
    to ``run_simulation``.  They share ``self.i`` (number of items sent) and
    ``self.o`` (number of items received and checked).

    dut      -- object exposing ``p.o_ready``, ``p.i_valid``, ``p.i_data``,
                ``n.i_ready``, ``n.o_valid`` and ``n.o_data``
    resultfn -- called as ``resultfn(o_data, sent_value, i, o)`` for every
                item accepted on the output side
    """

    def __init__(self, dut, resultfn):
        self.dut = dut
        self.resultfn = resultfn
        # Deterministic payload 1..num_tests (random data is not used here).
        self.data = list(range(1, num_tests + 1))
        self.i = 0
        self.o = 0

    def send(self):
        """Feed ``self.data`` into the input side with random gaps."""
        prev = self.dut.p
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # send_range == 0 means "always send" for this burst; the
                # short-circuit keeps randint from being called in that case.
                want_send = send_range == 0 or randint(0, send_range) != 0
                ready = yield prev.o_ready
                if ready:
                    if want_send and self.i != len(self.data):
                        yield prev.i_valid.eq(1)
                        yield prev.i_data.eq(self.data[self.i])
                        self.i += 1
                    else:
                        yield prev.i_valid.eq(0)
                # Inputs are left untouched when the pipe is not ready.
                yield

    def rcv(self):
        """Randomly toggle ``n.i_ready`` and check every accepted output."""
        nxt = self.dut.n
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # True drives i_ready high (accept), False stalls.
                accept = randint(0, stall_range) != 0
                yield nxt.i_ready.eq(accept)
                yield
                out_valid = yield nxt.o_valid
                in_ready = yield nxt.i_ready
                if out_valid and in_ready:
                    received = yield nxt.o_data
                    self.resultfn(received, self.data[self.o],
                                  self.i, self.o)
                    self.o += 1
                    if self.o == len(self.data):
                        break
144
145 def test3_resultfn(o_data, expected, i, o):
146 assert o_data == expected + 1, \
147 "%d-%d data %x not match %x\n" \
148 % (i, o, o_data, expected)
149
def data_placeholder():
    """Return ``num_tests`` PlaceHolder objects with random src1/src2."""
    # NOTE(review): the original spelling ``1<<16-1`` parses as
    # ``1 << (16 - 1)`` (32768), not ``(1 << 16) - 1``.  The value is kept
    # as-is; confirm which bound was intended.
    upper = 1 << (16 - 1)
    items = []
    for _ in range(num_tests):
        item = PlaceHolder()
        item.src1 = randint(0, upper)
        item.src2 = randint(0, upper)
        items.append(item)
    return items
158
def data_dict():
    """Return ``num_tests`` dicts with random 'src1' and 'src2' entries."""
    # NOTE(review): the original spelling ``1<<16-1`` parses as
    # ``1 << (16 - 1)`` (32768), not ``(1 << 16) - 1``.  The value is kept
    # as-is; confirm which bound was intended.
    upper = 1 << (16 - 1)
    return [{'src1': randint(0, upper),
             'src2': randint(0, upper)}
            for _ in range(num_tests)]
165
166
class Test5:
    """Randomised sender/receiver pair for pipelines with structured data.

    Like ``Test3`` but the input is applied through ``dut.set_input`` and
    the output may be a ``Record``, in which case it is read field by field
    into a dict before being passed to ``resultfn``.

    dut      -- pipeline under test (must provide ``set_input``)
    resultfn -- called as ``resultfn(o_data, sent_item, i, o)``
    data     -- optional list of items to send; when omitted (or empty)
                ``num_tests`` random 2-tuples are generated
    """

    def __init__(self, dut, resultfn, data=None):
        self.dut = dut
        self.resultfn = resultfn
        # NOTE(review): the original ``1<<16-1`` parses as ``1 << (16 - 1)``
        # (32768); kept unchanged, confirm the intended bound.
        upper = 1 << (16 - 1)
        self.data = data if data else [
            (randint(0, upper), randint(0, upper))
            for _ in range(num_tests)
        ]
        self.i = 0
        self.o = 0

    def send(self):
        """Feed ``self.data`` into the input side with random gaps."""
        prev = self.dut.p
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # send_range == 0 means "always send" for this burst; the
                # short-circuit keeps randint from being called in that case.
                want_send = send_range == 0 or randint(0, send_range) != 0
                ready = yield prev.o_ready
                if ready:
                    if want_send and self.i != len(self.data):
                        yield prev.i_valid.eq(1)
                        # set_input returns the statements that load the
                        # item; each one is yielded individually.
                        for stmt in self.dut.set_input(self.data[self.i]):
                            yield stmt
                        self.i += 1
                    else:
                        yield prev.i_valid.eq(0)
                # Inputs are left untouched when the pipe is not ready.
                yield

    def rcv(self):
        """Randomly toggle ``n.i_ready`` and check every accepted output."""
        nxt = self.dut.n
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # True drives i_ready high (accept), False stalls.
                accept = randint(0, stall_range) != 0
                yield nxt.i_ready.eq(accept)
                yield
                out_valid = yield nxt.o_valid
                in_ready = yield nxt.i_ready
                if out_valid and in_ready:
                    if isinstance(self.dut.n.o_data, Record):
                        # Read each Record field separately into a dict.
                        received = {}
                        record = self.dut.n.o_data
                        for name, field in record.fields.items():
                            received[name] = yield field
                    else:
                        received = yield self.dut.n.o_data
                    self.resultfn(received, self.data[self.o],
                                  self.i, self.o)
                    self.o += 1
                    if self.o == len(self.data):
                        break
223
224 def test5_resultfn(o_data, expected, i, o):
225 res = expected[0] + expected[1]
226 assert o_data == res, \
227 "%d-%d data %x not match %s\n" \
228 % (i, o, o_data, repr(expected))
229
def testbench4(dut):
    """Single-generator randomised test: send 1..num_tests, expect value+2.

    Each loop iteration randomly decides whether the output side accepts
    and whether the input side offers a new item, then advances one step
    and checks any output that was accepted.  The loop ends once every
    item has been received.
    """
    data = list(range(1, num_tests + 1))  # deterministic payload
    sent = 0
    received = 0
    while True:
        accept = randint(0, 3) != 0  # True drives n.i_ready high
        want_send = randint(0, 5) != 0
        yield dut.n.i_ready.eq(accept)
        in_ready = yield dut.p.o_ready
        if in_ready:
            if want_send and sent != len(data):
                yield dut.p.i_valid.eq(1)
                yield dut.p.i_data.eq(data[sent])
                sent += 1
            else:
                yield dut.p.i_valid.eq(0)
        yield
        out_valid = yield dut.n.o_valid
        out_ready = yield dut.n.i_ready
        if out_valid and out_ready:
            o_data = yield dut.n.o_data
            assert o_data == data[received] + 2, \
                        "%d-%d data %x not match %x\n" \
                        % (sent, received, o_data, data[received])
            received += 1
            if received == len(data):
                break
259
260
class ExampleBufPipe2:
    """Two ExampleBufPipe instances connected back to back.

    Wiring performed in ``elaborate``::

        self.p  --connect_in-->  pipe1  --connect_to_next-->  pipe2
                                                    pipe2  --connect_out-->  self.n

    i.e. pipe1's output-side valid/ready/data are connected to pipe2's
    input side, while this module's own ``p`` and ``n`` are connected to
    pipe1's input and pipe2's output respectively.
    """

    def __init__(self):
        self.pipe1 = ExampleBufPipe()
        self.pipe2 = ExampleBufPipe()

        # input side: data arrives from the PREVIOUS stage
        self.p = PrevControl()
        self.p.i_data = Signal(32)

        # output side: data leaves towards the NEXT stage
        self.n = NextControl()
        self.n.o_data = Signal(32)

    def elaborate(self, platform):
        """Build the module: register both pipes and wire them together."""
        first, second = self.pipe1, self.pipe2

        m = Module()
        m.submodules.pipe1 = first
        m.submodules.pipe2 = second

        # inter-pipe valid/ready/data
        m.d.comb += first.connect_to_next(second)

        # module inputs feed pipe1
        m.d.comb += first.connect_in(self)

        # pipe2 feeds the module outputs
        m.d.comb += second.connect_out(self)

        return m
296
297
class ExampleBufPipeChain2(BufferedPipeline):
    """Buffered pipeline whose single stage is a chain of two stages.

    Two ``ExampleStageCls`` instances are combined with ``StageChain`` and
    the result is used as the one stage of a ``BufferedPipeline``.
    """

    def __init__(self):
        chained = StageChain([ExampleStageCls() for _ in range(2)])
        BufferedPipeline.__init__(self, chained)
306
307
def data_chain2():
    """Return ``num_tests`` random integers for the chained-stage test."""
    # NOTE(review): the original spelling ``1<<16-2`` parses as
    # ``1 << (16 - 2)`` (16384), not ``(1 << 16) - 2``.  The value is kept
    # as-is; confirm which bound was intended.
    upper = 1 << (16 - 2)
    return [randint(0, upper) for _ in range(num_tests)]
313
314
315 def test9_resultfn(o_data, expected, i, o):
316 res = expected + 2
317 assert o_data == res, \
318 "%d-%d data %x not match %s\n" \
319 % (i, o, o_data, repr(expected))
320
321
class SetLessThan:
    """Comparator module: ``output`` is 1 when ``src1 < src2``, else 0.

    width  -- bit width of both operands and of the output signal
    signed -- passed as the second element of the operand Signal shape
    """

    def __init__(self, width, signed):
        operand_shape = (width, signed)
        self.src1 = Signal(operand_shape)
        self.src2 = Signal(operand_shape)
        self.output = Signal(width)

    def elaborate(self, platform):
        """Return a Module driving ``output`` combinatorially."""
        m = Module()
        is_less = self.src1 < self.src2
        m.d.comb += self.output.eq(Mux(is_less, 1, 0))
        return m
332
333
class LTStage:
    """Pipeline stage that wraps a SetLessThan(16, True) as a submodule."""

    def __init__(self):
        self.slt = SetLessThan(16, True)

    def ispec(self):
        """Input specification: a pair of 16-bit Signals."""
        return (Signal(16), Signal(16))

    def ospec(self):
        """Output specification: one 16-bit Signal."""
        return Signal(16)

    def setup(self, m, i):
        """Add the comparator to ``m`` and wire it to the inputs ``i``.

        Creates ``self.o``, which ``process`` later returns.
        """
        self.o = Signal(16)
        m.submodules.slt = self.slt
        m.d.comb += [
            self.slt.src1.eq(i[0]),
            self.slt.src2.eq(i[1]),
            self.o.eq(self.slt.output),
        ]

    def process(self, i):
        """Return the result signal created in ``setup`` (``i`` is unused)."""
        return self.o
353
354
class LTStageDerived(SetLessThan):
    """Pipeline stage that *is* a SetLessThan(16, True) (by inheritance).

    Unlike ``LTStage`` the comparator is not a separate member: the stage
    registers itself as the submodule and returns its own ``output``.
    """

    def __init__(self):
        SetLessThan.__init__(self, 16, True)

    def ispec(self):
        """Input specification: a pair of 16-bit Signals."""
        return (Signal(16), Signal(16))

    def ospec(self):
        """Output specification: one 16-bit Signal."""
        return Signal(16)

    def setup(self, m, i):
        """Register this object as submodule ``slt`` and wire its operands."""
        m.submodules.slt = self
        m.d.comb += [
            self.src1.eq(i[0]),
            self.src2.eq(i[1]),
        ]

    def process(self, i):
        """Return the comparator's ``output`` signal (``i`` is unused)."""
        return self.output
373
374
class ExampleLTPipeline(UnbufferedPipeline):
    """Unbuffered pipeline built around a single ``LTStage``."""

    def __init__(self):
        UnbufferedPipeline.__init__(self, LTStage())
382
383
class ExampleLTBufferedPipeDerived(BufferedPipeline):
    """Buffered pipeline built around a single ``LTStageDerived``."""

    def __init__(self):
        BufferedPipeline.__init__(self, LTStageDerived())
391
392
393 def test6_resultfn(o_data, expected, i, o):
394 res = 1 if expected[0] < expected[1] else 0
395 assert o_data == res, \
396 "%d-%d data %x not match %s\n" \
397 % (i, o, o_data, repr(expected))
398
399
class ExampleAddRecordStage:
    """Example stage whose input and output are both Records."""

    # Layout shared by the input and the output Record.
    record_spec = [('src1', 16), ('src2', 16)]

    def ispec(self):
        """Return a new Record built from ``record_spec``."""
        return Record(self.record_spec)

    def ospec(self):
        """Return a new Record built from ``record_spec``."""
        return Record(self.record_spec)

    def process(self, i):
        """Return a dict with each input field incremented by one.

        The keys are exactly the Record's field names.
        """
        return {name: getattr(i, name) + 1 for name in ('src1', 'src2')}
419
420
class ExampleAddRecordPlaceHolderStage:
    """Example Record stage whose result is carried in a PlaceHolder."""

    # Layout shared by the input and the output Record.
    record_spec = [('src1', 16), ('src2', 16)]

    def ispec(self):
        """Return a new Record built from ``record_spec``."""
        return Record(self.record_spec)

    def ospec(self):
        """Return a new Record built from ``record_spec``."""
        return Record(self.record_spec)

    def process(self, i):
        """Return a PlaceHolder with each input field incremented by one.

        The attribute names are exactly the Record's field names.
        """
        result = PlaceHolder()
        result.src1, result.src2 = i.src1 + 1, i.src2 + 1
        return result
442
class PlaceHolder:
    """Empty attribute container; callers attach fields such as src1/src2."""
444
445
class ExampleAddRecordPipe(UnbufferedPipeline):
    """Unbuffered pipeline built around ``ExampleAddRecordStage``."""

    def __init__(self):
        UnbufferedPipeline.__init__(self, ExampleAddRecordStage())
453
454
455 def test7_resultfn(o_data, expected, i, o):
456 res = (expected['src1'] + 1, expected['src2'] + 1)
457 assert o_data['src1'] == res[0] and o_data['src2'] == res[1], \
458 "%d-%d data %s not match %s\n" \
459 % (i, o, repr(o_data), repr(expected))
460
461
class ExampleAddRecordPlaceHolderPipe(UnbufferedPipeline):
    """Unbuffered pipeline built around ExampleAddRecordPlaceHolderStage."""

    def __init__(self):
        UnbufferedPipeline.__init__(self, ExampleAddRecordPlaceHolderStage())
469
470
471 def test11_resultfn(o_data, expected, i, o):
472 res1 = expected.src1 + 1
473 res2 = expected.src2 + 1
474 assert o_data['src1'] == res1 and o_data['src2'] == res2, \
475 "%d-%d data %s not match %s\n" \
476 % (i, o, repr(o_data), repr(expected))
477
478
class Example2OpClass:
    """Container for two 16-bit operand Signals, ``op1`` and ``op2``.

    Provides ``eq`` so that instances can be used where the pipeline stage
    API needs to assign one data object from another.
    """

    def __init__(self):
        self.op1 = Signal(16)
        self.op2 = Signal(16)

    def eq(self, i):
        """Return the list of assignments copying ``i.op1``/``i.op2`` here."""
        assign_op1 = self.op1.eq(i.op1)
        assign_op2 = self.op2.eq(i.op2)
        return [assign_op1, assign_op2]
490
491
class ExampleAddClassStage:
    """Example stage whose input is a class instance (Example2OpClass)."""

    def ispec(self):
        """Input specification: a new ``Example2OpClass`` instance."""
        return Example2OpClass()

    def ospec(self):
        """Output specification: a 16-bit Signal that will hold the sum."""
        return Signal(16)

    def process(self, i):
        """Return ``i.op1 + i.op2``."""
        first, second = i.op1, i.op2
        return first + second
511
512
class ExampleBufPipeAddClass(BufferedPipeline):
    """Buffered pipeline built around ``ExampleAddClassStage``."""

    def __init__(self):
        BufferedPipeline.__init__(self, ExampleAddClassStage())
520
521
class TestInputAdd:
    """Plain test-data object with the same members as Example2OpClass.

    ``Example2OpClass.eq`` reads ``op1`` and ``op2`` from its argument, so
    an object with exactly those two attributes can be passed to it (the
    test script hands such objects to ``set_input``).
    """

    def __init__(self, op1, op2):
        self.op1, self.op2 = op1, op2
531
532
533 def test8_resultfn(o_data, expected, i, o):
534 res = expected.op1 + expected.op2 # these are a TestInputAdd instance
535 assert o_data == res, \
536 "%d-%d data %x not match %s\n" \
537 % (i, o, o_data, repr(expected))
538
def data_2op():
    """Return ``num_tests`` TestInputAdd objects with random operands."""
    # NOTE(review): the original spelling ``1<<16-1`` parses as
    # ``1 << (16 - 1)`` (32768), not ``(1 << 16) - 1``.  The value is kept
    # as-is; confirm which bound was intended.
    upper = 1 << (16 - 1)
    return [TestInputAdd(randint(0, upper), randint(0, upper))
            for _ in range(num_tests)]
544
545
# Number of data items used by the randomised tests and data generators.
num_tests = 100

if __name__ == '__main__':
    # Each section builds a fresh DUT, simulates it (writing a VCD trace)
    # and, for some tests, also writes the design out as RTLIL.

    print("test 1")
    dut = ExampleBufPipe()
    run_simulation(dut, testbench(dut), vcd_name="test_bufpipe.vcd")

    print("test 2")
    dut = ExampleBufPipe2()
    run_simulation(dut, testbench2(dut), vcd_name="test_bufpipe2.vcd")

    print("test 3")
    dut = ExampleBufPipe()
    test = Test3(dut, test3_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe3.vcd")

    print("test 3.5")
    dut = ExamplePipeline()
    test = Test3(dut, test3_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_combpipe3.vcd")

    print("test 4")
    dut = ExampleBufPipe2()
    run_simulation(dut, testbench4(dut), vcd_name="test_bufpipe4.vcd")

    print("test 5")
    dut = ExampleBufPipeAdd()
    test = Test5(dut, test5_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe5.vcd")

    print("test 6")
    dut = ExampleLTPipeline()
    test = Test5(dut, test6_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltcomb6.vcd")

    # i_data is a tuple of signals for the less-than stages, hence list().
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready] + \
             list(dut.p.i_data) + [dut.n.o_data]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_ltcomb_pipe.il", "w") as f:
        f.write(vl)

    print("test 7")
    dut = ExampleAddRecordPipe()
    data = data_dict()
    test = Test5(dut, test7_resultfn, data=data)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")

    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready,
             dut.p.i_data.src1, dut.p.i_data.src2,
             dut.n.o_data.src1, dut.n.o_data.src2]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_recordcomb_pipe.il", "w") as f:
        f.write(vl)

    print("test 8")
    dut = ExampleBufPipeAddClass()
    data = data_2op()
    test = Test5(dut, test8_resultfn, data=data)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe8.vcd")

    print("test 9")
    dut = ExampleBufPipeChain2()
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready] + \
             [dut.p.i_data] + [dut.n.o_data]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_bufpipechain2.il", "w") as f:
        f.write(vl)

    data = data_chain2()
    test = Test5(dut, test9_resultfn, data=data)
    run_simulation(dut, [test.send, test.rcv],
                   vcd_name="test_bufpipechain2.vcd")

    print("test 10")
    dut = ExampleLTBufferedPipeDerived()
    test = Test5(dut, test6_resultfn)
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltbufpipe10.vcd")
    # The port list must be rebuilt from *this* dut: reusing the list left
    # over from test 9 would pass signals belonging to a different design.
    # Same port layout as test 6, which uses the same two-signal input spec.
    ports = [dut.p.i_valid, dut.n.i_ready,
             dut.n.o_valid, dut.p.o_ready] + \
             list(dut.p.i_data) + [dut.n.o_data]
    vl = rtlil.convert(dut, ports=ports)
    with open("test_ltbufpipe10.il", "w") as f:
        f.write(vl)

    print("test 11")
    dut = ExampleAddRecordPlaceHolderPipe()
    data = data_placeholder()
    test = Test5(dut, test11_resultfn, data=data)
    # NOTE(review): same VCD file name as test 7, so this run overwrites
    # that trace -- confirm whether a distinct name was intended.
    run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")
635
636