add 2 extra stages to cancel test
[ieee754fpu.git] / src / nmutil / test / test_inout_unary_mux_cancel_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout, with an unary muxid
3 and cancellation
4
5 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
6 and used as a routing ID on the fanout.
7 """
8
9 from random import randint
10 from math import log
11 from nmigen import Module, Signal, Cat, Value, Elaboratable
12 from nmigen.compat.sim import run_simulation
13 from nmigen.cli import verilog, rtlil
14
15 from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
16 from nmutil.multipipe import PriorityCombMuxInPipe
17 from nmutil.singlepipe import MaskCancellable, RecordObject, Object
18
19
20 class PassData2(RecordObject):
21 def __init__(self):
22 RecordObject.__init__(self)
23 self.muxid = Signal(2, reset_less=True)
24 self.idx = Signal(8, reset_less=True)
25 self.data = Signal(16, reset_less=True)
26
27
28 class PassData(Object):
29 def __init__(self):
30 Object.__init__(self)
31 self.muxid = Signal(2, reset_less=True)
32 self.idx = Signal(8, reset_less=True)
33 self.data = Signal(16, reset_less=True)
34
35
36
37 class PassThroughStage:
38 def ispec(self):
39 return PassData()
40 def ospec(self):
41 return self.ispec() # same as ospec
42
43 def process(self, i):
44 return i # pass-through
45
46
47
48 class PassThroughPipe(MaskCancellable):
49 def __init__(self, maskwid):
50 MaskCancellable.__init__(self, PassThroughStage(), maskwid)
51
52
53 class InputTest:
54 def __init__(self, dut):
55 self.dut = dut
56 self.di = {}
57 self.do = {}
58 self.tlen = 10
59 for muxid in range(dut.num_rows):
60 self.di[muxid] = {}
61 self.do[muxid] = {}
62 for i in range(self.tlen):
63 self.di[muxid][i] = randint(0, 255) + (muxid<<8)
64 self.do[muxid][i] = self.di[muxid][i]
65
66 def send(self, muxid):
67 for i in range(self.tlen):
68 op2 = self.di[muxid][i]
69 rs = self.dut.p[muxid]
70 yield rs.valid_i.eq(1)
71 yield rs.data_i.data.eq(op2)
72 yield rs.data_i.idx.eq(i)
73 yield rs.data_i.muxid.eq(muxid)
74 yield rs.mask_i.eq(1)
75 yield
76 o_p_ready = yield rs.ready_o
77 while not o_p_ready:
78 yield
79 o_p_ready = yield rs.ready_o
80
81 print ("send", muxid, i, hex(op2))
82
83 yield rs.valid_i.eq(0)
84 # wait random period of time before queueing another value
85 for i in range(randint(0, 3)):
86 yield
87
88 yield rs.valid_i.eq(0)
89 yield
90
91 print ("send ended", muxid)
92
93 ## wait random period of time before queueing another value
94 #for i in range(randint(0, 3)):
95 # yield
96
97 #send_range = randint(0, 3)
98 #if send_range == 0:
99 # send = True
100 #else:
101 # send = randint(0, send_range) != 0
102
103 def rcv(self, muxid):
104 while True:
105 #stall_range = randint(0, 3)
106 #for j in range(randint(1,10)):
107 # stall = randint(0, stall_range) != 0
108 # yield self.dut.n[0].ready_i.eq(stall)
109 # yield
110 n = self.dut.n[muxid]
111 yield n.ready_i.eq(1)
112 yield
113 o_n_valid = yield n.valid_o
114 i_n_ready = yield n.ready_i
115 if not o_n_valid or not i_n_ready:
116 continue
117
118 out_muxid = yield n.data_o.muxid
119 out_i = yield n.data_o.idx
120 out_v = yield n.data_o.data
121
122 print ("recv", out_muxid, out_i, hex(out_v))
123
124 # see if this output has occurred already, delete it if it has
125 assert muxid == out_muxid, \
126 "out_muxid %d not correct %d" % (out_muxid, muxid)
127 assert out_i in self.do[muxid], "out_i %d not in array %s" % \
128 (out_i, repr(self.do[muxid]))
129 assert self.do[muxid][out_i] == out_v # pass-through data
130 del self.do[muxid][out_i]
131
132 # check if there's any more outputs
133 if len(self.do[muxid]) == 0:
134 break
135 print ("recv ended", muxid)
136
137
138 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
139 def __init__(self, num_rows):
140 self.num_rows = num_rows
141 stage = PassThroughStage()
142 PriorityCombMuxInPipe.__init__(self, stage,
143 p_len=self.num_rows, maskwid=1)
144
145
146 class OutputTest:
147 def __init__(self, dut):
148 self.dut = dut
149 self.di = []
150 self.do = {}
151 self.tlen = 10
152 for i in range(self.tlen * dut.num_rows):
153 if i < dut.num_rows:
154 muxid = i
155 else:
156 muxid = randint(0, dut.num_rows-1)
157 data = randint(0, 255) + (muxid<<8)
158
159 def send(self):
160 for i in range(self.tlen * dut.num_rows):
161 op2 = self.di[i][0]
162 muxid = self.di[i][1]
163 rs = dut.p
164 yield rs.valid_i.eq(1)
165 yield rs.data_i.data.eq(op2)
166 yield rs.data_i.muxid.eq(muxid)
167 yield
168 o_p_ready = yield rs.ready_o
169 while not o_p_ready:
170 yield
171 o_p_ready = yield rs.ready_o
172
173 print ("send", muxid, i, hex(op2))
174
175 yield rs.valid_i.eq(0)
176 # wait random period of time before queueing another value
177 for i in range(randint(0, 3)):
178 yield
179
180 yield rs.valid_i.eq(0)
181
182
183 class TestMuxOutPipe(CombMuxOutPipe):
184 def __init__(self, num_rows):
185 self.num_rows = num_rows
186 stage = PassThroughStage()
187 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows,
188 maskwid=1)
189
190
191 class TestInOutPipe(Elaboratable):
192 def __init__(self, num_rows=4):
193 self.num_rows = nr = num_rows
194 self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
195 self.pipe1 = PassThroughPipe(nr) # stage 1 (clock-sync)
196 self.pipe2 = PassThroughPipe(nr) # stage 2 (clock-sync)
197 self.pipe3 = PassThroughPipe(nr) # stage 3 (clock-sync)
198 self.pipe4 = PassThroughPipe(nr) # stage 4 (clock-sync)
199 self.outpipe = TestMuxOutPipe(nr) # fan-out (combinatorial)
200
201 self.p = self.inpipe.p # kinda annoying,
202 self.n = self.outpipe.n # use pipe in/out as this class in/out
203 self._ports = self.inpipe.ports() + self.outpipe.ports()
204
205 def elaborate(self, platform):
206 m = Module()
207 m.submodules.inpipe = self.inpipe
208 m.submodules.pipe1 = self.pipe1
209 m.submodules.pipe2 = self.pipe2
210 m.submodules.pipe3 = self.pipe3
211 m.submodules.pipe4 = self.pipe4
212 m.submodules.outpipe = self.outpipe
213
214 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
215 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
216 m.d.comb += self.pipe2.connect_to_next(self.pipe3)
217 m.d.comb += self.pipe3.connect_to_next(self.pipe4)
218 m.d.comb += self.pipe4.connect_to_next(self.outpipe)
219
220 return m
221
222 def ports(self):
223 return self._ports
224
225
226 def test1():
227 dut = TestInOutPipe()
228 vl = rtlil.convert(dut, ports=dut.ports())
229 with open("test_inoutmux_pipe.il", "w") as f:
230 f.write(vl)
231 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
232
233 test = InputTest(dut)
234 run_simulation(dut, [test.rcv(1), test.rcv(0),
235 test.rcv(3), test.rcv(2),
236 test.send(0), test.send(1),
237 test.send(3), test.send(2),
238 ],
239 vcd_name="test_inoutmux_pipe.vcd")
240
241 if __name__ == '__main__':
242 test1()