add sync pipe to outmux test
[ieee754fpu.git] / src / add / test_outmux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import CombMultiOutPipeline
8 from singlepipe import UnbufferedPipeline
9
10
class MuxUnbufferedPipeline(CombMultiOutPipeline):
    """Multi-output pipeline in which the stage doubles as the output mux.

    The same ``stage`` object is handed to the base class both as the
    processing stage and as ``n_mux``.
    """

    def __init__(self, stage, n_len):
        # HACK: the stage is passed twice - once as the stage itself and
        # once as the n-way multiplexer selector.
        super().__init__(stage, n_len=n_len, n_mux=stage)

        # HACK: because the mux *is* the stage, point its mux id at the
        # "mid" field of the incoming data.
        stage.m_id = self.p.i_data.mid

    def ports(self):
        """Return whatever ``self.p_mux.ports()`` returns."""
        # NOTE(review): p_mux is not assigned anywhere in this file;
        # presumably the base class provides it - confirm.
        return self.p_mux.ports()
21
22
class PassInData:
    """Input record: a 2-bit ``mid`` signal plus a 16-bit ``data`` signal."""

    def __init__(self):
        self.mid = Signal(2, reset_less=True)
        self.data = Signal(16, reset_less=True)

    def eq(self, i):
        """Return the list of assignments copying ``i`` into this record."""
        field_pairs = ((self.mid, i.mid), (self.data, i.data))
        return [dest.eq(src) for dest, src in field_pairs]

    def ports(self):
        """Return the record's signals as a list: ``[mid, data]``."""
        signals = [self.mid]
        signals.append(self.data)
        return signals
33
34
class PassThroughStage:
    """Stage that forwards only the ``data`` field of its input."""

    def ispec(self):
        """Input format: a fresh ``PassInData`` record."""
        return PassInData()

    def ospec(self):
        """Output format: a 16-bit signal named ``data_out``."""
        return Signal(16, name="data_out", reset_less=True)

    def process(self, i):
        """Return the ``data`` attribute of ``i``; ``mid`` is not forwarded."""
        payload = i.data
        return payload
45
46
class PassThroughDataStage:
    """Stage whose output is its input, unchanged (``mid`` and ``data``)."""

    def ispec(self):
        """Input format: a fresh ``PassInData`` record."""
        return PassInData()

    def ospec(self):
        """Output format: identical to the input format."""
        out_format = self.ispec()  # output spec is the same as ispec
        return out_format

    def process(self, i):
        """Return ``i`` itself - a pure pass-through."""
        return i
55
56
57
class PassThroughPipe(UnbufferedPipeline):
    """``UnbufferedPipeline`` built around a ``PassThroughDataStage``."""

    def __init__(self):
        stage = PassThroughDataStage()
        super().__init__(stage)
61
62
63
64
def testbench(dut):
    """Hand-sequenced simulation process for a row-based ``dut``.

    Drives ``dut.rs[row].in_op`` / ``dut.rs[row].stb`` and
    ``dut.out_op.ack``, and checks ``dut.out_op.stb``, ``dut.out_op.v``
    and ``dut.mid`` after each step.  Written as a generator: a bare
    ``yield`` is used between steps, ``yield <signal>`` reads a value and
    ``yield <signal>.eq(x)`` writes one.

    NOTE(review): none of the classes in this file define ``rs``,
    ``out_op`` or ``mid``, and the only call to this function is commented
    out - presumably it is left over from a different test; confirm before
    relying on it.
    """
    rows = dut.rs
    out = dut.out_op

    # at the start neither output handshake signal may be set
    initial_stb = yield out.stb
    assert initial_stb == 0
    initial_ack = yield out.ack
    assert initial_ack == 0

    # row 1: present operand 0 only and strobe just that operand
    yield rows[1].in_op[0].eq(5)
    yield rows[1].stb.eq(0b01)
    #yield rows[1].ack.eq(1)
    yield

    # row 1 decode must still be zero
    row_decode = yield rows[1].out_decode
    assert row_decode == 0
    if False:  # disabled check of row 1's per-operand outputs
        row_op0 = yield rows[1].out_op[0]
        row_op1 = yield rows[1].out_op[1]
        assert row_op0 == 0 and row_op1 == 0

    # NOTE(review): the original comment here read "output should be
    # inactive", yet the check requires the output strobe to be 1.
    strobe = yield out.stb
    assert strobe == 1

    # row 1: present operand 1 and strobe both operands
    yield rows[1].in_op[1].eq(6)
    yield rows[1].stb.eq(0b11)

    # acknowledge the output... takes 1 cycle to respond
    yield out.ack.eq(1)
    yield
    yield out.ack.eq(0)        # clear ack on output
    yield rows[1].stb.eq(0)    # clear row 1 strobe

    # output strobe is set and MID still reads 0 at this point...
    strobe = yield out.stb
    assert strobe == 1
    selected = yield dut.mid
    assert selected == 0

    # ... and the operands have not been passed through yet
    first = yield out.v[0]
    second = yield out.v[1]
    assert (first, second) == (0, 0)

    # one more cycle with row 1 strobes at zero
    yield rows[1].stb.eq(0b00)
    yield

    # *now* the row 1 operands appear on the output
    first = yield out.v[0]
    second = yield out.v[1]
    assert (first, second) == (5, 6)

    # row 2: both operands, both strobes, output acknowledged
    for idx, operand in enumerate((3, 4)):
        yield rows[2].in_op[idx].eq(operand)
    yield rows[2].stb.eq(0b11)
    yield out.ack.eq(1)
    yield
    yield rows[2].stb.eq(0)    # clear row 2 strobe
    yield out.ack.eq(0)        # clear output ack
    yield
    op0 = yield out.v[0]
    op1 = yield out.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # rows 0 and 3 at the same time: both operands, both strobes
    for row, operands in ((0, (9, 8)), (3, (1, 2))):
        for idx, operand in enumerate(operands):
            yield rows[row].in_op[idx].eq(operand)
        yield rows[row].stb.eq(0b11)

    # acknowledge the output... takes 1 cycle to respond
    yield out.ack.eq(1)
    yield
    yield rows[0].stb.eq(0)    # clear row 0 strobe
    yield
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield rows[3].stb.eq(0)    # clear row 3 strobe
    yield out.ack.eq(0)        # clear ack on output
    yield
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
156
157
class OutputTest:
    """Random stimulus generator and checker for a mux-output pipeline.

    ``di`` holds the ``(data, mid)`` pairs in the order they are sent;
    ``do`` maps each ``mid`` to the list of data words expected on that
    output, in order.  ``send`` and ``rcv`` are generator processes:
    a bare ``yield`` is used between steps, ``yield <signal>`` reads a
    value and ``yield <signal>.eq(x)`` writes one.
    """

    def __init__(self, dut):
        """Pre-compute ``tlen * dut.num_rows`` random test vectors."""
        self.dut = dut
        self.di = []
        self.do = {}
        self.tlen = 10
        for i in range(self.tlen * dut.num_rows):
            # the first num_rows vectors hit every output once, so every
            # mid has an entry in self.do; the rest are random
            if i < dut.num_rows:
                mid = i
            else:
                mid = randint(0, dut.num_rows-1)
            # low byte is random, the bits above it carry the mid
            data = randint(0, 255) + (mid << 8)
            self.di.append((data, mid))
            self.do.setdefault(mid, []).append(data)

    def send(self):
        """Feed every vector in ``self.di`` into ``self.dut.p``.

        Uses ``self.dut`` (not a module-level ``dut``), so the process
        works wherever the instance was created.
        """
        rs = self.dut.p
        for i, (op2, mid) in enumerate(self.di):
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.mid.eq(mid)
            yield
            # hold the value until o_ready is seen set
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self, mid):
        """Check output ``mid`` against ``self.do[mid]``, with random stalls.

        Raises ``AssertionError`` on a data mismatch or after 2000 loop
        iterations without completing.
        """
        n = self.dut.n[mid]
        expected = self.do[mid]
        out_i = 0
        count = 0
        stall_range = randint(0, 3)
        while out_i != len(expected):
            count += 1
            assert count != 2000, "timeout: too long"
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            # a transfer needs both valid and ready
            if not o_n_valid or not i_n_ready:
                continue

            out_v = yield n.o_data

            print ("recv", mid, out_i, hex(out_v))

            assert expected[out_i] == out_v  # pass-through data

            out_i += 1

            # occasionally pick a new stall length, then maybe stall
            if randint(0, 5) == 0:
                stall_range = randint(0, 3)
            stall = randint(0, stall_range) != 0
            if stall:
                yield n.i_ready.eq(0)
                for _ in range(stall_range):
                    yield
228
229
class TestPriorityMuxPipe(MuxUnbufferedPipeline):
    """``MuxUnbufferedPipeline`` over a ``PassThroughStage``, ``num_rows`` outputs."""

    def __init__(self, num_rows):
        self.num_rows = num_rows
        super().__init__(PassThroughStage(), n_len=num_rows)

    def ports(self):
        """List the input handshake/data signals, then each output's signals."""
        res = [self.p.i_valid, self.p.o_ready]
        res.extend(self.p.i_data.ports())
        for out_port in self.n:
            # o_data is appended directly; .ports() is not called on it
            res.extend((out_port.i_ready, out_port.o_valid, out_port.o_data))
        return res
244
245
class TestSyncToPriorityPipe:
    """A ``PassThroughPipe`` connected to a 4-output ``TestPriorityMuxPipe``.

    ``p`` is the input side of the first pipe and ``n`` the outputs of the
    mux pipe, so the pair can be driven as one unit.
    """

    def __init__(self):
        self.num_rows = 4
        self.pipe = PassThroughPipe()
        self.muxpipe = TestPriorityMuxPipe(self.num_rows)

        # expose the outer ends of the two-pipe chain
        self.p = self.pipe.p
        self.n = self.muxpipe.n

    def elaborate(self, platform):
        """Build the module: both pipes as submodules, joined combinatorially."""
        m = Module()
        m.submodules += self.pipe
        m.submodules += self.muxpipe
        link = self.pipe.n.connect_to_next(self.muxpipe.p)
        m.d.comb += link
        return m

    def ports(self):
        """List the input handshake/data signals, then each output's signals."""
        res = [self.p.i_valid, self.p.o_ready]
        res.extend(self.p.i_data.ports())
        for out_port in self.n:
            # o_data is appended directly; .ports() is not called on it
            res.extend((out_port.i_ready, out_port.o_valid, out_port.o_data))
        return res
270
271
if __name__ == '__main__':
    dut = TestSyncToPriorityPipe()

    # convert first, then write the RTLIL text out
    rtlil_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_outmux_pipe.il", "w") as il_file:
        il_file.write(rtlil_text)
    # the hand-sequenced testbench() is not run here:
    #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")

    # one receiver process per output, plus a single sender
    harness = OutputTest(dut)
    processes = [harness.rcv(mid) for mid in (1, 0, 3, 2)]
    processes.append(harness.send())
    run_simulation(dut, processes, vcd_name="test_outmux_pipe.vcd")
284