pass in flatten/processing function into _connect_in/out
[ieee754fpu.git] / src / add / test_inout_mux_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout.
3
4 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
5 and used as a routing ID on the fanout.
6 """
7
8 from random import randint
9 from math import log
10 from nmigen import Module, Signal, Cat, Value
11 from nmigen.compat.sim import run_simulation
12 from nmigen.cli import verilog, rtlil
13
14 from multipipe import CombMultiOutPipeline, CombMuxOutPipe
15 from multipipe import PriorityCombMuxInPipe
16 from singlepipe import SimpleHandshake
17
18
19 class PassData: # (Value):
20 def __init__(self):
21 self.mid = Signal(2, reset_less=True)
22 self.idx = Signal(8, reset_less=True)
23 self.data = Signal(16, reset_less=True)
24
25 def _rhs_signals(self):
26 return self.ports()
27
28 def shape(self):
29 bits, sign = 0, False
30 for elem_bits, elem_sign in (elem.shape() for elem in self.ports()):
31 bits = max(bits, elem_bits + elem_sign)
32 sign = max(sign, elem_sign)
33 return bits, sign
34
35 def eq(self, i):
36 return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)]
37
38 def ports(self):
39 return [self.mid, self.idx, self.data]
40
41
42 class PassThroughStage:
43 def ispec(self):
44 return PassData()
45 def ospec(self):
46 return self.ispec() # same as ospec
47
48 def process(self, i):
49 return i # pass-through
50
51
52
53 class PassThroughPipe(SimpleHandshake):
54 def __init__(self):
55 SimpleHandshake.__init__(self, PassThroughStage())
56
57
58 class InputTest:
59 def __init__(self, dut):
60 self.dut = dut
61 self.di = {}
62 self.do = {}
63 self.tlen = 100
64 for mid in range(dut.num_rows):
65 self.di[mid] = {}
66 self.do[mid] = {}
67 for i in range(self.tlen):
68 self.di[mid][i] = randint(0, 255) + (mid<<8)
69 self.do[mid][i] = self.di[mid][i]
70
71 def send(self, mid):
72 for i in range(self.tlen):
73 op2 = self.di[mid][i]
74 rs = dut.p[mid]
75 yield rs.i_valid.eq(1)
76 yield rs.i_data.data.eq(op2)
77 yield rs.i_data.idx.eq(i)
78 yield rs.i_data.mid.eq(mid)
79 yield
80 o_p_ready = yield rs.o_ready
81 while not o_p_ready:
82 yield
83 o_p_ready = yield rs.o_ready
84
85 print ("send", mid, i, hex(op2))
86
87 yield rs.i_valid.eq(0)
88 # wait random period of time before queueing another value
89 for i in range(randint(0, 3)):
90 yield
91
92 yield rs.i_valid.eq(0)
93 yield
94
95 print ("send ended", mid)
96
97 ## wait random period of time before queueing another value
98 #for i in range(randint(0, 3)):
99 # yield
100
101 #send_range = randint(0, 3)
102 #if send_range == 0:
103 # send = True
104 #else:
105 # send = randint(0, send_range) != 0
106
107 def rcv(self, mid):
108 while True:
109 #stall_range = randint(0, 3)
110 #for j in range(randint(1,10)):
111 # stall = randint(0, stall_range) != 0
112 # yield self.dut.n[0].i_ready.eq(stall)
113 # yield
114 n = self.dut.n[mid]
115 yield n.i_ready.eq(1)
116 yield
117 o_n_valid = yield n.o_valid
118 i_n_ready = yield n.i_ready
119 if not o_n_valid or not i_n_ready:
120 continue
121
122 out_mid = yield n.o_data.mid
123 out_i = yield n.o_data.idx
124 out_v = yield n.o_data.data
125
126 print ("recv", out_mid, out_i, hex(out_v))
127
128 # see if this output has occurred already, delete it if it has
129 assert mid == out_mid, "out_mid %d not correct %d" % (out_mid, mid)
130 assert out_i in self.do[mid], "out_i %d not in array %s" % \
131 (out_i, repr(self.do[mid]))
132 assert self.do[mid][out_i] == out_v # pass-through data
133 del self.do[mid][out_i]
134
135 # check if there's any more outputs
136 if len(self.do[mid]) == 0:
137 break
138 print ("recv ended", mid)
139
140
141 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
142 def __init__(self, num_rows):
143 self.num_rows = num_rows
144 stage = PassThroughStage()
145 PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows)
146
147
148 class OutputTest:
149 def __init__(self, dut):
150 self.dut = dut
151 self.di = []
152 self.do = {}
153 self.tlen = 100
154 for i in range(self.tlen * dut.num_rows):
155 if i < dut.num_rows:
156 mid = i
157 else:
158 mid = randint(0, dut.num_rows-1)
159 data = randint(0, 255) + (mid<<8)
160
161 def send(self):
162 for i in range(self.tlen * dut.num_rows):
163 op2 = self.di[i][0]
164 mid = self.di[i][1]
165 rs = dut.p
166 yield rs.i_valid.eq(1)
167 yield rs.i_data.data.eq(op2)
168 yield rs.i_data.mid.eq(mid)
169 yield
170 o_p_ready = yield rs.o_ready
171 while not o_p_ready:
172 yield
173 o_p_ready = yield rs.o_ready
174
175 print ("send", mid, i, hex(op2))
176
177 yield rs.i_valid.eq(0)
178 # wait random period of time before queueing another value
179 for i in range(randint(0, 3)):
180 yield
181
182 yield rs.i_valid.eq(0)
183
184
185 class TestMuxOutPipe(CombMuxOutPipe):
186 def __init__(self, num_rows):
187 self.num_rows = num_rows
188 stage = PassThroughStage()
189 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
190
191
192 class TestInOutPipe:
193 def __init__(self, num_rows=4):
194 self.num_rows = num_rows
195 self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
196 self.pipe1 = PassThroughPipe() # stage 1 (clock-sync)
197 self.pipe2 = PassThroughPipe() # stage 2 (clock-sync)
198 self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial)
199
200 self.p = self.inpipe.p # kinda annoying,
201 self.n = self.outpipe.n # use pipe in/out as this class in/out
202 self._ports = self.inpipe.ports() + self.outpipe.ports()
203
204 def elaborate(self, platform):
205 m = Module()
206 m.submodules.inpipe = self.inpipe
207 m.submodules.pipe1 = self.pipe1
208 m.submodules.pipe2 = self.pipe2
209 m.submodules.outpipe = self.outpipe
210
211 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
212 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
213 m.d.comb += self.pipe2.connect_to_next(self.outpipe)
214
215 return m
216
217 def ports(self):
218 return self._ports
219
220
221 if __name__ == '__main__':
222 dut = TestInOutPipe()
223 vl = rtlil.convert(dut, ports=dut.ports())
224 with open("test_inoutmux_pipe.il", "w") as f:
225 f.write(vl)
226 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
227
228 test = InputTest(dut)
229 run_simulation(dut, [test.rcv(1), test.rcv(0),
230 test.rcv(3), test.rcv(2),
231 test.send(0), test.send(1),
232 test.send(3), test.send(2),
233 ],
234 vcd_name="test_inoutmux_pipe.vcd")
235