format code
[nmutil.git] / src / nmutil / test / test_inout_unary_mux_cancel_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout, with an unary muxid
3 and cancellation
4
5 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
6 and used as a routing ID on the fanout.
7 """
8
9 from random import randint
10 from math import log
11 from nmigen import Module, Signal, Cat, Value, Elaboratable
12 from nmigen.compat.sim import run_simulation
13 from nmigen.cli import verilog, rtlil
14
15 from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
16 from nmutil.multipipe import PriorityCombMuxInPipe
17 from nmutil.singlepipe import MaskCancellable, RecordObject, Object
18
19
20 class PassData2(RecordObject):
21 def __init__(self):
22 RecordObject.__init__(self)
23 self.muxid = Signal(2, reset_less=True)
24 self.idx = Signal(8, reset_less=True)
25 self.data = Signal(16, reset_less=True)
26
27
28 class PassData(Object):
29 def __init__(self):
30 Object.__init__(self)
31 self.muxid = Signal(2, reset_less=True)
32 self.idx = Signal(8, reset_less=True)
33 self.data = Signal(16, reset_less=True)
34
35
36 class PassThroughStage:
37 def ispec(self):
38 return PassData()
39
40 def ospec(self):
41 return self.ispec() # same as ospec
42
43 def process(self, i):
44 return i # pass-through
45
46
47 class PassThroughPipe(MaskCancellable):
48 def __init__(self, maskwid):
49 MaskCancellable.__init__(self, PassThroughStage(), maskwid)
50
51
52 class InputTest:
53 def __init__(self, dut, tlen):
54 self.dut = dut
55 self.di = {}
56 self.do = {}
57 self.sent = {}
58 self.tlen = tlen
59 for muxid in range(dut.num_rows):
60 self.di[muxid] = {}
61 self.do[muxid] = {}
62 self.sent[muxid] = []
63 for i in range(self.tlen):
64 self.di[muxid][i] = randint(0, 255) + (muxid << 8)
65 self.do[muxid][i] = self.di[muxid][i]
66
67 def send(self, muxid):
68 for i in range(self.tlen):
69 op2 = self.di[muxid][i]
70 rs = self.dut.p[muxid]
71 yield rs.i_valid.eq(1)
72 yield rs.i_data.data.eq(op2)
73 yield rs.i_data.idx.eq(i)
74 yield rs.i_data.muxid.eq(muxid)
75 yield rs.mask_i.eq(1)
76 yield
77 o_p_ready = yield rs.o_ready
78 while not o_p_ready:
79 yield
80 o_p_ready = yield rs.o_ready
81
82 print("send", muxid, i, hex(op2), op2)
83 self.sent[muxid].append(i)
84
85 yield rs.i_valid.eq(0)
86 yield rs.mask_i.eq(0)
87 # wait until it's received
88 while i in self.do[muxid]:
89 yield
90
91 # wait random period of time before queueing another value
92 for i in range(randint(0, 3)):
93 yield
94
95 yield rs.i_valid.eq(0)
96 yield
97
98 print("send ended", muxid)
99
100 # wait random period of time before queueing another value
101 # for i in range(randint(0, 3)):
102 # yield
103
104 #send_range = randint(0, 3)
105 # if send_range == 0:
106 # send = True
107 # else:
108 # send = randint(0, send_range) != 0
109
110 def rcv(self, muxid):
111 rs = self.dut.p[muxid]
112 while True:
113
114 # check cancellation
115 if self.sent[muxid] and randint(0, 2) == 0:
116 todel = self.sent[muxid].pop()
117 print("to delete", muxid, self.sent[muxid], todel)
118 if todel in self.do[muxid]:
119 del self.do[muxid][todel]
120 yield rs.stop_i.eq(1)
121 print("left", muxid, self.do[muxid])
122 if len(self.do[muxid]) == 0:
123 break
124
125 stall_range = randint(0, 3)
126 for j in range(randint(1, 10)):
127 stall = randint(0, stall_range) != 0
128 yield self.dut.n[0].i_ready.eq(stall)
129 yield
130
131 n = self.dut.n[muxid]
132 yield n.i_ready.eq(1)
133 yield
134 yield rs.stop_i.eq(0) # resets cancel mask
135 o_n_valid = yield n.o_valid
136 i_n_ready = yield n.i_ready
137 if not o_n_valid or not i_n_ready:
138 continue
139
140 out_muxid = yield n.o_data.muxid
141 out_i = yield n.o_data.idx
142 out_v = yield n.o_data.data
143
144 print("recv", out_muxid, out_i, hex(out_v), out_v)
145
146 # see if this output has occurred already, delete it if it has
147 assert muxid == out_muxid, \
148 "out_muxid %d not correct %d" % (out_muxid, muxid)
149 if out_i not in self.sent[muxid]:
150 print("cancelled/recv", muxid, out_i)
151 continue
152 assert out_i in self.do[muxid], "out_i %d not in array %s" % \
153 (out_i, repr(self.do[muxid]))
154 assert self.do[muxid][out_i] == out_v # pass-through data
155 del self.do[muxid][out_i]
156 todel = self.sent[muxid].index(out_i)
157 del self.sent[muxid][todel]
158
159 # check if there's any more outputs
160 if len(self.do[muxid]) == 0:
161 break
162
163 print("recv ended", muxid)
164
165
166 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
167 def __init__(self, num_rows):
168 self.num_rows = num_rows
169 stage = PassThroughStage()
170 PriorityCombMuxInPipe.__init__(self, stage,
171 p_len=self.num_rows, maskwid=1)
172
173
174 class TestMuxOutPipe(CombMuxOutPipe):
175 def __init__(self, num_rows):
176 self.num_rows = num_rows
177 stage = PassThroughStage()
178 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows,
179 maskwid=1)
180
181
182 class TestInOutPipe(Elaboratable):
183 def __init__(self, num_rows=4):
184 self.num_rows = nr = num_rows
185 self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
186 self.pipe1 = PassThroughPipe(nr) # stage 1 (clock-sync)
187 self.pipe2 = PassThroughPipe(nr) # stage 2 (clock-sync)
188 self.pipe3 = PassThroughPipe(nr) # stage 3 (clock-sync)
189 self.pipe4 = PassThroughPipe(nr) # stage 4 (clock-sync)
190 self.outpipe = TestMuxOutPipe(nr) # fan-out (combinatorial)
191
192 self.p = self.inpipe.p # kinda annoying,
193 self.n = self.outpipe.n # use pipe in/out as this class in/out
194 self._ports = self.inpipe.ports() + self.outpipe.ports()
195
196 def elaborate(self, platform):
197 m = Module()
198 m.submodules.inpipe = self.inpipe
199 m.submodules.pipe1 = self.pipe1
200 m.submodules.pipe2 = self.pipe2
201 m.submodules.pipe3 = self.pipe3
202 m.submodules.pipe4 = self.pipe4
203 m.submodules.outpipe = self.outpipe
204
205 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
206 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
207 m.d.comb += self.pipe2.connect_to_next(self.pipe3)
208 m.d.comb += self.pipe3.connect_to_next(self.pipe4)
209 m.d.comb += self.pipe4.connect_to_next(self.outpipe)
210
211 return m
212
213 def ports(self):
214 return self._ports
215
216
217 def test1():
218 dut = TestInOutPipe()
219 vl = rtlil.convert(dut, ports=dut.ports())
220 with open("test_inoutmux_unarycancel_pipe.il", "w") as f:
221 f.write(vl)
222
223 tlen = 20
224
225 test = InputTest(dut, tlen)
226 run_simulation(dut, [test.rcv(1), test.rcv(0),
227 test.rcv(3), test.rcv(2),
228 test.send(0), test.send(1),
229 test.send(3), test.send(2),
230 ],
231 vcd_name="test_inoutmux_unarycancel_pipe.vcd")
232
233
234 if __name__ == '__main__':
235 test1()