use RecordObject in output mux pipe
[ieee754fpu.git] / src / add / test_outmux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import CombMuxOutPipe
8 from singlepipe import SimpleHandshake, PassThroughHandshake, RecordObject
9
10
class PassInData(RecordObject):
    """Input record used by the pass-through stages in this test.

    Fields:
      * ``mid``  -- 2-bit value; OutputTest drives it with the index of the
        output on which it expects the data to appear.
      * ``data`` -- 16-bit payload.
    """

    def __init__(self):
        super().__init__()
        # Keep the attribute assignments (and their order) exactly as-is:
        # the fields are created directly on the record.
        self.mid = Signal(2, reset_less=True)
        self.data = Signal(16, reset_less=True)
16
17
class PassThroughStage:
    """Stage that accepts a PassInData record and emits only its ``data``."""

    def ispec(self):
        """Return a new PassInData instance describing the stage input."""
        return PassInData()

    def ospec(self):
        """Return a 16-bit reset-less Signal named "data_out" for the output."""
        out_spec = Signal(16, reset_less=True, name="data_out")
        return out_spec

    def process(self, i):
        """Return the ``data`` attribute of the input *i* (``mid`` is dropped)."""
        return i.data
28
29
class PassThroughDataStage:
    """Stage whose output format equals its input format (PassInData)."""

    def ispec(self):
        """Return a new PassInData instance describing the stage input."""
        return PassInData()

    def ospec(self):
        """Return the output spec, which is the same as ispec."""
        return self.ispec()

    def process(self, i):
        """Return the input *i* unchanged (pass-through)."""
        return i
38
39
40
class PassThroughPipe(PassThroughHandshake):
    """PassThroughHandshake built around a PassThroughDataStage."""

    def __init__(self):
        data_stage = PassThroughDataStage()
        super().__init__(data_stage)
44
45
46
47
def testbench(dut):
    """Directed simulation process that pokes ``dut.rs[...]`` and checks
    ``dut.out_op`` / ``dut.mid``.

    This is a generator for the simulator: ``yield <signal>`` reads a value,
    ``yield <signal>.eq(v)`` sets one, and a bare ``yield`` advances one step.

    NOTE(review): the attributes used here (``rs``, ``out_op``, ``mid``) are
    not defined by TestSyncToPriorityPipe in this file, and the only call to
    this function (in the ``__main__`` section) is commented out -- this
    looks like a leftover from a different test; confirm before reviving.
    Indentation of the ``if False:`` body below was reconstructed.
    """
    # both output handshake signals are expected to read 0 at the start
    stb = yield dut.out_op.stb
    assert stb == 0
    ack = yield dut.out_op.ack
    assert ack == 0

    # set row 1 input 0
    yield dut.rs[1].in_op[0].eq(5)
    yield dut.rs[1].stb.eq(0b01)  # strobe indicate 1st op ready
    #yield dut.rs[1].ack.eq(1)
    yield

    # check row 1 output (should be inactive)
    decode = yield dut.rs[1].out_decode
    assert decode == 0
    if False:  # disabled check, kept for reference
        op0 = yield dut.rs[1].out_op[0]
        op1 = yield dut.rs[1].out_op[1]
        assert op0 == 0 and op1 == 0

    # NOTE(review): the original comment here said "output should be
    # inactive", but the assertion expects the output strobe to be 1.
    out_stb = yield dut.out_op.stb
    assert out_stb == 1

    # set row 1 input 1
    yield dut.rs[1].in_op[1].eq(6)
    yield dut.rs[1].stb.eq(0b11)  # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.out_op.ack.eq(0)  # clear ack on output
    yield dut.rs[1].stb.eq(0)  # clear row 1 strobe

    # output strobe should be active, MID should be 0 until "ack" is set...
    out_stb = yield dut.out_op.stb
    assert out_stb == 1
    out_mid = yield dut.mid
    assert out_mid == 0

    # ... and output should not yet be passed through either
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 0 and op1 == 0

    # wait for out_op.ack to activate...
    yield dut.rs[1].stb.eq(0b00)  # set row 1 strobes to zero
    yield

    # *now* output should be passed through
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 5 and op1 == 6

    # set row 2 input
    yield dut.rs[2].in_op[0].eq(3)
    yield dut.rs[2].in_op[1].eq(4)
    yield dut.rs[2].stb.eq(0b11)  # strobe indicate both ops ready
    yield dut.out_op.ack.eq(1)  # set output ack
    yield
    yield dut.rs[2].stb.eq(0)  # clear row 2 strobe
    yield dut.out_op.ack.eq(0)  # clear output ack
    yield
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # set row 0 and 3 input
    yield dut.rs[0].in_op[0].eq(9)
    yield dut.rs[0].in_op[1].eq(8)
    yield dut.rs[0].stb.eq(0b11)  # strobe indicate both ops ready
    yield dut.rs[3].in_op[0].eq(1)
    yield dut.rs[3].in_op[1].eq(2)
    yield dut.rs[3].stb.eq(0b11)  # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.rs[0].stb.eq(0)  # clear row 0 strobe
    yield
    # with rows 0 and 3 both strobed, row 0 is expected on the output first
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield dut.rs[3].stb.eq(0)  # clear row 3 strobe
    yield dut.out_op.ack.eq(0)  # clear ack on output
    yield
    # ... followed by row 3
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
139
140
class OutputTest:
    """Randomised send/receive driver for a one-input, many-output DUT.

    The constructor pre-computes ``tlen * dut.num_rows`` test items.  Each
    item is a ``(data, mid)`` pair where the low 8 bits of ``data`` are
    random and the bits from 8 upwards hold ``mid``.  The first
    ``dut.num_rows`` items use ``mid == index`` so that every output gets at
    least one item; the rest pick ``mid`` at random.

    Attributes:
      * ``dut``  -- device under test; must provide ``num_rows``, ``p``
        (input side) and ``n`` (outputs, indexable by mid).
      * ``di``   -- list of ``(data, mid)`` in the order they are sent.
      * ``do``   -- dict mapping mid to the list of data values expected on
        that output, in order.
      * ``tlen`` -- number of items generated per output row.
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = []
        self.do = {}
        self.tlen = 10
        for i in range(self.tlen * dut.num_rows):
            if i < dut.num_rows:
                mid = i  # guarantee each output is exercised at least once
            else:
                mid = randint(0, dut.num_rows-1)
            data = randint(0, 255) + (mid << 8)
            self.di.append((data, mid))
            self.do.setdefault(mid, []).append(data)

    def send(self):
        """Simulation process: push every queued item into ``self.dut.p``.

        For each item the data and mid are presented with ``i_valid`` high,
        then the process waits until ``o_ready`` reads non-zero, drops
        ``i_valid`` and idles for 0-3 random steps before the next item.

        Uses ``self.dut`` (not a module-level ``dut``), so the class works
        when instantiated from anywhere.
        """
        rs = self.dut.p
        for i, (op2, mid) in enumerate(self.di):
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.mid.eq(mid)
            yield
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self, mid):
        """Simulation process: collect and check the data for output *mid*.

        Raises ``i_ready`` on ``self.dut.n[mid]`` and, whenever both
        ``o_valid`` and ``i_ready`` read non-zero, compares ``o_data``
        against the next expected value in ``self.do[mid]``.  Random stalls
        (``i_ready`` low for a few steps) are inserted between items.

        Raises AssertionError on a data mismatch, or after 2000 loop
        iterations without having received everything.
        """
        out_i = 0
        count = 0
        stall_range = randint(0, 3)
        while out_i != len(self.do[mid]):
            count += 1
            assert count != 2000, "timeout: too long"
            n = self.dut.n[mid]
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            out_v = yield n.o_data

            print ("recv", mid, out_i, hex(out_v))

            assert self.do[mid][out_i] == out_v  # pass-through data

            out_i += 1

            # occasionally pick a new maximum stall length
            if randint(0, 5) == 0:
                stall_range = randint(0, 3)
            stall = randint(0, stall_range) != 0
            if stall:
                yield n.i_ready.eq(0)
                for _ in range(stall_range):
                    yield
211
212
class TestPriorityMuxPipe(CombMuxOutPipe):
    """CombMuxOutPipe using a PassThroughStage, with *num_rows* outputs."""

    def __init__(self, num_rows):
        # num_rows is stored before the base-class constructor runs
        self.num_rows = num_rows
        super().__init__(PassThroughStage(), n_len=num_rows)
218
219
class TestSyncToPriorityPipe:
    """Device under test: a PassThroughPipe feeding a 4-output mux pipe.

    Exposes ``p`` (the input side of the pass-through pipe) and ``n`` (the
    outputs of the mux pipe) so test processes can drive and observe them.
    """

    def __init__(self):
        self.num_rows = 4
        self.pipe = PassThroughPipe()
        self.muxpipe = TestPriorityMuxPipe(self.num_rows)

        # external interface: input of the first pipe, outputs of the mux
        self.p = self.pipe.p
        self.n = self.muxpipe.n

    def elaborate(self, platform):
        """Build the module: both pipes as submodules, chained together."""
        m = Module()
        m.submodules.pipe = self.pipe
        m.submodules.muxpipe = self.muxpipe
        chain = self.pipe.n.connect_to_next(self.muxpipe.p)
        m.d.comb += chain
        return m

    def ports(self):
        """Return the list of top-level ports.

        Order: input valid/ready, the input data ports, then for each output
        its ready, valid and data signals.
        """
        res = [self.p.i_valid, self.p.o_ready] + self.p.i_data.ports()
        for out in self.n:
            # o_data is a single signal here, so it is listed directly
            res.extend([out.i_ready, out.o_valid, out.o_data])
        return res
244
245
if __name__ == '__main__':
    # NOTE: the name ``dut`` must stay a module-level global here.
    dut = TestSyncToPriorityPipe()

    # dump the design as RTLIL
    il_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_outmux_pipe.il", "w") as il_file:
        il_file.write(il_text)

    # The directed testbench(dut) process is intentionally not run here.

    # one receiver process per output, plus a single sender
    test = OutputTest(dut)
    processes = [
        test.rcv(1), test.rcv(0),
        test.rcv(3), test.rcv(2),
        test.send(),
    ]
    run_simulation(dut, processes, vcd_name="test_outmux_pipe.vcd")
258