add __iter__ to several classes, add global shape() function use in FIFOControl
[ieee754fpu.git] / src / add / test_inout_mux_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout.
3
4 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
5 and used as a routing ID on the fanout.
6 """
7
8 from random import randint
9 from math import log
10 from nmigen import Module, Signal, Cat, Value
11 from nmigen.compat.sim import run_simulation
12 from nmigen.cli import verilog, rtlil
13
14 from multipipe import CombMultiOutPipeline, CombMuxOutPipe
15 from multipipe import PriorityCombMuxInPipe
16 from singlepipe import SimpleHandshake
17
18
19 class PassData: # (Value):
20 def __init__(self):
21 self.mid = Signal(2, reset_less=True)
22 self.idx = Signal(8, reset_less=True)
23 self.data = Signal(16, reset_less=True)
24
25 def __iter__(self):
26 yield self.mid
27 yield self.idx
28 yield self.data
29
30 def shape(self):
31 bits, sign = 0, False
32 for elem_bits, elem_sign in (elem.shape() for elem in self.ports()):
33 bits = max(bits, elem_bits + elem_sign)
34 sign = max(sign, elem_sign)
35 return bits, sign
36
37 def eq(self, i):
38 return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)]
39
40 def ports(self):
41 return list(self)
42
43
44 class PassThroughStage:
45 def ispec(self):
46 return PassData()
47 def ospec(self):
48 return self.ispec() # same as ospec
49
50 def process(self, i):
51 return i # pass-through
52
53
54
55 class PassThroughPipe(SimpleHandshake):
56 def __init__(self):
57 SimpleHandshake.__init__(self, PassThroughStage())
58
59
60 class InputTest:
61 def __init__(self, dut):
62 self.dut = dut
63 self.di = {}
64 self.do = {}
65 self.tlen = 100
66 for mid in range(dut.num_rows):
67 self.di[mid] = {}
68 self.do[mid] = {}
69 for i in range(self.tlen):
70 self.di[mid][i] = randint(0, 255) + (mid<<8)
71 self.do[mid][i] = self.di[mid][i]
72
73 def send(self, mid):
74 for i in range(self.tlen):
75 op2 = self.di[mid][i]
76 rs = dut.p[mid]
77 yield rs.i_valid.eq(1)
78 yield rs.i_data.data.eq(op2)
79 yield rs.i_data.idx.eq(i)
80 yield rs.i_data.mid.eq(mid)
81 yield
82 o_p_ready = yield rs.o_ready
83 while not o_p_ready:
84 yield
85 o_p_ready = yield rs.o_ready
86
87 print ("send", mid, i, hex(op2))
88
89 yield rs.i_valid.eq(0)
90 # wait random period of time before queueing another value
91 for i in range(randint(0, 3)):
92 yield
93
94 yield rs.i_valid.eq(0)
95 yield
96
97 print ("send ended", mid)
98
99 ## wait random period of time before queueing another value
100 #for i in range(randint(0, 3)):
101 # yield
102
103 #send_range = randint(0, 3)
104 #if send_range == 0:
105 # send = True
106 #else:
107 # send = randint(0, send_range) != 0
108
109 def rcv(self, mid):
110 while True:
111 #stall_range = randint(0, 3)
112 #for j in range(randint(1,10)):
113 # stall = randint(0, stall_range) != 0
114 # yield self.dut.n[0].i_ready.eq(stall)
115 # yield
116 n = self.dut.n[mid]
117 yield n.i_ready.eq(1)
118 yield
119 o_n_valid = yield n.o_valid
120 i_n_ready = yield n.i_ready
121 if not o_n_valid or not i_n_ready:
122 continue
123
124 out_mid = yield n.o_data.mid
125 out_i = yield n.o_data.idx
126 out_v = yield n.o_data.data
127
128 print ("recv", out_mid, out_i, hex(out_v))
129
130 # see if this output has occurred already, delete it if it has
131 assert mid == out_mid, "out_mid %d not correct %d" % (out_mid, mid)
132 assert out_i in self.do[mid], "out_i %d not in array %s" % \
133 (out_i, repr(self.do[mid]))
134 assert self.do[mid][out_i] == out_v # pass-through data
135 del self.do[mid][out_i]
136
137 # check if there's any more outputs
138 if len(self.do[mid]) == 0:
139 break
140 print ("recv ended", mid)
141
142
143 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
144 def __init__(self, num_rows):
145 self.num_rows = num_rows
146 stage = PassThroughStage()
147 PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows)
148
149
150 class OutputTest:
151 def __init__(self, dut):
152 self.dut = dut
153 self.di = []
154 self.do = {}
155 self.tlen = 100
156 for i in range(self.tlen * dut.num_rows):
157 if i < dut.num_rows:
158 mid = i
159 else:
160 mid = randint(0, dut.num_rows-1)
161 data = randint(0, 255) + (mid<<8)
162
163 def send(self):
164 for i in range(self.tlen * dut.num_rows):
165 op2 = self.di[i][0]
166 mid = self.di[i][1]
167 rs = dut.p
168 yield rs.i_valid.eq(1)
169 yield rs.i_data.data.eq(op2)
170 yield rs.i_data.mid.eq(mid)
171 yield
172 o_p_ready = yield rs.o_ready
173 while not o_p_ready:
174 yield
175 o_p_ready = yield rs.o_ready
176
177 print ("send", mid, i, hex(op2))
178
179 yield rs.i_valid.eq(0)
180 # wait random period of time before queueing another value
181 for i in range(randint(0, 3)):
182 yield
183
184 yield rs.i_valid.eq(0)
185
186
187 class TestMuxOutPipe(CombMuxOutPipe):
188 def __init__(self, num_rows):
189 self.num_rows = num_rows
190 stage = PassThroughStage()
191 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
192
193
194 class TestInOutPipe:
195 def __init__(self, num_rows=4):
196 self.num_rows = num_rows
197 self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
198 self.pipe1 = PassThroughPipe() # stage 1 (clock-sync)
199 self.pipe2 = PassThroughPipe() # stage 2 (clock-sync)
200 self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial)
201
202 self.p = self.inpipe.p # kinda annoying,
203 self.n = self.outpipe.n # use pipe in/out as this class in/out
204 self._ports = self.inpipe.ports() + self.outpipe.ports()
205
206 def elaborate(self, platform):
207 m = Module()
208 m.submodules.inpipe = self.inpipe
209 m.submodules.pipe1 = self.pipe1
210 m.submodules.pipe2 = self.pipe2
211 m.submodules.outpipe = self.outpipe
212
213 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
214 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
215 m.d.comb += self.pipe2.connect_to_next(self.outpipe)
216
217 return m
218
219 def ports(self):
220 return self._ports
221
222
223 if __name__ == '__main__':
224 dut = TestInOutPipe()
225 vl = rtlil.convert(dut, ports=dut.ports())
226 with open("test_inoutmux_pipe.il", "w") as f:
227 f.write(vl)
228 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
229
230 test = InputTest(dut)
231 run_simulation(dut, [test.rcv(1), test.rcv(0),
232 test.rcv(3), test.rcv(2),
233 test.send(0), test.send(1),
234 test.send(3), test.send(2),
235 ],
236 vcd_name="test_inoutmux_pipe.vcd")
237