add combinatorial pipe example
[ieee754fpu.git] / src / add / test_buf_pipe.py
1 from nmigen import Module, Signal
2 from nmigen.compat.sim import run_simulation
3 from example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd
4 from example_buf_pipe import ExampleCombPipe
5 from example_buf_pipe import IOAckIn, IOAckOut
6 from random import randint
7
8
def check_o_n_valid(dut, val):
    """Simulation helper: assert that ``dut.o.n_valid`` currently equals *val*.

    This is a generator meant to be driven with ``yield from`` inside a
    testbench process: it yields the signal to the simulator, receives the
    sampled value back and compares it against *val*.

    Raises AssertionError (with observed and expected values) on mismatch.
    """
    o_n_valid = yield dut.o.n_valid
    # report both values so a failing cycle can be diagnosed from the log
    assert o_n_valid == val, \
        "o.n_valid is %d, expected %d" % (o_n_valid, val)
12
13
def testbench(dut):
    """Hand-written stimulus/check sequence for a single pipeline stage.

    Drives ``dut.i.n_ready``, ``dut.i.data`` and ``dut.i.p_valid`` and checks
    ``dut.o.n_valid`` at fixed points via check_o_n_valid().  The sequence
    is order-dependent: each bare ``yield`` is presumably one clock tick of
    the simulator (TODO confirm against the simulator in use).
    """
    #yield dut.i_p_rst.eq(1)
    # start with the downstream side not ready
    yield dut.i.n_ready.eq(0)
    yield dut.o.p_ready.eq(0)
    yield
    yield
    #yield dut.i_p_rst.eq(0)
    # downstream becomes ready and the first data item (5) is presented
    yield dut.i.n_ready.eq(1)
    yield dut.i.data.eq(5)
    yield dut.i.p_valid.eq(1)
    yield

    yield dut.i.data.eq(7)
    yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed
    yield
    yield from check_o_n_valid(dut, 1) # ok *now* i_p_valid effect is felt

    yield dut.i.data.eq(2)
    yield
    yield dut.i.n_ready.eq(0) # begin going into "stall" (next stage says NOT ready)
    yield dut.i.data.eq(9)
    yield
    # stop offering new data while still stalled
    yield dut.i.p_valid.eq(0)
    yield dut.i.data.eq(12)
    yield
    # release the stall: downstream is ready again
    yield dut.i.data.eq(32)
    yield dut.i.n_ready.eq(1)
    yield
    yield from check_o_n_valid(dut, 1) # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 1) # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 0) # buffer outputted, *now* we're done.
    yield
48
49
def testbench2(dut):
    """Hand-written stimulus/check sequence for the two-stage pipeline.

    Same shape as the single-stage sequence, but ``dut.o.n_valid`` is
    expected to go high one check later and to stay high for one extra
    check while draining.  The sequence is order-dependent: each bare
    ``yield`` is presumably one clock tick of the simulator (TODO confirm).
    """
    #yield dut.i.p_rst.eq(1)
    # start with the downstream side not ready
    yield dut.i.n_ready.eq(0)
    #yield dut.o.p_ready.eq(0)
    yield
    yield
    #yield dut.i.p_rst.eq(0)
    # downstream becomes ready and the first data item (5) is presented
    yield dut.i.n_ready.eq(1)
    yield dut.i.data.eq(5)
    yield dut.i.p_valid.eq(1)
    yield

    yield dut.i.data.eq(7)
    yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed 2 clocks
    yield
    yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed 2 clocks

    yield dut.i.data.eq(2)
    yield
    yield from check_o_n_valid(dut, 1) # ok *now* i_p_valid effect is felt
    yield dut.i.n_ready.eq(0) # begin going into "stall" (next stage says NOT ready)
    yield dut.i.data.eq(9)
    yield
    # stop offering new data while still stalled
    yield dut.i.p_valid.eq(0)
    yield dut.i.data.eq(12)
    yield
    # release the stall: downstream is ready again
    yield dut.i.data.eq(32)
    yield dut.i.n_ready.eq(1)
    yield
    yield from check_o_n_valid(dut, 1) # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 1) # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 1) # buffer still needs to output
    yield
    yield from check_o_n_valid(dut, 0) # buffer outputted, *now* we're done.
    yield
    yield
    yield
89
90
class Test3:
    """Randomised send/receive test expecting ``output == input + 1``.

    Two generator processes are meant to run concurrently in the simulator:
    send() feeds ``self.data`` into the DUT input with random gaps, and
    rcv() randomly toggles downstream readiness and checks every accepted
    output word.  ``self.i`` counts items sent, ``self.o`` items received.
    """

    def __init__(self, dut):
        self.dut = dut
        # deterministic payload 1..num_tests (module-level constant); use
        # randint(0, (1 << 16) - 1) here instead to fuzz with random values
        self.data = [i + 1 for i in range(num_tests)]
        self.i = 0  # index of the next item to send
        self.o = 0  # index of the next item expected at the output

    def send(self):
        """Sender process: present data items whenever the DUT is ready.

        Runs until rcv() has checked every item.  A burst of 1-10 cycles
        shares one ``send_range``: 0 means "always send", larger values
        insert idle cycles with probability 1/(send_range+1).
        """
        while self.o != len(self.data):
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                if send_range == 0:
                    send = True
                else:
                    send = randint(0, send_range) != 0
                o_p_ready = yield self.dut.o.p_ready
                if not o_p_ready:
                    # DUT cannot accept: leave the inputs untouched and wait
                    yield
                    continue
                if send and self.i != len(self.data):
                    yield self.dut.i.p_valid.eq(1)
                    yield self.dut.i.data.eq(self.data[self.i])
                    self.i += 1
                else:
                    yield self.dut.i.p_valid.eq(0)
                yield

    def rcv(self):
        """Receiver process: randomly stall and verify each output word.

        Raises AssertionError if an accepted output differs from the
        corresponding input plus one.
        """
        while self.o != len(self.data):
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # value driven onto n_ready: True means "downstream ready",
                # False (probability 1/(stall_range+1)) is a stall cycle
                ready = randint(0, stall_range) != 0
                yield self.dut.i.n_ready.eq(ready)
                yield
                o_n_valid = yield self.dut.o.n_valid
                i_n_ready = yield self.dut.i.n_ready
                if not o_n_valid or not i_n_ready:
                    continue
                o_data = yield self.dut.o.data
                # report the value that was actually expected, not the
                # raw input it was derived from
                expected = self.data[self.o] + 1
                assert o_data == expected, \
                    "%d-%d data %x not match %x\n" \
                    % (self.i, self.o, o_data, expected)
                self.o += 1
                if self.o == len(self.data):
                    break
139
140
class Test5:
    """Randomised send/receive test for a DUT that adds two operands.

    ``self.data`` holds ``num_tests`` random operand pairs; send() pushes
    them in through ``dut.set_input`` and rcv() checks that every accepted
    output equals the sum of the matching pair.  ``self.i`` / ``self.o``
    count the items sent / received so far.
    """

    def __init__(self, dut):
        self.dut = dut
        self.data = [(randint(0, 1<<14), randint(0, 1<<14))
                     for _ in range(num_tests)]
        self.i = 0
        self.o = 0

    def send(self):
        """Sender process: offer operand pairs with random idle cycles."""
        total = len(self.data)
        while self.o != total:
            send_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                # send_range == 0 means "always send" (no random draw)
                do_send = send_range == 0 or randint(0, send_range) != 0
                o_p_ready = yield self.dut.o.p_ready
                if o_p_ready:
                    if do_send and self.i != total:
                        yield self.dut.i.p_valid.eq(1)
                        for stmt in self.dut.set_input(self.data[self.i]):
                            yield stmt
                        self.i += 1
                    else:
                        yield self.dut.i.p_valid.eq(0)
                # one tick per iteration, whether or not the DUT was ready
                yield

    def rcv(self):
        """Receiver process: randomly stall and verify each output sum."""
        total = len(self.data)
        while self.o != total:
            stall_range = randint(0, 3)
            for _ in range(randint(1, 10)):
                ready = randint(0, stall_range) != 0
                yield self.dut.i.n_ready.eq(ready)
                yield
                o_n_valid = yield self.dut.o.n_valid
                i_n_ready = yield self.dut.i.n_ready
                if o_n_valid and i_n_ready:
                    o_data = yield self.dut.o.data
                    operands = self.data[self.o]
                    assert o_data == operands[0] + operands[1], \
                        "%d-%d data %x not match %s\n" \
                        % (self.i, self.o, o_data, repr(operands))
                    self.o += 1
                    if self.o == total:
                        break
190
191
def testbench4(dut):
    """Single-process randomised test expecting ``output == input + 2``.

    Sends the values 1..num_tests (module-level constant) into the DUT with
    random idle cycles, randomly stalls the downstream side, and checks
    every accepted output word.  Returns immediately when there is nothing
    to send.

    Raises AssertionError if an accepted output differs from the
    corresponding input plus two.
    """
    data = [n + 1 for n in range(num_tests)]
    i = 0  # index of the next item to send
    o = 0  # index of the next item expected at the output
    # loop on the receive count so that an empty data set terminates
    # instead of spinning forever waiting for an output that never comes
    while o != len(data):
        # value driven onto n_ready: True means "downstream ready"
        ready = randint(0, 3) != 0
        send = randint(0, 5) != 0
        yield dut.i.n_ready.eq(ready)
        o_p_ready = yield dut.o.p_ready
        if o_p_ready:
            if send and i != len(data):
                yield dut.i.p_valid.eq(1)
                yield dut.i.data.eq(data[i])
                i += 1
            else:
                yield dut.i.p_valid.eq(0)
        yield
        o_n_valid = yield dut.o.n_valid
        i_n_ready = yield dut.i.n_ready
        if o_n_valid and i_n_ready:
            o_data = yield dut.o.data
            # report the value that was actually expected, not the raw input
            expected = data[o] + 2
            assert o_data == expected, "%d-%d data %x not match %x\n" \
                % (i, o, o_data, expected)
            o += 1
221
222
class ExampleBufPipe2:
    """Two ExampleBufPipe stages chained back to back.

        connect these:  ------|---------------|
                              v               v
        i_p_valid  >>in  pipe1 o_n_valid out>> i_p_valid  >>in  pipe2
        o_p_ready <<out  pipe1 i_n_ready <<in  o_p_ready <<out  pipe2
        i_data     >>in  pipe1 o_data    out>> i_data     >>in  pipe2

    The module's own ``i`` side is wired to pipe1 and its ``o`` side to
    pipe2; the exact signals joined are decided by the pipes' connect_*
    helpers, which are defined outside this file.
    """
    def __init__(self):
        self.pipe1 = ExampleBufPipe()
        self.pipe2 = ExampleBufPipe()

        # input
        self.i = IOAckIn()
        self.i.data = Signal(32) # >>in - comes in from the PREVIOUS stage

        # output
        self.o = IOAckOut()
        self.o.data = Signal(32) # out>> - goes out to the NEXT stage

    def elaborate(self, platform):
        # platform is not used here
        m = Module()
        # register both stages as named submodules
        m.submodules.pipe1 = self.pipe1
        m.submodules.pipe2 = self.pipe2

        # connect inter-pipe input/output valid/ready/data
        m.d.comb += self.pipe1.connect_next(self.pipe2)

        # inputs/outputs to the module: pipe1 connections here (LHS)
        m.d.comb += self.pipe1.connect_in(self)

        # now pipe2 connections (RHS)
        m.d.comb += self.pipe2.connect_out(self)

        return m
258
# number of data items generated by Test3, Test5 and testbench4
num_tests = 1000

if __name__ == '__main__':
    # every scenario builds a fresh DUT and writes its own VCD trace

    # hand-written sequence, single stage
    print("test 1")
    single_stage = ExampleBufPipe()
    run_simulation(single_stage, testbench(single_stage),
                   vcd_name="test_bufpipe.vcd")

    # hand-written sequence, two chained stages
    print("test 2")
    double_stage = ExampleBufPipe2()
    run_simulation(double_stage, testbench2(double_stage),
                   vcd_name="test_bufpipe2.vcd")

    # randomised sender/receiver pair, buffered stage
    print("test 3")
    buffered = ExampleBufPipe()
    buffered_test = Test3(buffered)
    run_simulation(buffered, [buffered_test.send, buffered_test.rcv],
                   vcd_name="test_bufpipe3.vcd")

    # same randomised pair against the combinatorial example
    print("test 3.5")
    combinatorial = ExampleCombPipe()
    combinatorial_test = Test3(combinatorial)
    run_simulation(combinatorial,
                   [combinatorial_test.send, combinatorial_test.rcv],
                   vcd_name="test_combpipe3.vcd")

    # single-process randomised test, two chained stages
    print("test 4")
    chained = ExampleBufPipe2()
    run_simulation(chained, testbench4(chained),
                   vcd_name="test_bufpipe4.vcd")

    # randomised sender/receiver pair, adder stage
    print("test 5")
    adder = ExampleBufPipeAdd()
    adder_test = Test5(adder)
    run_simulation(adder, [adder_test.send, adder_test.rcv],
                   vcd_name="test_bufpipe5.vcd")
288