1 from nmigen
import Module
, Signal
2 from nmigen
.compat
.sim
import run_simulation
3 from example_buf_pipe
import ExampleBufPipe
4 from random
import randint
7 def check_o_n_valid(dut
, val
):
8 o_n_valid
= yield dut
.o
.n_valid
9 assert o_n_valid
== val
11 def check_o_n_valid2(dut
, val
):
12 o_n_valid
= yield dut
.o_n_valid
13 assert o_n_valid
== val
17 #yield dut.i_p_rst.eq(1)
18 yield dut
.i
.n_ready
.eq(0)
19 yield dut
.o
.p_ready
.eq(0)
22 #yield dut.i_p_rst.eq(0)
23 yield dut
.i
.n_ready
.eq(1)
24 yield dut
.stage
.i_data
.eq(5)
25 yield dut
.i
.p_valid
.eq(1)
28 yield dut
.stage
.i_data
.eq(7)
29 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed
31 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
33 yield dut
.stage
.i_data
.eq(2)
35 yield dut
.i
.n_ready
.eq(0) # begin going into "stall" (next stage says ready)
36 yield dut
.stage
.i_data
.eq(9)
38 yield dut
.i
.p_valid
.eq(0)
39 yield dut
.stage
.i_data
.eq(12)
41 yield dut
.stage
.i_data
.eq(32)
42 yield dut
.i
.n_ready
.eq(1)
44 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
46 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
48 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
53 #yield dut.i.p_rst.eq(1)
54 yield dut
.i_n_ready
.eq(0)
55 #yield dut.o.p_ready.eq(0)
58 #yield dut.i.p_rst.eq(0)
59 yield dut
.i_n_ready
.eq(1)
60 yield dut
.i_data
.eq(5)
61 yield dut
.i_p_valid
.eq(1)
64 yield dut
.i_data
.eq(7)
65 yield from check_o_n_valid2(dut
, 0) # effects of i_p_valid delayed 2 clocks
67 yield from check_o_n_valid2(dut
, 0) # effects of i_p_valid delayed 2 clocks
69 yield dut
.i_data
.eq(2)
71 yield from check_o_n_valid2(dut
, 1) # ok *now* i_p_valid effect is felt
72 yield dut
.i_n_ready
.eq(0) # begin going into "stall" (next stage says ready)
73 yield dut
.i_data
.eq(9)
75 yield dut
.i_p_valid
.eq(0)
76 yield dut
.i_data
.eq(12)
78 yield dut
.i_data
.eq(32)
79 yield dut
.i_n_ready
.eq(1)
81 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
83 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
85 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
87 yield from check_o_n_valid2(dut
, 0) # buffer outputted, *now* we're done.
94 def __init__(self
, dut
):
97 for i
in range(10000):
98 #data.append(randint(0, 1<<16-1))
104 while self
.o
!= len(self
.data
):
105 send_range
= randint(0, 3)
106 for j
in range(randint(1,10)):
110 send
= randint(0, send_range
) != 0
111 o_p_ready
= yield self
.dut
.o
.p_ready
115 if send
and self
.i
!= len(self
.data
):
116 yield self
.dut
.i
.p_valid
.eq(1)
117 yield self
.dut
.stage
.i_data
.eq(self
.data
[self
.i
])
120 yield self
.dut
.i
.p_valid
.eq(0)
124 while self
.o
!= len(self
.data
):
125 stall_range
= randint(0, 3)
126 for j
in range(randint(1,10)):
127 stall
= randint(0, stall_range
) != 0
128 yield self
.dut
.i
.n_ready
.eq(stall
)
130 o_n_valid
= yield self
.dut
.o
.n_valid
131 i_n_ready
= yield self
.dut
.i
.n_ready
132 if not o_n_valid
or not i_n_ready
:
134 o_data
= yield self
.dut
.stage
.o_data
135 assert o_data
== self
.data
[self
.o
] + 1, \
136 "%d-%d data %x not match %x\n" \
137 % (self
.i
, self
.o
, o_data
, self
.data
[self
.o
])
139 if self
.o
== len(self
.data
):
145 for i
in range(10000):
146 #data.append(randint(0, 1<<16-1))
151 stall
= randint(0, 3) != 0
152 send
= randint(0, 5) != 0
153 yield dut
.i_n_ready
.eq(stall
)
154 o_p_ready
= yield dut
.o_p_ready
156 if send
and i
!= len(data
):
157 yield dut
.i_p_valid
.eq(1)
158 yield dut
.i_data
.eq(data
[i
])
161 yield dut
.i_p_valid
.eq(0)
163 o_n_valid
= yield dut
.o_n_valid
164 i_n_ready
= yield dut
.i_n_ready
165 if o_n_valid
and i_n_ready
:
166 o_data
= yield dut
.o_data
167 assert o_data
== data
[o
] + 2, "%d-%d data %x not match %x\n" \
168 % (i
, o
, o_data
, data
[o
])
174 class ExampleBufPipe2
:
176 connect these: ------|---------------|
178 i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
179 o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
180 stage.i_data >>in pipe1 o_data out>> stage.i_data >>in pipe2
183 self
.pipe1
= ExampleBufPipe()
184 self
.pipe2
= ExampleBufPipe()
187 self
.i_p_valid
= Signal() # >>in - comes in from PREVIOUS stage
188 self
.i_n_ready
= Signal() # in<< - comes in from the NEXT stage
189 self
.i_data
= Signal(32) # >>in - comes in from the PREVIOUS stage
192 self
.o_n_valid
= Signal() # out>> - goes out to the NEXT stage
193 self
.o_p_ready
= Signal() # <<out - goes out to the PREVIOUS stage
194 self
.o_data
= Signal(32) # out>> - goes out to the NEXT stage
196 def elaborate(self
, platform
):
198 m
.submodules
.pipe1
= self
.pipe1
199 m
.submodules
.pipe2
= self
.pipe2
201 # connect inter-pipe input/output valid/ready/data
202 m
.d
.comb
+= self
.pipe2
.i
.p_valid
.eq(self
.pipe1
.o
.n_valid
)
203 m
.d
.comb
+= self
.pipe1
.i
.n_ready
.eq(self
.pipe2
.o
.p_ready
)
204 m
.d
.comb
+= self
.pipe2
.stage
.i_data
.eq(self
.pipe1
.stage
.o_data
)
206 # inputs/outputs to the module: pipe1 connections here (LHS)
207 m
.d
.comb
+= self
.pipe1
.i
.p_valid
.eq(self
.i_p_valid
)
208 m
.d
.comb
+= self
.o_p_ready
.eq(self
.pipe1
.o
.p_ready
)
209 m
.d
.comb
+= self
.pipe1
.stage
.i_data
.eq(self
.i_data
)
211 # now pipe2 connections (RHS)
212 m
.d
.comb
+= self
.o_n_valid
.eq(self
.pipe2
.o
.n_valid
)
213 m
.d
.comb
+= self
.pipe2
.i
.n_ready
.eq(self
.i_n_ready
)
214 m
.d
.comb
+= self
.o_data
.eq(self
.pipe2
.stage
.o_data
)
218 if __name__
== '__main__':
220 dut
= ExampleBufPipe()
221 run_simulation(dut
, testbench(dut
), vcd_name
="test_bufpipe.vcd")
224 dut
= ExampleBufPipe2()
225 run_simulation(dut
, testbench2(dut
), vcd_name
="test_bufpipe2.vcd")
228 dut
= ExampleBufPipe()
230 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe3.vcd")
233 dut
= ExampleBufPipe2()
234 run_simulation(dut
, testbench4(dut
), vcd_name
="test_bufpipe4.vcd")