debugging RecordObject __setattr__
[ieee754fpu.git] / src / add / test_inout_mux_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout.
3
4 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
5 and used as a routing ID on the fanout.
6 """
7
8 from random import randint
9 from math import log
10 from nmigen import Module, Signal, Cat, Value
11 from nmigen.compat.sim import run_simulation
12 from nmigen.cli import verilog, rtlil
13
14 from multipipe import CombMultiOutPipeline, CombMuxOutPipe
15 from multipipe import PriorityCombMuxInPipe
16 from singlepipe import SimpleHandshake, RecordObject
17
18
class PassData2(RecordObject):
    """RecordObject variant of the data bundle used by this test.

    Carries the same three fields as PassData (mid, idx, data), but as
    attributes of a RecordObject rather than a plain Python object.
    """

    def __init__(self):
        super().__init__()
        # Fields are assigned in the order mid, idx, data.
        # NOTE(review): RecordObject presumably registers attributes as they
        # are set, so this order is kept exactly -- confirm in singlepipe.
        self.mid = Signal(2, reset_less=True)    # multiplexer (routing) ID
        self.idx = Signal(8, reset_less=True)    # index of the test value
        self.data = Signal(16, reset_less=True)  # payload
25
26
class PassData:
    """Plain-object bundle of the three signals passed through the pipeline.

    mid  -- multiplexer ID, preserved through the pipeline and used for
            routing on the fan-out
    idx  -- index of the value within the sender's test vector
    data -- the payload itself
    """

    # Field names, in the order they are iterated and assigned.
    _FIELDS = ("mid", "idx", "data")

    def __init__(self):
        self.mid = Signal(2, reset_less=True)
        self.idx = Signal(8, reset_less=True)
        self.data = Signal(16, reset_less=True)

    def __iter__(self):
        """Iterate over the member signals in the order mid, idx, data."""
        yield from (self.mid, self.idx, self.data)

    def shape(self):
        """Return (bits, sign) wide enough to hold any single member.

        bits is the largest member width (plus one if that member is
        signed); sign is set if any member is signed.
        """
        bits, sign = 0, False
        for member in self.ports():
            member_bits, member_sign = member.shape()
            bits = max(bits, member_bits + member_sign)
            sign = max(sign, member_sign)
        return bits, sign

    def eq(self, i):
        """Return a list of assignments copying each field of i into self."""
        return [getattr(self, name).eq(getattr(i, name))
                for name in self._FIELDS]

    def ports(self):
        """Return the member signals as a list (same order as iteration)."""
        return list(self)
50
51
class PassThroughStage:
    """Pipeline stage that forwards its input unchanged."""

    def ispec(self):
        """Return a fresh PassData describing the stage input."""
        spec = PassData()
        return spec

    def ospec(self):
        """Return a fresh output spec; the output has the input's format."""
        return self.ispec()  # same as ispec

    def process(self, i):
        """Return the input object itself (pass-through)."""
        result = i
        return result
60
61
62
class PassThroughPipe(SimpleHandshake):
    """SimpleHandshake pipe wrapping a single PassThroughStage."""

    def __init__(self):
        stage = PassThroughStage()
        SimpleHandshake.__init__(self, stage)
66
67
class InputTest:
    """Simulation test bench: per-port senders and receivers for a mux pipe.

    For every row (multiplexer ID) of the device under test, tlen random
    values are generated.  self.di holds the values to send, self.do holds
    the values still expected on the matching output; entries are deleted
    from self.do as they are received.

    dut must provide num_rows, p[mid] (input ports) and n[mid] (output
    ports).
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = {}
        self.do = {}
        self.tlen = 100
        for mid in range(dut.num_rows):
            # low 8 bits are random, the bits above carry the mux ID
            values = {i: randint(0, 255) + (mid << 8)
                      for i in range(self.tlen)}
            self.di[mid] = values
            self.do[mid] = dict(values)  # pass-through: expect what we send

    def send(self, mid):
        """Simulation process: feed all test values into input port mid.

        Uses self.dut rather than a module-level `dut` global, so the test
        bench also works when driven from outside this script.
        """
        rs = self.dut.p[mid]
        for i in range(self.tlen):
            op2 = self.di[mid][i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.idx.eq(i)
            yield rs.i_data.mid.eq(mid)
            yield
            # hold the value until the port reports ready
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)
        yield

        print ("send ended", mid)

    def rcv(self, mid):
        """Simulation process: check everything arriving on output port mid.

        Runs until every expected value for this mid has been seen.  Raises
        AssertionError on a wrong mux ID, an unexpected/duplicate index or
        mismatching data.
        """
        n = self.dut.n[mid]
        while True:
            # TODO: random stalling of i_ready was sketched here but never
            # enabled; the receiver is currently always ready.
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            out_mid = yield n.o_data.mid
            out_i = yield n.o_data.idx
            out_v = yield n.o_data.data

            print ("recv", out_mid, out_i, hex(out_v))

            # see if this output has occurred already, delete it if it has
            assert mid == out_mid, "out_mid %d not correct %d" % (out_mid, mid)
            assert out_i in self.do[mid], "out_i %d not in array %s" % \
                                          (out_i, repr(self.do[mid]))
            assert self.do[mid][out_i] == out_v # pass-through data
            del self.do[mid][out_i]

            # check if there's any more outputs
            if not self.do[mid]:
                break
        print ("recv ended", mid)
149
150
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
    """Fan-in pipe: num_rows inputs into one pass-through stage."""

    def __init__(self, num_rows):
        # num_rows is stored before the base class is initialised
        self.num_rows = num_rows
        PriorityCombMuxInPipe.__init__(self, PassThroughStage(),
                                       p_len=self.num_rows)
156
157
class OutputTest:
    """Simulation test bench sender for a single-input, multi-output pipe.

    Generates tlen * num_rows (data, mid) pairs in self.di.  The first
    num_rows entries use mid = 0..num_rows-1 so that every output is
    exercised at least once; the rest pick a random mid.

    dut must provide num_rows and a single input port p.
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = []
        self.do = {}
        self.tlen = 100
        for i in range(self.tlen * dut.num_rows):
            if i < dut.num_rows:
                mid = i
            else:
                mid = randint(0, dut.num_rows-1)
            data = randint(0, 255) + (mid<<8)
            # send() indexes self.di[i][0] (data) and self.di[i][1] (mid);
            # previously nothing was stored and send() hit an IndexError.
            self.di.append((data, mid))
            # NOTE(review): expected values per output, in send order --
            # nothing in this file reads self.do yet; confirm the intended
            # layout against whichever receiver is added.
            self.do.setdefault(mid, []).append(data)

    def send(self):
        """Simulation process: feed every (data, mid) pair into dut.p.

        Uses self.dut rather than a module-level `dut` global.
        """
        rs = self.dut.p
        for i in range(self.tlen * self.dut.num_rows):
            op2, mid = self.di[i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.mid.eq(mid)
            yield
            # hold the value until the port reports ready
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)
193
194
class TestMuxOutPipe(CombMuxOutPipe):
    """Fan-out pipe: one pass-through stage routed to num_rows outputs."""

    def __init__(self, num_rows):
        # num_rows is stored before the base class is initialised
        self.num_rows = num_rows
        CombMuxOutPipe.__init__(self, PassThroughStage(),
                                n_len=self.num_rows)
200
201
class TestInOutPipe:
    """Fan-in -> two pass-through stages -> fan-out, as one device under test.

    The input ports of the fan-in pipe and the output ports of the fan-out
    pipe are re-exported as self.p and self.n.
    """

    def __init__(self, num_rows=4):
        self.num_rows = num_rows
        self.inpipe = TestPriorityMuxPipe(num_rows)  # fan-in (combinatorial)
        self.pipe1 = PassThroughPipe()               # stage 1 (clock-sync)
        self.pipe2 = PassThroughPipe()               # stage 2 (clock-sync)
        self.outpipe = TestMuxOutPipe(num_rows)      # fan-out (combinatorial)

        # use the first pipe's inputs and the last pipe's outputs as the
        # inputs/outputs of this class
        self.p = self.inpipe.p
        self.n = self.outpipe.n
        self._ports = self.inpipe.ports() + self.outpipe.ports()

    def elaborate(self, platform):
        """Build the module: register the four pipes and chain them."""
        m = Module()
        subs = m.submodules
        subs.inpipe = self.inpipe
        subs.pipe1 = self.pipe1
        subs.pipe2 = self.pipe2
        subs.outpipe = self.outpipe

        # inpipe -> pipe1 -> pipe2 -> outpipe
        fan_in_link = self.inpipe.n.connect_to_next(self.pipe1.p)
        m.d.comb += fan_in_link
        stage_link = self.pipe1.connect_to_next(self.pipe2)
        m.d.comb += stage_link
        fan_out_link = self.pipe2.connect_to_next(self.outpipe)
        m.d.comb += fan_out_link

        return m

    def ports(self):
        """Return the fan-in ports followed by the fan-out ports."""
        return self._ports
229
230
if __name__ == '__main__':
    dut = TestInOutPipe()

    # dump the design as RTLIL for inspection
    il_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_inoutmux_pipe.il", "w") as il_file:
        il_file.write(il_text)

    # one receiver and one sender process per multiplexer ID; the process
    # order matches the original list (receivers first, then senders)
    test = InputTest(dut)
    receivers = [test.rcv(mid) for mid in (1, 0, 3, 2)]
    senders = [test.send(mid) for mid in (0, 1, 3, 2)]
    run_simulation(dut, receivers + senders,
                   vcd_name="test_inoutmux_pipe.vcd")
245