add multi-out pipe module (untested)
[ieee754fpu.git] / src / add / test_prioritymux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import CombMultiInPipeline, InputPriorityArbiter
8
9
10
class PriorityUnbufferedPipeline(CombMultiInPipeline):
    """CombMultiInPipeline that uses an InputPriorityArbiter as its input mux."""

    def __init__(self, stage, p_len=4):
        """Build the arbiter for ``p_len`` inputs and initialise the base class.

        stage: stage object forwarded unchanged to CombMultiInPipeline.
        p_len: number of inputs, given to both the arbiter and the base class.
        """
        # The arbiter is constructed with a reference to this pipeline
        # *before* the base-class initialiser has run.
        arbiter = InputPriorityArbiter(self, p_len)
        CombMultiInPipeline.__init__(self, stage, p_mux=arbiter, p_len=p_len)

    def ports(self):
        """Return whatever ``self.p_mux.ports()`` reports."""
        # NOTE(review): self.p_mux is not assigned in this class; presumably
        # CombMultiInPipeline.__init__ stores the p_mux argument -- confirm.
        mux = self.p_mux
        return mux.ports()
19
class PassData:
    """Record of three signals (mid, idx, data) carried through the pipeline."""

    def __init__(self):
        # Widths are 2, 6 and 16 bits respectively; all are reset-less.
        self.mid = Signal(2, reset_less=True)
        self.idx = Signal(6, reset_less=True)
        self.data = Signal(16, reset_less=True)

    def eq(self, i):
        """Return a list assigning each field of ``i`` to the same field here.

        The list is ordered mid, idx, data.
        """
        return [getattr(self, field).eq(getattr(i, field))
                for field in ("mid", "idx", "data")]

    def ports(self):
        """Return the three fields as a list, ordered mid, idx, data."""
        fields = [self.mid, self.idx]
        fields.append(self.data)
        return fields
31
class PassThroughStage:
    """Stage whose output is its input, with identical input and output specs."""

    def ispec(self):
        """Return a fresh PassData describing the stage input."""
        spec = PassData()
        return spec

    def ospec(self):
        """Return the output spec, which is built exactly like the input spec."""
        return self.ispec()  # same as ispec

    def process(self, i):
        """Return ``i`` unchanged (pass-through)."""
        result = i
        return result
40
41
42
def testbench(dut):
    """Directed simulation sequence driving ``dut.rs[...]`` and checking
    ``dut.out_op`` / ``dut.mid``.

    Written as a generator for the simulator: ``yield <signal>`` reads a
    value, ``yield <signal>.eq(v)`` drives one, and a bare ``yield`` advances
    one step.

    NOTE(review): the only call to this function in this file is commented
    out in the ``__main__`` section, and the attributes used here (``rs``,
    ``out_op``, ``mid``) differ from the ``p`` / ``n`` attributes the other
    tests in this file use -- presumably this targets an older dut interface;
    confirm before re-enabling.
    """
    # both output handshake signals are expected to start at zero
    stb = yield dut.out_op.stb
    assert stb == 0
    ack = yield dut.out_op.ack
    assert ack == 0

    # set row 1 input 0
    yield dut.rs[1].in_op[0].eq(5)
    yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
    #yield dut.rs[1].ack.eq(1)
    yield

    # check row 1 output (should be inactive)
    decode = yield dut.rs[1].out_decode
    assert decode == 0
    if False:
        # disabled check: never executed
        op0 = yield dut.rs[1].out_op[0]
        op1 = yield dut.rs[1].out_op[1]
        assert op0 == 0 and op1 == 0

    # NOTE(review): the original comment here said "output should be
    # inactive", but the assertion expects the output strobe to be 1
    out_stb = yield dut.out_op.stb
    assert out_stb == 1

    # set row 1 input 1
    yield dut.rs[1].in_op[1].eq(6)
    yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield dut.rs[1].stb.eq(0) # clear row 1 strobe

    # output strobe should be active, MID should be 0 until "ack" is set...
    out_stb = yield dut.out_op.stb
    assert out_stb == 1
    out_mid = yield dut.mid
    assert out_mid == 0

    # ... and output should not yet be passed through either
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 0 and op1 == 0

    # wait for out_op.ack to activate...
    yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
    yield

    # *now* output should be passed through
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 5 and op1 == 6

    # set row 2 input
    yield dut.rs[2].in_op[0].eq(3)
    yield dut.rs[2].in_op[1].eq(4)
    yield dut.rs[2].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.out_op.ack.eq(1) # set output ack
    yield
    yield dut.rs[2].stb.eq(0) # clear row 2 strobe
    yield dut.out_op.ack.eq(0) # clear output ack
    yield
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # set row 0 and 3 input
    yield dut.rs[0].in_op[0].eq(9)
    yield dut.rs[0].in_op[1].eq(8)
    yield dut.rs[0].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.rs[3].in_op[0].eq(1)
    yield dut.rs[3].in_op[1].eq(2)
    yield dut.rs[3].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.rs[0].stb.eq(0) # clear row 0 strobe
    yield
    # with rows 0 and 3 both strobed, row 0 is expected at the output first
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield dut.rs[3].stb.eq(0) # clear row 3 strobe
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield
    # row 3 is expected at the output afterwards
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
134
135
class InputTest:
    """Randomised send/receive harness for a multi-input pipeline.

    For every input row of ``dut`` (``dut.num_rows`` of them) a table of
    ``tlen`` values is generated.  ``send`` pushes one row's values into the
    matching input port ``dut.p[mid]``; ``rcv`` reads results from the output
    port ``dut.n`` and ticks each one off against the values still expected.
    Both are generators intended to be run as simulator processes.
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = {}    # di[mid][idx]: value to send on row ``mid``
        self.do = {}    # do[mid][idx]: value still expected at the output
        self.tlen = 10  # number of values queued per row
        for mid in range(dut.num_rows):
            self.di[mid] = {}
            self.do[mid] = {}
            for i in range(self.tlen):
                # the row number is placed in bit 8 upwards, above the
                # random 0..100 payload
                self.di[mid][i] = randint(0, 100) + (mid<<8)
                self.do[mid][i] = self.di[mid][i]

    def send(self, mid):
        """Generator: feed all ``tlen`` values of row ``mid`` into ``dut.p[mid]``.

        Each value is presented with ``i_valid`` high together with its index
        and row number, held until ``o_ready`` reads true, then ``i_valid`` is
        dropped for a random 0-3 extra steps before the next value.
        """
        # Use the dut given to __init__ rather than a module-level global,
        # and bind the port before the loop so the final de-assert below is
        # valid even when tlen is zero.
        rs = self.dut.p[mid]
        for i in range(self.tlen):
            op2 = self.di[mid][i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.idx.eq(i)
            yield rs.i_data.mid.eq(mid)
            yield
            # keep the value presented until the port reports ready
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self):
        """Generator: accept outputs from ``dut.n`` until nothing is expected.

        ``i_ready`` is driven high on every iteration.  Whenever ``o_valid``
        and ``i_ready`` both read true, the output's (mid, idx, data) is
        checked against ``self.do`` and the entry is removed.  The generator
        finishes once every row's table of expected values is empty.
        """
        n = self.dut.n
        while True:
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            mid = yield n.o_data.mid
            out_i = yield n.o_data.idx
            out_v = yield n.o_data.data

            print ("recv", mid, out_i, hex(out_v))

            # the entry must still be outstanding (not already received);
            # check its value and then remove it
            assert out_i in self.do[mid], "out_i %d not in array %s" % \
                                          (out_i, repr(self.do[mid]))
            assert self.do[mid][out_i] == out_v # pass-through data
            del self.do[mid][out_i]

            # stop once no row has any outstanding outputs
            if not any(self.do.values()):
                break
215
216
class TestPriorityMuxPipe(PriorityUnbufferedPipeline):
    """Four-input PriorityUnbufferedPipeline around a PassThroughStage."""

    def __init__(self):
        # num_rows is set before the base initialiser runs and is also read
        # by InputTest to size its per-row tables.
        self.num_rows = 4
        PriorityUnbufferedPipeline.__init__(self, PassThroughStage(),
                                            p_len=self.num_rows)

    def ports(self):
        """Return a flat list of handshake and data signals.

        For each entry of ``self.p`` in order: i_valid, o_ready, then that
        entry's i_data ports; finally ``self.n``'s i_ready, o_valid and
        o_data ports.
        """
        signals = []
        for prev in self.p:
            signals.extend([prev.i_valid, prev.o_ready])
            signals.extend(prev.i_data.ports())
        nxt = self.n
        signals.extend([nxt.i_ready, nxt.o_valid])
        signals.extend(nxt.o_data.ports())
        return signals
231
232
if __name__ == '__main__':
    # The name ``dut`` is kept at module level on purpose: code elsewhere in
    # this file may look it up as a global.
    dut = TestPriorityMuxPipe()

    # Dump the design as RTLIL.
    rtlil_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_inputgroup_multi.il", "w") as il_file:
        il_file.write(rtlil_text)
    #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")

    # One sender per input row (started in the order 1, 0, 3, 2) plus a
    # single receiver, all run together in one simulation.
    test = InputTest(dut)
    processes = [test.send(1), test.send(0),
                 test.send(3), test.send(2),
                 test.rcv()]
    run_simulation(dut, processes, vcd_name="test_inputgroup_multi.vcd")
245