create base multi-in ports function
[ieee754fpu.git] / src / add / test_prioritymux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import (CombMultiInPipeline, PriorityCombMuxInPipe)
8
9
class PassData:
    """Record of the three signals carried through the test pipeline.

    Fields: ``mid`` (declared with shape 2), ``idx`` (shape 6) and
    ``data`` (shape 16), all created with ``reset_less=True``.
    """

    def __init__(self):
        # Direct attribute assignment is kept on purpose: one Signal per
        # field, created in mid / idx / data order.
        self.mid = Signal(2, reset_less=True)
        self.idx = Signal(6, reset_less=True)
        self.data = Signal(16, reset_less=True)

    def eq(self, i):
        """Return a list of assignments copying each field of ``i`` to self.

        The list is ordered mid, idx, data.  ``i`` only needs to expose
        attributes with those three names.
        """
        assignments = []
        for field in ("mid", "idx", "data"):
            target = getattr(self, field)
            assignments.append(target.eq(getattr(i, field)))
        return assignments

    def ports(self):
        """Return the fields as a list, ordered mid, idx, data."""
        return [getattr(self, field) for field in ("mid", "idx", "data")]
21
22
class PassThroughStage:
    """Pipeline stage that hands its input to the output unmodified."""

    def ispec(self):
        """Return a new PassData instance describing the stage input."""
        return PassData()

    def ospec(self):
        """Return the output spec, which is built exactly like the input spec."""
        spec = self.ispec()
        return spec

    def process(self, i):
        """Return ``i`` itself: the stage performs no transformation."""
        return i
30
31
32
def testbench(dut):
    """Directed simulation process written as a generator.

    Drives ``dut.rs[N].in_op`` / ``dut.rs[N].stb`` and ``dut.out_op.ack``,
    stepping the simulation with bare ``yield`` statements, and asserts on
    ``dut.out_op.stb``, ``dut.out_op.v`` and ``dut.mid``.

    NOTE(review): the attributes used here (``rs``, ``out_op``, ``mid``)
    differ from the ``p`` / ``n`` attributes used by InputTest in this file;
    presumably this was written for a different DUT interface -- confirm
    before enabling it.
    """
    # both output handshake signals are expected to start at zero
    stb = yield dut.out_op.stb
    assert stb == 0
    ack = yield dut.out_op.ack
    assert ack == 0

    # set row 1 input 0
    yield dut.rs[1].in_op[0].eq(5)
    yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
    #yield dut.rs[1].ack.eq(1)
    yield

    # check row 1 output (should be inactive)
    decode = yield dut.rs[1].out_decode
    assert decode == 0
    # disabled check: this branch never executes
    if False:
        op0 = yield dut.rs[1].out_op[0]
        op1 = yield dut.rs[1].out_op[1]
        assert op0 == 0 and op1 == 0

    # NOTE(review): the original comment here said the output "should be
    # inactive", but the assertion expects out_op.stb == 1 -- confirm which
    # of the two is intended.
    out_stb = yield dut.out_op.stb
    assert out_stb == 1

    # set row 1 input 1
    yield dut.rs[1].in_op[1].eq(6)
    yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield dut.rs[1].stb.eq(0) # clear row 1 strobe

    # output strobe should be active, MID should be 0 until "ack" is set...
    out_stb = yield dut.out_op.stb
    assert out_stb == 1
    out_mid = yield dut.mid
    assert out_mid == 0

    # ... and output should not yet be passed through either
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 0 and op1 == 0

    # wait for out_op.ack to activate...
    yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
    yield

    # *now* output should be passed through
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 5 and op1 == 6

    # set row 2 input
    yield dut.rs[2].in_op[0].eq(3)
    yield dut.rs[2].in_op[1].eq(4)
    yield dut.rs[2].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.out_op.ack.eq(1) # set output ack
    yield
    yield dut.rs[2].stb.eq(0) # clear row 2 strobe
    yield dut.out_op.ack.eq(0) # clear output ack
    yield
    # row 2's operands and its index are expected on the output
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # set row 0 and 3 input
    yield dut.rs[0].in_op[0].eq(9)
    yield dut.rs[0].in_op[1].eq(8)
    yield dut.rs[0].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.rs[3].in_op[0].eq(1)
    yield dut.rs[3].in_op[1].eq(2)
    yield dut.rs[3].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.rs[0].stb.eq(0) # clear row 0 strobe
    yield
    # with rows 0 and 3 both strobed, row 0 is expected to be reported first
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield dut.rs[3].stb.eq(0) # clear row 3 strobe
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield
    # ... followed by row 3
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
124
125
class InputTest:
    """Randomised send/receive harness for a multi-input pipeline.

    For each of the ``dut.num_rows`` inputs, ``tlen`` random data words are
    generated up front.  ``send`` feeds one input's words into
    ``dut.p[mid]``; ``rcv`` reads results from ``dut.n`` and removes each one
    from the expected set ``self.do`` until nothing is outstanding.

    Attributes:
        dut:  the device under test (must expose ``num_rows``, ``p``, ``n``).
        di:   ``{mid: {idx: value}}`` of values to send.
        do:   ``{mid: {idx: value}}`` of values still expected at the output.
        tlen: number of values sent per input.
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = {}
        self.do = {}
        self.tlen = 10
        for mid in range(dut.num_rows):
            # the input number is folded into bits 8 and up, so the source
            # of every data word can be identified
            self.di[mid] = {i: randint(0, 100) + (mid << 8)
                            for i in range(self.tlen)}
            # pass-through pipeline: expected output equals the input
            self.do[mid] = dict(self.di[mid])

    def send(self, mid):
        """Simulation process feeding all of input ``mid``'s values.

        Each value is presented with ``i_valid`` high; the process then
        steps the clock until ``o_ready`` reads non-zero, drops ``i_valid``
        and idles for 0-3 cycles before the next value.
        """
        # Use the DUT given to the constructor rather than a module-level
        # global, so the class works outside of the __main__ script.
        rs = self.dut.p[mid]
        for i in range(self.tlen):
            op2 = self.di[mid][i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.idx.eq(i)
            yield rs.i_data.mid.eq(mid)
            yield
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self):
        """Simulation process checking every value emerging from ``dut.n``.

        ``i_ready`` is driven to 1 on every iteration (no random
        back-pressure is applied).  Each accepted output must match the
        still-outstanding entry ``self.do[mid][idx]``, which is then
        removed.  The process ends once no entries remain for any input.
        """
        n = self.dut.n
        while True:
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            mid = yield n.o_data.mid
            out_i = yield n.o_data.idx
            out_v = yield n.o_data.data

            print("recv", mid, out_i, hex(out_v))

            # each (mid, idx) pair may appear at the output only once
            assert out_i in self.do[mid], "out_i %d not in array %s" % \
                (out_i, repr(self.do[mid]))
            assert self.do[mid][out_i] == out_v  # pass-through data
            del self.do[mid][out_i]

            # stop once every input's expected set is empty
            if not any(self.do.values()):
                break
205
206
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
    """PriorityCombMuxInPipe with four inputs and a pass-through stage."""

    def __init__(self):
        # num_rows is read by InputTest to size its test vectors
        self.num_rows = 4
        super().__init__(PassThroughStage(), p_len=self.num_rows)
212
213
if __name__ == '__main__':
    dut = TestPriorityMuxPipe()

    # dump the design as RTLIL before simulating it
    rtlil_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_inputgroup_multi.il", "w") as il_file:
        il_file.write(rtlil_text)
    #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")

    test = InputTest(dut)
    # one sender per input (started in the order 1, 0, 3, 2), then the
    # single receiver process
    processes = [test.send(mid) for mid in (1, 0, 3, 2)]
    processes.append(test.rcv())
    run_simulation(dut, processes, vcd_name="test_inputgroup_multi.vcd")
226