small tidyup of priority-encoding pipe mux
[ieee754fpu.git] / src / add / test_prioritymux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6 from nmigen.lib.coding import PriorityEncoder
7
8 from example_buf_pipe import UnbufferedPipeline
9
10
class InputPriorityArbiter:
    """Pick one of several pipeline inputs using a priority encoder.

    Attributes:
        pipe: the pipeline whose ``p`` inputs are arbitrated.
        num_rows: number of inputs being arbitrated.
        mmax: bit-width of ``m_id``.
        m_id: index of the input selected by the priority encoder.
        active: high when the priority encoder reports a valid selection.
    """

    def __init__(self, pipe, num_rows):
        """Create the arbiter signals.

        Args:
            pipe: object exposing ``p``, an indexable collection of
                ``num_rows`` inputs (checked in ``elaborate``).
            num_rows: number of inputs; must be at least 1.

        Raises:
            ValueError: if ``num_rows`` is smaller than 1.
        """
        self.pipe = pipe
        self.num_rows = num_rows
        self.mmax = self._id_width(num_rows)
        self.m_id = Signal(self.mmax, reset_less=True)  # multiplex id
        self.active = Signal(reset_less=True)

    @staticmethod
    def _id_width(num_rows):
        """Return the number of bits needed to hold indices 0..num_rows-1.

        Integer arithmetic is used on purpose: a floored floating-point
        log2 yields a width that is too small whenever ``num_rows`` is not
        a power of two (e.g. 5 rows need 3 bits, not 2).  At least one bit
        is always returned so that a single-row arbiter still has an id.
        """
        if num_rows < 1:
            raise ValueError("num_rows must be at least 1, got %r" %
                             (num_rows,))
        return max((num_rows - 1).bit_length(), 1)

    def elaborate(self, platform):
        """Build the combinatorial arbitration logic and return the Module."""
        m = Module()

        assert len(self.pipe.p) == self.num_rows, \
            "must declare input to be same size"
        pe = PriorityEncoder(self.num_rows)
        m.submodules.selector = pe

        # connect priority encoder: one "valid" bit per input, gathered
        # into a single vector (input 0 ends up in bit 0).
        in_ready = []
        for i in range(self.num_rows):
            p_i_valid = Signal(reset_less=True)
            # i_valid_logic() is provided by the input object; presumably
            # it is that input's "valid" condition -- confirm in the pipe.
            m.d.comb += p_i_valid.eq(self.pipe.p[i].i_valid_logic())
            in_ready.append(p_i_valid)
        m.d.comb += pe.i.eq(Cat(*in_ready))  # array of input "valids"
        m.d.comb += self.active.eq(~pe.n)    # encoder active (an input valid)
        m.d.comb += self.m_id.eq(pe.o)       # index of the selected input

        return m

    def ports(self):
        """Return the arbiter's externally visible signals."""
        return [self.m_id, self.active]
41
42
class PriorityUnbufferedPipeline(UnbufferedPipeline):
    """UnbufferedPipeline whose inputs are selected by an InputPriorityArbiter."""

    def __init__(self, stage, p_len=4):
        """Create the pipeline.

        Args:
            stage: stage object handed on to UnbufferedPipeline.
            p_len: number of inputs; also the arbiter's row count.
        """
        arbiter = InputPriorityArbiter(self, p_len)
        super().__init__(stage, p_len=p_len, p_mux=arbiter)

    def ports(self):
        """Return only the arbiter's ports.

        The base-class ports are intentionally not included here (the
        variant that added them was left disabled in the original).
        """
        # self.p_mux is not assigned in this class; presumably the base
        # class stores the p_mux argument under that name -- confirm.
        arbiter_ports = self.p_mux.ports()
        return arbiter_ports
51
class PassData:
    """Record of three signals carried through the pipeline: mid, idx, data."""

    def __init__(self):
        self.mid = Signal(2, reset_less=True)    # 2-bit field
        self.idx = Signal(6, reset_less=True)    # 6-bit field
        self.data = Signal(16, reset_less=True)  # 16-bit field

    def eq(self, i):
        """Return assignments copying every field of ``i`` into this record.

        The list is ordered mid, idx, data.
        """
        return [
            getattr(self, field).eq(getattr(i, field))
            for field in ("mid", "idx", "data")
        ]

    def ports(self):
        """Return the record's signals, ordered mid, idx, data."""
        fields = [self.mid, self.idx]
        fields.append(self.data)
        return fields
63
class PassThroughStage:
    """Pipeline stage that forwards its input unmodified."""

    def ispec(self):
        """Return a fresh PassData describing the stage input."""
        spec = PassData()
        return spec

    def ospec(self):
        """Return the output format, which is the same as the input format."""
        return self.ispec()

    def process(self, i):
        """Return ``i`` itself: this stage is a pass-through."""
        return i
72
73
74
def testbench(dut):
    """Hand-written stimulus/check sequence driven as a simulation generator.

    NOTE(review): this generator is not started by the ``__main__`` section
    of this file (the run_simulation call that used it is commented out),
    and it accesses ``dut.out_op``, ``dut.rs`` and ``dut.mid`` whereas the
    rest of this file drives ``dut.p`` / ``dut.n``.  Presumably it was
    written for a different DUT -- confirm before re-enabling.

    A bare ``yield`` is used between groups of assignments; presumably it
    advances the simulation by one clock cycle -- confirm against the
    simulator documentation.
    """
    # initially the output strobe and ack must both be clear
    stb = yield dut.out_op.stb
    assert stb == 0
    ack = yield dut.out_op.ack
    assert ack == 0

    # set row 1 input 0
    yield dut.rs[1].in_op[0].eq(5)
    yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
    #yield dut.rs[1].ack.eq(1)
    yield

    # check row 1 output (should be inactive)
    decode = yield dut.rs[1].out_decode
    assert decode == 0
    if False:
        # disabled check of row 1's operand outputs
        op0 = yield dut.rs[1].out_op[0]
        op1 = yield dut.rs[1].out_op[1]
        assert op0 == 0 and op1 == 0

    # NOTE(review): the original comment said "output should be inactive",
    # but the assertion below requires the output strobe to be 1 -- confirm
    # which of the two is intended.
    out_stb = yield dut.out_op.stb
    assert out_stb == 1

    # set row 1 input 1
    yield dut.rs[1].in_op[1].eq(6)
    yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield dut.rs[1].stb.eq(0) # clear row 1 strobe

    # output strobe should be active, MID should be 0 until "ack" is set...
    out_stb = yield dut.out_op.stb
    assert out_stb == 1
    out_mid = yield dut.mid
    assert out_mid == 0

    # ... and output should not yet be passed through either
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 0 and op1 == 0

    # wait for out_op.ack to activate...
    yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
    yield

    # *now* output should be passed through
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 5 and op1 == 6

    # set row 2 input
    yield dut.rs[2].in_op[0].eq(3)
    yield dut.rs[2].in_op[1].eq(4)
    yield dut.rs[2].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.out_op.ack.eq(1) # set output ack
    yield
    yield dut.rs[2].stb.eq(0) # clear row 2 strobe
    yield dut.out_op.ack.eq(0) # clear output ack
    yield
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # set row 0 and 3 input
    yield dut.rs[0].in_op[0].eq(9)
    yield dut.rs[0].in_op[1].eq(8)
    yield dut.rs[0].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.rs[3].in_op[0].eq(1)
    yield dut.rs[3].in_op[1].eq(2)
    yield dut.rs[3].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.rs[0].stb.eq(0) # clear row 0 strobe
    yield
    # with rows 0 and 3 both strobed, row 0 is expected at the output first
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield dut.rs[3].stb.eq(0) # clear row 3 strobe
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield
    # ... followed by row 3
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
166
167
class InputTest:
    """Randomised send/receive test harness for a multi-input pipeline.

    Attributes:
        dut: device under test; must expose ``num_rows``, ``p`` (inputs)
            and ``n`` (outputs).
        tlen: number of values queued per input.
        di: ``di[mid][i]`` is the i-th value to send on input ``mid``.
        do: ``do[mid][i]`` is the value still expected back for that slot;
            entries are deleted as they are received.
    """

    def __init__(self, dut):
        """Generate ``tlen`` random values for each of ``dut.num_rows`` inputs."""
        self.dut = dut
        self.di = {}
        self.do = {}
        self.tlen = 10
        for mid in range(dut.num_rows):
            # the input number is placed above bit 8 so that every value
            # identifies the input it was sent on.
            self.di[mid] = {i: randint(0, 100) + (mid << 8)
                            for i in range(self.tlen)}
            self.do[mid] = dict(self.di[mid])  # pass-through: same values

    def send(self, mid):
        """Simulation generator: push all queued values into input ``mid``.

        Each value is presented with ``i_valid`` high until ``o_ready`` is
        sampled true, then ``i_valid`` is dropped for 0-3 random cycles.
        """
        # Use the DUT this test was constructed with rather than a
        # module-level global, so the class also works when imported.
        rs = self.dut.p[mid]
        for i in range(self.tlen):
            op2 = self.di[mid][i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.idx.eq(i)
            yield rs.i_data.mid.eq(mid)
            yield
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self):
        """Simulation generator: check every value arriving on output 0.

        Runs until all expected values of all inputs have been received.

        Raises:
            AssertionError: if a received (mid, idx) pair is not (or no
                longer) expected, or its data differs from what was sent.
        """
        while True:
            n = self.dut.n[0]
            yield n.i_ready.eq(1)  # always ready: no receiver stalling
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            mid = yield n.o_data.mid
            out_i = yield n.o_data.idx
            out_v = yield n.o_data.data

            print("recv", mid, out_i, hex(out_v))

            # each expected value may be received only once: check it is
            # still outstanding, verify the data, then remove it.
            assert out_i in self.do[mid], "out_i %d not in array %s" % \
                (out_i, repr(self.do[mid]))
            assert self.do[mid][out_i] == out_v  # pass-through data
            del self.do[mid][out_i]

            # stop once no input has outstanding values left
            if not any(self.do.values()):
                break
247
248
class TestPriorityMuxPipe(PriorityUnbufferedPipeline):
    """Four-input priority-muxed pipeline built around a PassThroughStage."""

    def __init__(self):
        self.num_rows = 4
        PriorityUnbufferedPipeline.__init__(self, PassThroughStage(),
                                            p_len=self.num_rows)

    def ports(self):
        """Return all input-side signals followed by all output-side signals.

        For every input: i_valid, o_ready, then its data ports.
        For every output: i_ready, o_valid, then its data ports.
        """
        signals = []
        for idx in range(len(self.p)):
            inp = self.p[idx]
            signals.extend([inp.i_valid, inp.o_ready] + inp.i_data.ports())
        for idx in range(len(self.n)):
            outp = self.n[idx]
            signals.extend([outp.i_ready, outp.o_valid] + outp.o_data.ports())
        return signals
264
265
if __name__ == '__main__':
    # Build the device under test and dump its RTLIL for inspection.
    dut = TestPriorityMuxPipe()
    il_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_inputgroup.il", "w") as il_file:
        il_file.write(il_text)
    #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")

    # One sender per input (started in the order 1, 0, 3, 2) plus a single
    # receiver, all run in the same simulation.
    test = InputTest(dut)
    processes = [test.send(mid) for mid in (1, 0, 3, 2)]
    processes.append(test.rcv())
    run_simulation(dut, processes,
                   vcd_name="test_inputgroup_parallel.vcd")
278