1 from nmigen
import Module
, Signal
2 from nmigen
.compat
.sim
import run_simulation
3 from example_buf_pipe
import ExampleBufPipe
, ExampleBufPipeAdd
4 from example_buf_pipe
import ExampleCombPipe
5 from example_buf_pipe
import IOAckIn
, IOAckOut
6 from random
import randint
def check_o_n_valid(dut, val):
    """Simulation helper: check that ``dut.o.n_valid`` currently equals *val*.

    This is a generator intended to be driven with ``yield from`` inside a
    simulation process: it yields the signal so that the driver can send
    the signal's current value back, then compares that value with *val*.

    Args:
        dut: object exposing the signal as ``dut.o.n_valid``.
        val: the value ``o.n_valid`` is expected to have.

    Raises:
        AssertionError: if the value read back differs from *val*.
    """
    o_n_valid = yield dut.o.n_valid
    # Carry a message, like the other data-check asserts in this file, so a
    # failing simulation reports what was actually observed.
    assert o_n_valid == val, \
        "o.n_valid is %r, expected %r" % (o_n_valid, val)
# Directed handshake test for a single pipe stage `dut`.
# NOTE(review): the enclosing `def` line and several intermediate lines are
# not visible in this view; presumably this is the `testbench(dut)` generator
# that the `__main__` section passes to run_simulation -- confirm.
15 #yield dut.i_p_rst.eq(1)
# start with both handshake signals driven low
16 yield dut
.i
.n_ready
.eq(0)
17 yield dut
.o
.p_ready
.eq(0)
20 #yield dut.i_p_rst.eq(0)
# raise n_ready and present the first data word (5) with p_valid high
21 yield dut
.i
.n_ready
.eq(1)
22 yield dut
.i
.data
.eq(5)
23 yield dut
.i
.p_valid
.eq(1)
26 yield dut
.i
.data
.eq(7)
27 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed
29 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
31 yield dut
.i
.data
.eq(2)
33 yield dut
.i
.n_ready
.eq(0) # begin going into "stall" (next stage says NOT ready)
34 yield dut
.i
.data
.eq(9)
# stop presenting valid input; the data values driven below are then
# presumably ignored by the pipe -- confirm against the pipe implementation
36 yield dut
.i
.p_valid
.eq(0)
37 yield dut
.i
.data
.eq(12)
39 yield dut
.i
.data
.eq(32)
# release the stall by raising n_ready again
40 yield dut
.i
.n_ready
.eq(1)
42 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
44 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
46 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
# Directed handshake test using the same stimulus sequence as the
# single-pipe test, but with different expected o.n_valid values: two
# 0-checks before the first 1, and three 1-checks before the final 0.
# NOTE(review): the enclosing `def` line and several intermediate lines are
# not visible in this view; presumably this is `testbench2(dut)`, which
# `__main__` runs against ExampleBufPipe2 -- confirm.
51 #yield dut.i.p_rst.eq(1)
52 yield dut
.i
.n_ready
.eq(0)
53 #yield dut.o.p_ready.eq(0)
56 #yield dut.i.p_rst.eq(0)
# raise n_ready and present the first data word (5) with p_valid high
57 yield dut
.i
.n_ready
.eq(1)
58 yield dut
.i
.data
.eq(5)
59 yield dut
.i
.p_valid
.eq(1)
62 yield dut
.i
.data
.eq(7)
63 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed 2 clocks
65 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed 2 clocks
67 yield dut
.i
.data
.eq(2)
69 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
70 yield dut
.i
.n_ready
.eq(0) # begin going into "stall" (next stage says NOT ready)
71 yield dut
.i
.data
.eq(9)
# stop presenting valid input
73 yield dut
.i
.p_valid
.eq(0)
74 yield dut
.i
.data
.eq(12)
76 yield dut
.i
.data
.eq(32)
# release the stall by raising n_ready again
77 yield dut
.i
.n_ready
.eq(1)
79 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
81 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
83 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
85 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
# Constructor of a randomized send/receive test helper; takes the device
# under test as `dut`.
# NOTE(review): the enclosing class header, the attribute initialisation and
# the loop body are not visible in this view; `num_tests` is not defined in
# the visible lines either (presumably a module-level constant -- confirm).
92 def __init__(self
, dut
):
95 for i
in range(num_tests
):
96 #data.append(randint(0, 1<<16-1))
# Sender-side simulation loop: runs until self.o equals len(self.data).
# NOTE(review): the `def` line and some statements (including whatever
# advances self.i) are not visible in this view; presumably this is the
# `send` method that `__main__` passes to run_simulation -- confirm.
102 while self
.o
!= len(self
.data
):
# send_range sets how often the following burst sends: with 0,
# randint(0, 0) is always 0 and `send` stays False for the whole burst
103 send_range
= randint(0, 3)
# burst of 1..10 iterations sharing the same send_range
104 for j
in range(randint(1,10)):
108 send
= randint(0, send_range
) != 0
109 o_p_ready
= yield self
.dut
.o
.p_ready
# drive p_valid and the next data word only when sending and items remain
113 if send
and self
.i
!= len(self
.data
):
114 yield self
.dut
.i
.p_valid
.eq(1)
115 yield self
.dut
.i
.data
.eq(self
.data
[self
.i
])
# deassert p_valid (the branch this statement belongs to is not visible here)
118 yield self
.dut
.i
.p_valid
.eq(0)
# Receiver-side simulation loop: runs until self.o equals len(self.data),
# randomly toggling i.n_ready and checking each output word.
# NOTE(review): the `def` line and some statements (the bodies of the two
# `if`s and whatever advances self.o) are not visible in this view;
# presumably this is the `rcv` method that `__main__` passes to
# run_simulation -- confirm.
122 while self
.o
!= len(self
.data
):
123 stall_range
= randint(0, 3)
# burst of 1..10 iterations sharing the same stall_range
124 for j
in range(randint(1,10)):
# NOTE: despite its name, `stall` is written straight to n_ready, so a
# true value drives n_ready high
125 stall
= randint(0, stall_range
) != 0
126 yield self
.dut
.i
.n_ready
.eq(stall
)
# read back both handshake signals
128 o_n_valid
= yield self
.dut
.o
.n_valid
129 i_n_ready
= yield self
.dut
.i
.n_ready
# the data check below presumably only runs when both are set (the body of
# this `if` is not visible here)
130 if not o_n_valid
or not i_n_ready
:
132 o_data
= yield self
.dut
.o
.data
# expected output is the corresponding input word plus one
133 assert o_data
== self
.data
[self
.o
] + 1, \
134 "%d-%d data %x not match %x\n" \
135 % (self
.i
, self
.o
, o_data
, self
.data
[self
.o
])
# all words checked (the action taken here is not visible in this view)
137 if self
.o
== len(self
.data
):
# Constructor of a second randomized send/receive test helper; fills
# self.data with `num_tests` pairs of random integers, each drawn from
# 0..(1<<14) inclusive.
# NOTE(review): the enclosing class header and the attribute initialisation
# are not visible in this view; `num_tests` is not defined in the visible
# lines (presumably a module-level constant -- confirm).
142 def __init__(self
, dut
):
145 for i
in range(num_tests
):
146 self
.data
.append((randint(0, 1<<14), randint(0, 1<<14)))
# Sender-side simulation loop for the pair-valued test data: runs until
# self.o equals len(self.data).
# NOTE(review): the `def` line and some statements (including the body of
# the `for v` loop and whatever advances self.i) are not visible in this
# view; presumably this is the `send` method -- confirm.
151 while self
.o
!= len(self
.data
):
# with send_range == 0, `send` stays False for the whole burst
152 send_range
= randint(0, 3)
# burst of 1..10 iterations sharing the same send_range
153 for j
in range(randint(1,10)):
157 send
= randint(0, send_range
) != 0
158 o_p_ready
= yield self
.dut
.o
.p_ready
# drive p_valid and the next input pair only when sending and items remain
162 if send
and self
.i
!= len(self
.data
):
163 yield self
.dut
.i
.p_valid
.eq(1)
# dut.set_input() returns an iterable for the current data pair; presumably
# each element is an assignment that gets yielded to the simulator -- confirm
164 for v
in self
.dut
.set_input(self
.data
[self
.i
]):
# deassert p_valid (the branch this statement belongs to is not visible here)
168 yield self
.dut
.i
.p_valid
.eq(0)
# Receiver-side simulation loop for the pair-valued test data: runs until
# self.o equals len(self.data), randomly toggling i.n_ready and checking
# that each output word equals the sum of the corresponding input pair.
# NOTE(review): the `def` line and some statements (the bodies of the two
# `if`s and whatever advances self.o) are not visible in this view;
# presumably this is the `rcv` method -- confirm.
172 while self
.o
!= len(self
.data
):
173 stall_range
= randint(0, 3)
# burst of 1..10 iterations sharing the same stall_range
174 for j
in range(randint(1,10)):
# NOTE: despite its name, `stall` is written straight to n_ready, so a
# true value drives n_ready high
175 stall
= randint(0, stall_range
) != 0
176 yield self
.dut
.i
.n_ready
.eq(stall
)
# read back both handshake signals
178 o_n_valid
= yield self
.dut
.o
.n_valid
179 i_n_ready
= yield self
.dut
.i
.n_ready
# the data check below presumably only runs when both are set (the body of
# this `if` is not visible here)
180 if not o_n_valid
or not i_n_ready
:
182 o_data
= yield self
.dut
.o
.data
# expected result: sum of the two elements of the input pair
183 res
= self
.data
[self
.o
][0] + self
.data
[self
.o
][1]
184 assert o_data
== res
, \
185 "%d-%d data %x not match %s\n" \
186 % (self
.i
, self
.o
, o_data
, repr(self
.data
[self
.o
]))
# all words checked (the action taken here is not visible in this view)
188 if self
.o
== len(self
.data
):
# Randomized single-process test: each step randomly drives i.n_ready and
# randomly presents the next input word, then checks any accepted output
# word against the corresponding input word plus two.
# NOTE(review): the `def` line, the initialisation of `data`, `i` and `o`,
# the enclosing loop and the index updates are not visible in this view;
# presumably this is `testbench4(dut)`, which `__main__` runs against
# ExampleBufPipe2 -- confirm.
194 for i
in range(num_tests
):
195 #data.append(randint(0, 1<<16-1))
# NOTE: `stall` is written straight to n_ready, so a true value (3 in 4
# draws) drives n_ready high
200 stall
= randint(0, 3) != 0
# `send` is true for 5 in 6 draws
201 send
= randint(0, 5) != 0
202 yield dut
.i
.n_ready
.eq(stall
)
203 o_p_ready
= yield dut
.o
.p_ready
# drive p_valid and the next data word only when sending and items remain
205 if send
and i
!= len(data
):
206 yield dut
.i
.p_valid
.eq(1)
207 yield dut
.i
.data
.eq(data
[i
])
# deassert p_valid (the branch this statement belongs to is not visible here)
210 yield dut
.i
.p_valid
.eq(0)
# read back both handshake signals; output data is checked only when both are set
212 o_n_valid
= yield dut
.o
.n_valid
213 i_n_ready
= yield dut
.i
.n_ready
214 if o_n_valid
and i_n_ready
:
215 o_data
= yield dut
.o
.data
# expected output is the input word plus two
216 assert o_data
== data
[o
] + 2, "%d-%d data %x not match %x\n" \
217 % (i
, o
, o_data
, data
[o
])
# Two ExampleBufPipe instances chained back to back and exposed as a single
# unit with its own `i` / `o` ports (each carrying a 32-bit `data` Signal).
# NOTE(review): the docstring delimiters, the `__init__` header, the creation
# of `self.i` / `self.o`, and the creation and return of `m` in elaborate()
# are not visible in this view.
223 class ExampleBufPipe2
:
225 connect these: ------|---------------|
227 i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
228 o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
229 i_data >>in pipe1 o_data out>> i_data >>in pipe2
# the two chained pipe stages
232 self
.pipe1
= ExampleBufPipe()
233 self
.pipe2
= ExampleBufPipe()
237 self
.i
.data
= Signal(32) # >>in - comes in from the PREVIOUS stage
241 self
.o
.data
= Signal(32) # out>> - goes out to the NEXT stage
# Registers both pipes as submodules of `m` and adds three sets of
# connections to the combinatorial domain: pipe1 -> pipe2, this object's
# inputs -> pipe1, and pipe2 -> this object's outputs.
243 def elaborate(self
, platform
):
245 m
.submodules
.pipe1
= self
.pipe1
246 m
.submodules
.pipe2
= self
.pipe2
248 # connect inter-pipe input/output valid/ready/data
249 m
.d
.comb
+= self
.pipe1
.connect_next(self
.pipe2
)
251 # inputs/outputs to the module: pipe1 connections here (LHS)
252 m
.d
.comb
+= self
.pipe1
.connect_in(self
)
254 # now pipe2 connections (RHS)
255 m
.d
.comb
+= self
.pipe2
.connect_out(self
)
# Script entry point: runs six simulations in sequence, each on a freshly
# constructed device and each writing its own VCD trace file.
# NOTE(review): the statements that assign `test` before runs 3, 5 and 6 are
# not visible in this view.
261 if __name__
== '__main__':
# run 1: single buffered pipe, directed test
263 dut
= ExampleBufPipe()
264 run_simulation(dut
, testbench(dut
), vcd_name
="test_bufpipe.vcd")
# run 2: two chained buffered pipes, directed test
267 dut
= ExampleBufPipe2()
268 run_simulation(dut
, testbench2(dut
), vcd_name
="test_bufpipe2.vcd")
# run 3: single buffered pipe, separate send and rcv processes
271 dut
= ExampleBufPipe()
273 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe3.vcd")
# run 4: combinatorial pipe, separate send and rcv processes
276 dut
= ExampleCombPipe()
278 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_combpipe3.vcd")
# run 5: two chained buffered pipes, randomized single-process test
281 dut
= ExampleBufPipe2()
282 run_simulation(dut
, testbench4(dut
), vcd_name
="test_bufpipe4.vcd")
# run 6: ExampleBufPipeAdd, separate send and rcv processes
285 dut
= ExampleBufPipeAdd()
287 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe5.vcd")