switch to exact version of cython
[ieee754fpu.git] / src / nmutil / test / test_inout_unary_mux_cancel_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout, with a unary muxid
3 and cancellation
4
5 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
6 and used as a routing ID on the fanout.
7 """
8
9 from random import randint
10 from math import log
11 from nmigen import Module, Signal, Cat, Value, Elaboratable
12 from nmigen.compat.sim import run_simulation
13 from nmigen.cli import verilog, rtlil
14
15 from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
16 from nmutil.multipipe import PriorityCombMuxInPipe
17 from nmutil.singlepipe import MaskCancellable, RecordObject, Object
18
19
class PassData2(RecordObject):
    """Payload built on ``RecordObject``: routing ID, sequence index, data.

    The three members are assigned directly onto the instance, in the
    order ``muxid`` (2 bits), ``idx`` (8 bits), ``data`` (16 bits); each
    one is declared ``reset_less``.
    """

    def __init__(self):
        super().__init__()
        # NOTE: keep these as direct ``self.<name> = Signal(...)``
        # assignments, in this order.
        self.muxid = Signal(2, reset_less=True)   # multiplexer (routing) ID
        self.idx = Signal(8, reset_less=True)     # index of the item
        self.data = Signal(16, reset_less=True)   # the value itself
26
27
class PassData(Object):
    """Payload built on ``Object``: routing ID, sequence index, data.

    Holds the same three members as ``PassData2`` -- ``muxid`` (2 bits),
    ``idx`` (8 bits) and ``data`` (16 bits), all ``reset_less`` -- but
    derives from ``Object`` rather than ``RecordObject``.
    """

    def __init__(self):
        super().__init__()
        # NOTE: keep these as direct ``self.<name> = Signal(...)``
        # assignments, in this order.
        self.muxid = Signal(2, reset_less=True)   # multiplexer (routing) ID
        self.idx = Signal(8, reset_less=True)     # index of the item
        self.data = Signal(16, reset_less=True)   # the value itself
34
35
36
class PassThroughStage:
    """Pipeline stage that hands its input straight back as its output."""

    def ispec(self):
        """Return a new ``PassData`` instance describing the stage input."""
        return PassData()

    def ospec(self):
        """Return the output spec, obtained by calling ``ispec()`` again."""
        return self.ispec()  # output has the same format as the input

    def process(self, i):
        """Return ``i`` unchanged: the stage performs no transformation."""
        return i
45
46
47
class PassThroughPipe(MaskCancellable):
    """``MaskCancellable`` pipe wrapped around a ``PassThroughStage``.

    maskwid: forwarded unchanged as the second argument of
             ``MaskCancellable.__init__``.
    """

    def __init__(self, maskwid):
        super().__init__(PassThroughStage(), maskwid)
51
52
class InputTest:
    """Stimulus generator and checker for the fan-in / fan-out pipeline.

    For every input row ``muxid`` of the device under test, three pieces
    of bookkeeping are kept:

    * ``di[muxid][i]``  -- the value that is sent as item ``i``
    * ``do[muxid][i]``  -- the value still expected to come out for item
                           ``i``; entries are removed once the item has
                           been received or cancelled
    * ``sent[muxid]``   -- indices that have been sent but neither
                           received nor cancelled yet

    ``send`` and ``rcv`` are generator functions meant to be handed to
    the simulator, one pair per ``muxid``.
    """

    def __init__(self, dut, tlen):
        """Create ``tlen`` random test values for each of ``dut.num_rows``.

        dut:  device under test; must provide ``num_rows`` here, and
              ``p`` / ``n`` (indexable by muxid) for ``send`` / ``rcv``.
        tlen: number of items to send per input row.
        """
        self.dut = dut
        self.tlen = tlen
        self.di = {}
        self.do = {}
        self.sent = {}
        for muxid in range(dut.num_rows):
            # low 8 bits are random, the muxid is placed above them
            values = {i: randint(0, 255) + (muxid << 8)
                      for i in range(tlen)}
            self.di[muxid] = values
            self.do[muxid] = dict(values)   # separate copy: rcv deletes
            self.sent[muxid] = []

    def send(self, muxid):
        """Simulation process: feed all items of row ``muxid`` to the dut.

        Each item is presented with ``valid_i`` and ``mask_i`` set, held
        until ``ready_o`` is seen, then the process waits until ``rcv``
        has removed the item from ``do`` (received or cancelled) plus a
        random 0-3 further cycles before presenting the next one.
        """
        # Looked up once, before the loop: it is also needed after the
        # loop, which used to fail with an unbound name when tlen was 0.
        rs = self.dut.p[muxid]

        for i in range(self.tlen):
            op2 = self.di[muxid][i]
            yield rs.valid_i.eq(1)
            yield rs.data_i.data.eq(op2)
            yield rs.data_i.idx.eq(i)
            yield rs.data_i.muxid.eq(muxid)
            yield rs.mask_i.eq(1)
            yield
            o_p_ready = yield rs.ready_o
            while not o_p_ready:
                yield
                o_p_ready = yield rs.ready_o

            print("send", muxid, i, hex(op2), op2)
            self.sent[muxid].append(i)

            yield rs.valid_i.eq(0)
            # wait until it's received (or cancelled) by rcv
            while i in self.do[muxid]:
                yield

            # wait random period of time before queueing another value
            # (throwaway loop variable: must not clobber the item index)
            for _ in range(randint(0, 3)):
                yield

        yield rs.valid_i.eq(0)
        yield

        print("send ended", muxid)

    def rcv(self, muxid):
        """Simulation process: collect and check the outputs of row ``muxid``.

        While anything is still expected, each iteration may first
        (at random, when something is in flight) cancel the most recently
        sent item by raising ``stop_i``, then accepts one output cycle
        with ``ready_i`` set and, if valid data is present, checks it
        against the expected value.  Ends once ``do[muxid]`` is empty.

        Raises AssertionError if the output muxid, index or data does
        not match what was sent.
        """
        rs = self.dut.p[muxid]
        n = self.dut.n[muxid]
        sent = self.sent[muxid]
        pending = self.do[muxid]

        # Testing "pending" here (rather than looping unconditionally)
        # lets the process finish at once when there is nothing to
        # receive, e.g. tlen == 0.
        while pending:

            # check cancellation
            if sent and randint(0, 2) == 0:
                todel = sent.pop()
                print("to delete", muxid, sent, todel)
                if todel in pending:
                    del pending[todel]
                    yield rs.stop_i.eq(1)
                print("left", muxid, pending)
                if not pending:
                    break

            # NOTE: the output is never stalled, ready_i is always 1
            yield n.ready_i.eq(1)
            yield
            yield rs.stop_i.eq(0)  # resets cancel mask
            o_n_valid = yield n.valid_o
            i_n_ready = yield n.ready_i
            if not o_n_valid or not i_n_ready:
                continue

            out_muxid = yield n.data_o.muxid
            out_i = yield n.data_o.idx
            out_v = yield n.data_o.data

            print("recv", out_muxid, out_i, hex(out_v), out_v)

            # the routing ID must have brought the item back to this row
            assert muxid == out_muxid, \
                "out_muxid %d not correct %d" % (out_muxid, muxid)
            # an index no longer in "sent" was cancelled: ignore it
            if out_i not in sent:
                print("cancelled/recv", muxid, out_i)
                continue
            assert out_i in pending, "out_i %d not in array %s" % \
                (out_i, repr(pending))
            assert pending[out_i] == out_v  # pass-through data
            del pending[out_i]
            sent.remove(out_i)

        print("recv ended", muxid)
163
164
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
    """Fan-in side of the test: a ``PriorityCombMuxInPipe`` with
    ``num_rows`` inputs, a ``PassThroughStage`` and a mask width of 1.
    """

    def __init__(self, num_rows):
        # recorded before the base-class constructor runs
        self.num_rows = num_rows
        super().__init__(PassThroughStage(), p_len=num_rows, maskwid=1)
171
172
class TestMuxOutPipe(CombMuxOutPipe):
    """Fan-out side of the test: a ``CombMuxOutPipe`` with ``num_rows``
    outputs, a ``PassThroughStage`` and a mask width of 1.
    """

    def __init__(self, num_rows):
        # recorded before the base-class constructor runs
        self.num_rows = num_rows
        super().__init__(PassThroughStage(), n_len=num_rows, maskwid=1)
179
180
class TestInOutPipe(Elaboratable):
    """Device under test: fan-in, four pass-through stages, fan-out.

    The input ports of the fan-in pipe and the output ports of the
    fan-out pipe are re-exported as ``p`` and ``n`` so that this class
    can be driven as if it were a single pipe.
    """

    def __init__(self, num_rows=4):
        """Build all six sub-pipes for ``num_rows`` inputs and outputs."""
        nr = num_rows
        self.num_rows = nr
        self.inpipe = TestPriorityMuxPipe(nr)   # fan-in (combinatorial)
        self.pipe1 = PassThroughPipe(nr)        # stage 1 (clock-sync)
        self.pipe2 = PassThroughPipe(nr)        # stage 2 (clock-sync)
        self.pipe3 = PassThroughPipe(nr)        # stage 3 (clock-sync)
        self.pipe4 = PassThroughPipe(nr)        # stage 4 (clock-sync)
        self.outpipe = TestMuxOutPipe(nr)       # fan-out (combinatorial)

        # use the first pipe's inputs / last pipe's outputs as our own
        self.p = self.inpipe.p
        self.n = self.outpipe.n
        self._ports = self.inpipe.ports() + self.outpipe.ports()

    def elaborate(self, platform):
        """Register every sub-pipe as a submodule and chain them together."""
        m = Module()

        named_pipes = (
            ("inpipe", self.inpipe),
            ("pipe1", self.pipe1),
            ("pipe2", self.pipe2),
            ("pipe3", self.pipe3),
            ("pipe4", self.pipe4),
            ("outpipe", self.outpipe),
        )
        for name, sub in named_pipes:
            setattr(m.submodules, name, sub)

        # the fan-in pipe is joined port-to-port ...
        m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
        # ... everything after it is joined pipe-to-pipe, in order
        chain = [self.pipe1, self.pipe2, self.pipe3, self.pipe4,
                 self.outpipe]
        for upstream, downstream in zip(chain, chain[1:]):
            m.d.comb += upstream.connect_to_next(downstream)

        return m

    def ports(self):
        """Return the port list assembled in ``__init__``."""
        return self._ports
214
215
def test1():
    """Convert the pipeline to RTLIL, write it out, then simulate it.

    Runs one ``rcv`` and one ``send`` process per row (0-3), each row
    carrying 20 items, and records a VCD trace of the simulation.
    """
    dut = TestInOutPipe()

    # convert first, so that no file is created if conversion fails
    il_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_inoutmux_unarycancel_pipe.il", "w") as il_file:
        il_file.write(il_text)

    tlen = 20
    test = InputTest(dut, tlen)

    # process order is kept exactly: receivers first, then senders
    processes = [test.rcv(muxid) for muxid in (1, 0, 3, 2)]
    processes += [test.send(muxid) for muxid in (0, 1, 3, 2)]
    run_simulation(dut, processes,
                   vcd_name="test_inoutmux_unarycancel_pipe.vcd")


if __name__ == "__main__":
    test1()