1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from multipipe
import CombMultiOutPipeline
8 from singlepipe
import UnbufferedPipeline
class MuxUnbufferedPipeline(CombMultiOutPipeline):
    """Multi-output pipeline whose stage object also serves as its mux."""

    def __init__(self, stage, n_len):
        """Initialise the base pipeline with ``stage`` playing both roles.

        stage -- handed to the base class as the stage and as ``n_mux``;
                 its ``m_id`` attribute is assigned here.
        n_len -- forwarded unchanged to the base class as ``n_len``
                 (presumably the number of outputs -- confirm in multipipe).
        """
        # HACK: the same object is passed twice because the stage is also
        # the n-way multiplexer.
        CombMultiOutPipeline.__init__(self, stage, n_mux=stage, n_len=n_len)

        # HACK: as the n-mux is also the stage, its mux id is taken straight
        # from the "mid" field of the incoming data.
        input_data = self.p.i_data
        stage.m_id = input_data.mid
20 return self
.p_mux
.ports()
25 self
.mid
= Signal(2, reset_less
=True)
26 self
.data
= Signal(16, reset_less
=True)
29 return [self
.mid
.eq(i
.mid
), self
.data
.eq(i
.data
)]
32 return [self
.mid
, self
.data
]
35 class PassThroughStage
:
41 return Signal(16, name
="data_out", reset_less
=True)
47 class PassThroughDataStage
:
51 return self
.ispec() # same as ospec
54 return i
# pass-through
58 class PassThroughPipe(UnbufferedPipeline
):
60 UnbufferedPipeline
.__init
__(self
, PassThroughDataStage())
66 stb
= yield dut
.out_op
.stb
68 ack
= yield dut
.out_op
.ack
72 yield dut
.rs
[1].in_op
[0].eq(5)
73 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
74 #yield dut.rs[1].ack.eq(1)
77 # check row 1 output (should be inactive)
78 decode
= yield dut
.rs
[1].out_decode
81 op0
= yield dut
.rs
[1].out_op
[0]
82 op1
= yield dut
.rs
[1].out_op
[1]
83 assert op0
== 0 and op1
== 0
85 # output should be inactive
86 out_stb
= yield dut
.out_op
.stb
90 yield dut
.rs
[1].in_op
[1].eq(6)
91 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
93 # set acknowledgement of output... takes 1 cycle to respond
94 yield dut
.out_op
.ack
.eq(1)
96 yield dut
.out_op
.ack
.eq(0) # clear ack on output
97 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
99 # output strobe should be active, MID should be 0 until "ack" is set...
100 out_stb
= yield dut
.out_op
.stb
102 out_mid
= yield dut
.mid
105 # ... and output should not yet be passed through either
106 op0
= yield dut
.out_op
.v
[0]
107 op1
= yield dut
.out_op
.v
[1]
108 assert op0
== 0 and op1
== 0
110 # wait for out_op.ack to activate...
111 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
114 # *now* output should be passed through
115 op0
= yield dut
.out_op
.v
[0]
116 op1
= yield dut
.out_op
.v
[1]
117 assert op0
== 5 and op1
== 6
120 yield dut
.rs
[2].in_op
[0].eq(3)
121 yield dut
.rs
[2].in_op
[1].eq(4)
122 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicate 1st op ready
123 yield dut
.out_op
.ack
.eq(1) # set output ack
125 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
126 yield dut
.out_op
.ack
.eq(0) # set output ack
128 op0
= yield dut
.out_op
.v
[0]
129 op1
= yield dut
.out_op
.v
[1]
130 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
131 out_mid
= yield dut
.mid
134 # set row 0 and 3 input
135 yield dut
.rs
[0].in_op
[0].eq(9)
136 yield dut
.rs
[0].in_op
[1].eq(8)
137 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicate 1st op ready
138 yield dut
.rs
[3].in_op
[0].eq(1)
139 yield dut
.rs
[3].in_op
[1].eq(2)
140 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicate 1st op ready
142 # set acknowledgement of output... takes 1 cycle to respond
143 yield dut
.out_op
.ack
.eq(1)
145 yield dut
.rs
[0].stb
.eq(0) # clear row 1 strobe
147 out_mid
= yield dut
.mid
148 assert out_mid
== 0, "out mid %d" % out_mid
151 yield dut
.rs
[3].stb
.eq(0) # clear row 1 strobe
152 yield dut
.out_op
.ack
.eq(0) # clear ack on output
154 out_mid
= yield dut
.mid
155 assert out_mid
== 3, "out mid %d" % out_mid
159 def __init__(self
, dut
):
164 for i
in range(self
.tlen
* dut
.num_rows
):
168 mid
= randint(0, dut
.num_rows
-1)
169 data
= randint(0, 255) + (mid
<<8)
170 if mid
not in self
.do
:
172 self
.di
.append((data
, mid
))
173 self
.do
[mid
].append(data
)
176 for i
in range(self
.tlen
* dut
.num_rows
):
180 yield rs
.i_valid
.eq(1)
181 yield rs
.i_data
.data
.eq(op2
)
182 yield rs
.i_data
.mid
.eq(mid
)
184 o_p_ready
= yield rs
.o_ready
187 o_p_ready
= yield rs
.o_ready
189 print ("send", mid
, i
, hex(op2
))
191 yield rs
.i_valid
.eq(0)
192 # wait random period of time before queueing another value
193 for i
in range(randint(0, 3)):
196 yield rs
.i_valid
.eq(0)
201 stall_range
= randint(0, 3)
202 while out_i
!= len(self
.do
[mid
]):
204 assert count
!= 2000, "timeout: too long"
206 yield n
.i_ready
.eq(1)
208 o_n_valid
= yield n
.o_valid
209 i_n_ready
= yield n
.i_ready
210 if not o_n_valid
or not i_n_ready
:
213 out_v
= yield n
.o_data
215 print ("recv", mid
, out_i
, hex(out_v
))
217 assert self
.do
[mid
][out_i
] == out_v
# pass-through data
221 if randint(0, 5) == 0:
222 stall_range
= randint(0, 3)
223 stall
= randint(0, stall_range
) != 0
225 yield n
.i_ready
.eq(0)
226 for i
in range(stall_range
):
class TestPriorityMuxPipe(MuxUnbufferedPipeline):
    """Mux pipeline used by the test, built around a PassThroughStage."""

    def __init__(self, num_rows):
        """Store ``num_rows`` and construct the underlying mux pipeline.

        num_rows -- kept as ``self.num_rows`` and passed to the parent
                    constructor as ``n_len``.
        """
        self.num_rows = num_rows
        # A fresh PassThroughStage is created per instance; the parent
        # constructor uses it as both stage and multiplexer.
        MuxUnbufferedPipeline.__init__(
            self, PassThroughStage(), n_len=num_rows)
237 res
= [self
.p
.i_valid
, self
.p
.o_ready
] + \
238 self
.p
.i_data
.ports()
239 for i
in range(len(self
.n
)):
240 res
+= [self
.n
[i
].i_ready
, self
.n
[i
].o_valid
] + \
242 #self.n[i].o_data.ports()
246 class TestSyncToPriorityPipe
:
249 self
.pipe
= PassThroughPipe()
250 self
.muxpipe
= TestPriorityMuxPipe(self
.num_rows
)
253 self
.n
= self
.muxpipe
.n
255 def elaborate(self
, platform
):
257 m
.submodules
+= self
.pipe
258 m
.submodules
+= self
.muxpipe
259 m
.d
.comb
+= self
.pipe
.n
.connect_to_next(self
.muxpipe
.p
)
263 res
= [self
.p
.i_valid
, self
.p
.o_ready
] + \
264 self
.p
.i_data
.ports()
265 for i
in range(len(self
.n
)):
266 res
+= [self
.n
[i
].i_ready
, self
.n
[i
].o_valid
] + \
268 #self.n[i].o_data.ports()
272 if __name__
== '__main__':
273 dut
= TestSyncToPriorityPipe()
274 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
275 with
open("test_outmux_pipe.il", "w") as f
:
277 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
279 test
= OutputTest(dut
)
280 run_simulation(dut
, [test
.rcv(1), test
.rcv(0),
281 test
.rcv(3), test
.rcv(2),
283 vcd_name
="test_outmux_pipe.vcd")