move flexible ports fn to MultiOutControlBase
[ieee754fpu.git] / src / add / test_inout_mux_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout.
3
4 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
5 and used as a routing ID on the fanout.
6 """
7
8 from random import randint
9 from math import log
10 from nmigen import Module, Signal, Cat, Value
11 from nmigen.compat.sim import run_simulation
12 from nmigen.cli import verilog, rtlil
13
14 from multipipe import CombMultiOutPipeline, CombMuxOutPipe
15 from multipipe import PriorityCombMuxInPipe
16 from singlepipe import UnbufferedPipeline
17
18
19 class PassData: # (Value):
20 def __init__(self):
21 self.mid = Signal(2, reset_less=True)
22 self.idx = Signal(8, reset_less=True)
23 self.data = Signal(16, reset_less=True)
24
25 def _rhs_signals(self):
26 return self.ports()
27
28 def shape(self):
29 bits, sign = 0, False
30 for elem_bits, elem_sign in (elem.shape() for elem in self.ports()):
31 bits = max(bits, elem_bits + elem_sign)
32 sign = max(sign, elem_sign)
33 return bits, sign
34
35 def eq(self, i):
36 return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)]
37
38 def ports(self):
39 return [self.mid, self.idx, self.data]
40
41
42 class PassThroughStage:
43 def ispec(self):
44 return PassData()
45 def ospec(self):
46 return self.ispec() # same as ospec
47
48 def process(self, i):
49 return i # pass-through
50
51
52
53 class PassThroughPipe(UnbufferedPipeline):
54 def __init__(self):
55 UnbufferedPipeline.__init__(self, PassThroughStage())
56
57
58 class InputTest:
59 def __init__(self, dut):
60 self.dut = dut
61 self.di = {}
62 self.do = {}
63 self.tlen = 100
64 for mid in range(dut.num_rows):
65 self.di[mid] = {}
66 self.do[mid] = {}
67 for i in range(self.tlen):
68 self.di[mid][i] = randint(0, 255) + (mid<<8)
69 self.do[mid][i] = self.di[mid][i]
70
71 def send(self, mid):
72 for i in range(self.tlen):
73 op2 = self.di[mid][i]
74 rs = dut.p[mid]
75 yield rs.i_valid.eq(1)
76 yield rs.i_data.data.eq(op2)
77 yield rs.i_data.idx.eq(i)
78 yield rs.i_data.mid.eq(mid)
79 yield
80 o_p_ready = yield rs.o_ready
81 while not o_p_ready:
82 yield
83 o_p_ready = yield rs.o_ready
84
85 print ("send", mid, i, hex(op2))
86
87 yield rs.i_valid.eq(0)
88 # wait random period of time before queueing another value
89 for i in range(randint(0, 3)):
90 yield
91
92 yield rs.i_valid.eq(0)
93 yield
94
95 print ("send ended", mid)
96
97 ## wait random period of time before queueing another value
98 #for i in range(randint(0, 3)):
99 # yield
100
101 #send_range = randint(0, 3)
102 #if send_range == 0:
103 # send = True
104 #else:
105 # send = randint(0, send_range) != 0
106
107 def rcv(self, mid):
108 while True:
109 #stall_range = randint(0, 3)
110 #for j in range(randint(1,10)):
111 # stall = randint(0, stall_range) != 0
112 # yield self.dut.n[0].i_ready.eq(stall)
113 # yield
114 n = self.dut.n[mid]
115 yield n.i_ready.eq(1)
116 yield
117 o_n_valid = yield n.o_valid
118 i_n_ready = yield n.i_ready
119 if not o_n_valid or not i_n_ready:
120 continue
121
122 out_mid = yield n.o_data.mid
123 out_i = yield n.o_data.idx
124 out_v = yield n.o_data.data
125
126 print ("recv", out_mid, out_i, hex(out_v))
127
128 # see if this output has occurred already, delete it if it has
129 assert mid == out_mid, "out_mid %d not correct %d" % (out_mid, mid)
130 assert out_i in self.do[mid], "out_i %d not in array %s" % \
131 (out_i, repr(self.do[mid]))
132 assert self.do[mid][out_i] == out_v # pass-through data
133 del self.do[mid][out_i]
134
135 # check if there's any more outputs
136 if len(self.do[mid]) == 0:
137 break
138 print ("recv ended", mid)
139
140
141 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
142 def __init__(self, num_rows):
143 self.num_rows = num_rows
144 stage = PassThroughStage()
145 PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows)
146
147 def ports(self):
148 res = []
149 for i in range(len(self.p)):
150 res += [self.p[i].i_valid, self.p[i].o_ready] + \
151 self.p[i].i_data.ports()
152 res += [self.n.i_ready, self.n.o_valid] + \
153 self.n.o_data.ports()
154 return res
155
156
157
158 class OutputTest:
159 def __init__(self, dut):
160 self.dut = dut
161 self.di = []
162 self.do = {}
163 self.tlen = 100
164 for i in range(self.tlen * dut.num_rows):
165 if i < dut.num_rows:
166 mid = i
167 else:
168 mid = randint(0, dut.num_rows-1)
169 data = randint(0, 255) + (mid<<8)
170
171 def send(self):
172 for i in range(self.tlen * dut.num_rows):
173 op2 = self.di[i][0]
174 mid = self.di[i][1]
175 rs = dut.p
176 yield rs.i_valid.eq(1)
177 yield rs.i_data.data.eq(op2)
178 yield rs.i_data.mid.eq(mid)
179 yield
180 o_p_ready = yield rs.o_ready
181 while not o_p_ready:
182 yield
183 o_p_ready = yield rs.o_ready
184
185 print ("send", mid, i, hex(op2))
186
187 yield rs.i_valid.eq(0)
188 # wait random period of time before queueing another value
189 for i in range(randint(0, 3)):
190 yield
191
192 yield rs.i_valid.eq(0)
193
194
195 class TestMuxOutPipe(CombMuxOutPipe):
196 def __init__(self, num_rows):
197 self.num_rows = num_rows
198 stage = PassThroughStage()
199 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
200
201
202 class TestInOutPipe:
203 def __init__(self, num_rows=4):
204 self.num_rows = num_rows
205 self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
206 self.pipe1 = PassThroughPipe() # stage 1 (clock-sync)
207 self.pipe2 = PassThroughPipe() # stage 2 (clock-sync)
208 self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial)
209
210 self.p = self.inpipe.p # kinda annoying,
211 self.n = self.outpipe.n # use pipe in/out as this class in/out
212 self._ports = self.inpipe.ports() + self.outpipe.ports()
213
214 def elaborate(self, platform):
215 m = Module()
216 m.submodules.inpipe = self.inpipe
217 m.submodules.pipe1 = self.pipe1
218 m.submodules.pipe2 = self.pipe2
219 m.submodules.outpipe = self.outpipe
220
221 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
222 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
223 m.d.comb += self.pipe2.connect_to_next(self.outpipe)
224
225 return m
226
227 def ports(self):
228 return self._ports
229
230
231 if __name__ == '__main__':
232 dut = TestInOutPipe()
233 vl = rtlil.convert(dut, ports=dut.ports())
234 with open("test_inoutmux_pipe.il", "w") as f:
235 f.write(vl)
236 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
237
238 test = InputTest(dut)
239 run_simulation(dut, [test.rcv(1), test.rcv(0),
240 test.rcv(3), test.rcv(2),
241 test.send(0), test.send(1),
242 test.send(3), test.send(2),
243 ],
244 vcd_name="test_inoutmux_pipe.vcd")
245