add __iter__ to several classes, add global shape() function use in FIFOControl
[ieee754fpu.git] / src / add / test_outmux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import CombMuxOutPipe
8 from singlepipe import SimpleHandshake, PassThroughHandshake
9
10
class PassInData:
    """Input record for the test pipeline: a mux id and a data word.

    Attributes are plain nmigen Signals:
      * mid  -- 2 bits wide
      * data -- 16 bits wide
    """

    def __init__(self):
        # NOTE: keep the direct "self.x = Signal(...)" form; nmigen derives
        # the signal name from the assignment target.
        self.mid = Signal(2, reset_less=True)
        self.data = Signal(16, reset_less=True)

    def __iter__(self):
        """Iterate over the member signals, mid first and then data."""
        yield from (self.mid, self.data)

    def eq(self, i):
        """Return the list of assignments copying i.mid / i.data into self."""
        assign_mid = self.mid.eq(i.mid)
        assign_data = self.data.eq(i.data)
        return [assign_mid, assign_data]

    def ports(self):
        """Return the member signals as a list: [mid, data]."""
        return list(self)
25
26
class PassThroughStage:
    """Stage that accepts a PassInData record and emits only its data word."""

    def ispec(self):
        """Create a fresh input record (mid + data)."""
        return PassInData()

    def ospec(self):
        """Create the output signal: 16 bits, explicitly named "data_out"."""
        data_out = Signal(16, name="data_out", reset_less=True)
        return data_out

    def process(self, i):
        """Return the data member of input record i; mid is not forwarded."""
        payload = i.data
        return payload
37
38
class PassThroughDataStage:
    """Stage whose output record has the same layout as its input record."""

    def ispec(self):
        """Create a fresh input record (mid + data)."""
        return PassInData()

    def ospec(self):
        """Create the output record; it has the same layout as ispec()."""
        out_record = self.ispec()
        return out_record

    def process(self, i):
        """Pass the input record through unchanged."""
        return i
47
48
49
class PassThroughPipe(PassThroughHandshake):
    """PassThroughHandshake pipeline built around a PassThroughDataStage."""

    def __init__(self):
        data_stage = PassThroughDataStage()
        super().__init__(data_stage)
53
54
55
56
def testbench(dut):
    """Directed simulation process poking a DUT through rs/out_op/mid.

    Written as a simulator generator: ``yield <signal>`` reads a value,
    ``yield <signal>.eq(v)`` drives one, and a bare ``yield`` advances
    the simulation by one step.

    NOTE(review): the attributes used here (dut.rs[n].in_op / stb /
    out_decode, dut.out_op.stb / ack / v, dut.mid) are not provided by the
    classes defined in this file, and the only visible call to this
    function is commented out in the __main__ section -- presumably a
    leftover from an input-group test; confirm before relying on it.
    """
    # both output handshake signals are expected to start at zero
    stb = yield dut.out_op.stb
    assert stb == 0
    ack = yield dut.out_op.ack
    assert ack == 0

    # set row 1 input 0
    yield dut.rs[1].in_op[0].eq(5)
    yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
    #yield dut.rs[1].ack.eq(1)
    yield

    # check row 1 output (should be inactive)
    decode = yield dut.rs[1].out_decode
    assert decode == 0
    if False:
        # disabled check: never executed
        op0 = yield dut.rs[1].out_op[0]
        op1 = yield dut.rs[1].out_op[1]
        assert op0 == 0 and op1 == 0

    # NOTE(review): the original comment said "output should be inactive",
    # but the assertion below requires the output strobe to be 1 -- confirm
    # which of the two is intended.
    out_stb = yield dut.out_op.stb
    assert out_stb == 1

    # set row 1 input 1
    yield dut.rs[1].in_op[1].eq(6)
    yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield dut.rs[1].stb.eq(0) # clear row 1 strobe

    # output strobe should be active, MID should be 0 until "ack" is set...
    out_stb = yield dut.out_op.stb
    assert out_stb == 1
    out_mid = yield dut.mid
    assert out_mid == 0

    # ... and output should not yet be passed through either
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 0 and op1 == 0

    # wait for out_op.ack to activate...
    yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
    yield

    # *now* output should be passed through
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 5 and op1 == 6

    # set row 2 input
    yield dut.rs[2].in_op[0].eq(3)
    yield dut.rs[2].in_op[1].eq(4)
    yield dut.rs[2].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.out_op.ack.eq(1) # set output ack
    yield
    yield dut.rs[2].stb.eq(0) # clear row 2 strobe
    yield dut.out_op.ack.eq(0) # clear output ack
    yield
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # set row 0 and 3 input
    yield dut.rs[0].in_op[0].eq(9)
    yield dut.rs[0].in_op[1].eq(8)
    yield dut.rs[0].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.rs[3].in_op[0].eq(1)
    yield dut.rs[3].in_op[1].eq(2)
    yield dut.rs[3].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.rs[0].stb.eq(0) # clear row 0 strobe
    yield
    # with rows 0 and 3 both strobed, row 0 is expected at the output first
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield dut.rs[3].stb.eq(0) # clear row 3 strobe
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield
    # ... followed by row 3
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
148
149
class OutputTest:
    """Random stimulus generator and per-port checker for the out-mux test.

    On construction, ``tlen * dut.num_rows`` (data, mid) pairs are created.
    The first ``dut.num_rows`` pairs use mid = 0, 1, 2, ... so that every
    output port is guaranteed at least one expected value; the remaining
    mids are random.  Each data word is a random byte with the mid placed
    in bits 8 and up.

    Attributes:
        dut:  the device under test (needs ``num_rows``, ``p`` and ``n``).
        di:   list of (data, mid) tuples in the order they will be sent.
        do:   dict mapping mid -> list of data words expected on that port,
              in order.
        tlen: number of values generated per row (10).
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = []
        self.do = {}
        self.tlen = 10
        for i in range(self.tlen * dut.num_rows):
            # the first num_rows entries cover every port exactly once
            if i < dut.num_rows:
                mid = i
            else:
                mid = randint(0, dut.num_rows-1)
            data = randint(0, 255) + (mid<<8)
            self.di.append((data, mid))
            self.do.setdefault(mid, []).append(data)

    def send(self):
        """Simulation process: push every entry of self.di into dut.p.

        For each entry the input is flagged valid with data and mid set,
        then the process waits until ``o_ready`` reads true, drops
        ``i_valid`` and idles for a random 0-3 steps.
        """
        # Use the DUT handed to the constructor.  The previous code read a
        # module-level "dut" global, which only exists when the file is run
        # as a script.
        rs = self.dut.p
        for i, (op2, mid) in enumerate(self.di):
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.mid.eq(mid)
            yield
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self, mid):
        """Simulation process: check the values arriving on port ``mid``.

        Each received value is compared, in order, against self.do[mid].
        After each value the process may deassert ``i_ready`` for a few
        steps to exercise back-pressure.  Raises AssertionError on a data
        mismatch or after 2000 loop iterations without completing.
        """
        out_i = 0
        count = 0
        stall_range = randint(0, 3)
        expected = self.do[mid]
        while out_i != len(expected):
            count += 1
            assert count != 2000, "timeout: too long"
            n = self.dut.n[mid]
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            # nothing transferred unless both sides of the handshake agree
            if not o_n_valid or not i_n_ready:
                continue

            out_v = yield n.o_data

            print ("recv", mid, out_i, hex(out_v))

            assert expected[out_i] == out_v # pass-through data

            out_i += 1

            # occasionally pick a new stall length, then maybe stall
            if randint(0, 5) == 0:
                stall_range = randint(0, 3)
            stall = randint(0, stall_range) != 0
            if stall:
                yield n.i_ready.eq(0)
                for _ in range(stall_range):
                    yield
220
221
class TestPriorityMuxPipe(CombMuxOutPipe):
    """CombMuxOutPipe with num_rows outputs around a PassThroughStage."""

    def __init__(self, num_rows):
        # num_rows is stored before the base class is initialised
        self.num_rows = num_rows
        super().__init__(PassThroughStage(), n_len=num_rows)
227
228
class TestSyncToPriorityPipe:
    """Top-level DUT: a PassThroughPipe feeding a 4-output TestPriorityMuxPipe.

    Attributes:
        p: input side, taken from the pass-through pipe.
        n: output side(s), taken from the mux pipe.
    """

    def __init__(self):
        self.num_rows = 4
        self.pipe = PassThroughPipe()
        self.muxpipe = TestPriorityMuxPipe(self.num_rows)

        # expose the outer interfaces of the two-stage chain
        self.p = self.pipe.p
        self.n = self.muxpipe.n

    def elaborate(self, platform):
        """Build the module: both pipes as submodules, connected in series."""
        module = Module()
        module.submodules.pipe = self.pipe
        module.submodules.muxpipe = self.muxpipe
        module.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p)
        return module

    def ports(self):
        """Return the input handshake/data signals, then each output's
        i_ready, o_valid and o_data."""
        res = [self.p.i_valid, self.p.o_ready]
        res += self.p.i_data.ports()
        for idx in range(len(self.n)):
            # o_data is added as a single signal here
            # (not expanded via o_data.ports())
            res += [self.n[idx].i_ready, self.n[idx].o_valid]
            res += [self.n[idx].o_data]
        return res
253
254
if __name__ == '__main__':
    # Build the DUT and dump its RTLIL next to the test for inspection.
    dut = TestSyncToPriorityPipe()
    il_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_outmux_pipe.il", "w") as il_file:
        il_file.write(il_text)
    #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")

    # One receiver process per output port (same order as before: 1, 0, 3, 2)
    # followed by the single sender process.
    test = OutputTest(dut)
    processes = [test.rcv(mid) for mid in (1, 0, 3, 2)]
    processes.append(test.send())
    run_simulation(dut, processes, vcd_name="test_outmux_pipe.vcd")
267