pass in flatten/processing function into _connect_in/out
[ieee754fpu.git] / src / add / test_outmux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import CombMuxOutPipe
8 from singlepipe import SimpleHandshake
9
10
11 class PassInData:
12 def __init__(self):
13 self.mid = Signal(2, reset_less=True)
14 self.data = Signal(16, reset_less=True)
15
16 def eq(self, i):
17 return [self.mid.eq(i.mid), self.data.eq(i.data)]
18
19 def ports(self):
20 return [self.mid, self.data]
21
22
23 class PassThroughStage:
24
25 def ispec(self):
26 return PassInData()
27
28 def ospec(self):
29 return Signal(16, name="data_out", reset_less=True)
30
31 def process(self, i):
32 return i.data
33
34
35 class PassThroughDataStage:
36 def ispec(self):
37 return PassInData()
38 def ospec(self):
39 return self.ispec() # same as ospec
40
41 def process(self, i):
42 return i # pass-through
43
44
45
46 class PassThroughPipe(SimpleHandshake):
47 def __init__(self):
48 SimpleHandshake.__init__(self, PassThroughDataStage())
49
50
51
52
53 def testbench(dut):
54 stb = yield dut.out_op.stb
55 assert stb == 0
56 ack = yield dut.out_op.ack
57 assert ack == 0
58
59 # set row 1 input 0
60 yield dut.rs[1].in_op[0].eq(5)
61 yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
62 #yield dut.rs[1].ack.eq(1)
63 yield
64
65 # check row 1 output (should be inactive)
66 decode = yield dut.rs[1].out_decode
67 assert decode == 0
68 if False:
69 op0 = yield dut.rs[1].out_op[0]
70 op1 = yield dut.rs[1].out_op[1]
71 assert op0 == 0 and op1 == 0
72
73 # output should be inactive
74 out_stb = yield dut.out_op.stb
75 assert out_stb == 1
76
77 # set row 0 input 1
78 yield dut.rs[1].in_op[1].eq(6)
79 yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready
80
81 # set acknowledgement of output... takes 1 cycle to respond
82 yield dut.out_op.ack.eq(1)
83 yield
84 yield dut.out_op.ack.eq(0) # clear ack on output
85 yield dut.rs[1].stb.eq(0) # clear row 1 strobe
86
87 # output strobe should be active, MID should be 0 until "ack" is set...
88 out_stb = yield dut.out_op.stb
89 assert out_stb == 1
90 out_mid = yield dut.mid
91 assert out_mid == 0
92
93 # ... and output should not yet be passed through either
94 op0 = yield dut.out_op.v[0]
95 op1 = yield dut.out_op.v[1]
96 assert op0 == 0 and op1 == 0
97
98 # wait for out_op.ack to activate...
99 yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
100 yield
101
102 # *now* output should be passed through
103 op0 = yield dut.out_op.v[0]
104 op1 = yield dut.out_op.v[1]
105 assert op0 == 5 and op1 == 6
106
107 # set row 2 input
108 yield dut.rs[2].in_op[0].eq(3)
109 yield dut.rs[2].in_op[1].eq(4)
110 yield dut.rs[2].stb.eq(0b11) # strobe indicate 1st op ready
111 yield dut.out_op.ack.eq(1) # set output ack
112 yield
113 yield dut.rs[2].stb.eq(0) # clear row 2 strobe
114 yield dut.out_op.ack.eq(0) # set output ack
115 yield
116 op0 = yield dut.out_op.v[0]
117 op1 = yield dut.out_op.v[1]
118 assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
119 out_mid = yield dut.mid
120 assert out_mid == 2
121
122 # set row 0 and 3 input
123 yield dut.rs[0].in_op[0].eq(9)
124 yield dut.rs[0].in_op[1].eq(8)
125 yield dut.rs[0].stb.eq(0b11) # strobe indicate 1st op ready
126 yield dut.rs[3].in_op[0].eq(1)
127 yield dut.rs[3].in_op[1].eq(2)
128 yield dut.rs[3].stb.eq(0b11) # strobe indicate 1st op ready
129
130 # set acknowledgement of output... takes 1 cycle to respond
131 yield dut.out_op.ack.eq(1)
132 yield
133 yield dut.rs[0].stb.eq(0) # clear row 1 strobe
134 yield
135 out_mid = yield dut.mid
136 assert out_mid == 0, "out mid %d" % out_mid
137
138 yield
139 yield dut.rs[3].stb.eq(0) # clear row 1 strobe
140 yield dut.out_op.ack.eq(0) # clear ack on output
141 yield
142 out_mid = yield dut.mid
143 assert out_mid == 3, "out mid %d" % out_mid
144
145
146 class OutputTest:
147 def __init__(self, dut):
148 self.dut = dut
149 self.di = []
150 self.do = {}
151 self.tlen = 10
152 for i in range(self.tlen * dut.num_rows):
153 if i < dut.num_rows:
154 mid = i
155 else:
156 mid = randint(0, dut.num_rows-1)
157 data = randint(0, 255) + (mid<<8)
158 if mid not in self.do:
159 self.do[mid] = []
160 self.di.append((data, mid))
161 self.do[mid].append(data)
162
163 def send(self):
164 for i in range(self.tlen * dut.num_rows):
165 op2 = self.di[i][0]
166 mid = self.di[i][1]
167 rs = dut.p
168 yield rs.i_valid.eq(1)
169 yield rs.i_data.data.eq(op2)
170 yield rs.i_data.mid.eq(mid)
171 yield
172 o_p_ready = yield rs.o_ready
173 while not o_p_ready:
174 yield
175 o_p_ready = yield rs.o_ready
176
177 print ("send", mid, i, hex(op2))
178
179 yield rs.i_valid.eq(0)
180 # wait random period of time before queueing another value
181 for i in range(randint(0, 3)):
182 yield
183
184 yield rs.i_valid.eq(0)
185
186 def rcv(self, mid):
187 out_i = 0
188 count = 0
189 stall_range = randint(0, 3)
190 while out_i != len(self.do[mid]):
191 count += 1
192 assert count != 2000, "timeout: too long"
193 n = self.dut.n[mid]
194 yield n.i_ready.eq(1)
195 yield
196 o_n_valid = yield n.o_valid
197 i_n_ready = yield n.i_ready
198 if not o_n_valid or not i_n_ready:
199 continue
200
201 out_v = yield n.o_data
202
203 print ("recv", mid, out_i, hex(out_v))
204
205 assert self.do[mid][out_i] == out_v # pass-through data
206
207 out_i += 1
208
209 if randint(0, 5) == 0:
210 stall_range = randint(0, 3)
211 stall = randint(0, stall_range) != 0
212 if stall:
213 yield n.i_ready.eq(0)
214 for i in range(stall_range):
215 yield
216
217
218 class TestPriorityMuxPipe(CombMuxOutPipe):
219 def __init__(self, num_rows):
220 self.num_rows = num_rows
221 stage = PassThroughStage()
222 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
223
224
225 class TestSyncToPriorityPipe:
226 def __init__(self):
227 self.num_rows = 4
228 self.pipe = PassThroughPipe()
229 self.muxpipe = TestPriorityMuxPipe(self.num_rows)
230
231 self.p = self.pipe.p
232 self.n = self.muxpipe.n
233
234 def elaborate(self, platform):
235 m = Module()
236 m.submodules += self.pipe
237 m.submodules += self.muxpipe
238 m.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p)
239 return m
240
241 def ports(self):
242 res = [self.p.i_valid, self.p.o_ready] + \
243 self.p.i_data.ports()
244 for i in range(len(self.n)):
245 res += [self.n[i].i_ready, self.n[i].o_valid] + \
246 [self.n[i].o_data]
247 #self.n[i].o_data.ports()
248 return res
249
250
251 if __name__ == '__main__':
252 dut = TestSyncToPriorityPipe()
253 vl = rtlil.convert(dut, ports=dut.ports())
254 with open("test_outmux_pipe.il", "w") as f:
255 f.write(vl)
256 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
257
258 test = OutputTest(dut)
259 run_simulation(dut, [test.rcv(1), test.rcv(0),
260 test.rcv(3), test.rcv(2),
261 test.send()],
262 vcd_name="test_outmux_pipe.vcd")
263