1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from multipipe
import (CombMultiInPipeline
, PriorityCombMuxInPipe
)
12 self
.mid
= Signal(2, reset_less
=True)
13 self
.idx
= Signal(6, reset_less
=True)
14 self
.data
= Signal(16, reset_less
=True)
17 return [self
.mid
.eq(i
.mid
), self
.idx
.eq(i
.idx
), self
.data
.eq(i
.data
)]
20 return [self
.mid
, self
.idx
, self
.data
]
23 class PassThroughStage
:
27 return self
.ispec() # same as ospec
29 return i
# pass-through
34 stb
= yield dut
.out_op
.stb
36 ack
= yield dut
.out_op
.ack
40 yield dut
.rs
[1].in_op
[0].eq(5)
41 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
42 #yield dut.rs[1].ack.eq(1)
45 # check row 1 output (should be inactive)
46 decode
= yield dut
.rs
[1].out_decode
49 op0
= yield dut
.rs
[1].out_op
[0]
50 op1
= yield dut
.rs
[1].out_op
[1]
51 assert op0
== 0 and op1
== 0
53 # output should be inactive
54 out_stb
= yield dut
.out_op
.stb
58 yield dut
.rs
[1].in_op
[1].eq(6)
59 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
61 # set acknowledgement of output... takes 1 cycle to respond
62 yield dut
.out_op
.ack
.eq(1)
64 yield dut
.out_op
.ack
.eq(0) # clear ack on output
65 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
67 # output strobe should be active, MID should be 0 until "ack" is set...
68 out_stb
= yield dut
.out_op
.stb
70 out_mid
= yield dut
.mid
73 # ... and output should not yet be passed through either
74 op0
= yield dut
.out_op
.v
[0]
75 op1
= yield dut
.out_op
.v
[1]
76 assert op0
== 0 and op1
== 0
78 # wait for out_op.ack to activate...
79 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
82 # *now* output should be passed through
83 op0
= yield dut
.out_op
.v
[0]
84 op1
= yield dut
.out_op
.v
[1]
85 assert op0
== 5 and op1
== 6
88 yield dut
.rs
[2].in_op
[0].eq(3)
89 yield dut
.rs
[2].in_op
[1].eq(4)
90 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicates both ops ready
91 yield dut
.out_op
.ack
.eq(1) # set output ack
93 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
94 yield dut
.out_op
.ack
.eq(0) # clear output ack
96 op0
= yield dut
.out_op
.v
[0]
97 op1
= yield dut
.out_op
.v
[1]
98 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
99 out_mid
= yield dut
.mid
102 # set row 0 and 3 input
103 yield dut
.rs
[0].in_op
[0].eq(9)
104 yield dut
.rs
[0].in_op
[1].eq(8)
105 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicates both ops ready
106 yield dut
.rs
[3].in_op
[0].eq(1)
107 yield dut
.rs
[3].in_op
[1].eq(2)
108 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicates both ops ready
110 # set acknowledgement of output... takes 1 cycle to respond
111 yield dut
.out_op
.ack
.eq(1)
113 yield dut
.rs
[0].stb
.eq(0) # clear row 0 strobe
115 out_mid
= yield dut
.mid
116 assert out_mid
== 0, "out mid %d" % out_mid
119 yield dut
.rs
[3].stb
.eq(0) # clear row 3 strobe
120 yield dut
.out_op
.ack
.eq(0) # clear ack on output
122 out_mid
= yield dut
.mid
123 assert out_mid
== 3, "out mid %d" % out_mid
127 def __init__(self
, dut
):
132 for mid
in range(dut
.num_rows
):
135 for i
in range(self
.tlen
):
136 self
.di
[mid
][i
] = randint(0, 100) + (mid
<<8)
137 self
.do
[mid
][i
] = self
.di
[mid
][i
]
140 for i
in range(self
.tlen
):
141 op2
= self
.di
[mid
][i
]
143 yield rs
.i_valid
.eq(1)
144 yield rs
.i_data
.data
.eq(op2
)
145 yield rs
.i_data
.idx
.eq(i
)
146 yield rs
.i_data
.mid
.eq(mid
)
148 o_p_ready
= yield rs
.o_ready
151 o_p_ready
= yield rs
.o_ready
153 print ("send", mid
, i
, hex(op2
))
155 yield rs
.i_valid
.eq(0)
156 # wait random period of time before queueing another value
157 for i
in range(randint(0, 3)):
160 yield rs
.i_valid
.eq(0)
161 ## wait random period of time before queueing another value
162 #for i in range(randint(0, 3)):
165 #send_range = randint(0, 3)
169 # send = randint(0, send_range) != 0
173 #stall_range = randint(0, 3)
174 #for j in range(randint(1,10)):
175 # stall = randint(0, stall_range) != 0
176 # yield self.dut.n[0].i_ready.eq(stall)
179 yield n
.i_ready
.eq(1)
181 o_n_valid
= yield n
.o_valid
182 i_n_ready
= yield n
.i_ready
183 if not o_n_valid
or not i_n_ready
:
186 mid
= yield n
.o_data
.mid
187 out_i
= yield n
.o_data
.idx
188 out_v
= yield n
.o_data
.data
190 print ("recv", mid
, out_i
, hex(out_v
))
192 # see if this output has occurred already, delete it if it has
193 assert out_i
in self
.do
[mid
], "out_i %d not in array %s" % \
194 (out_i
, repr(self
.do
[mid
]))
195 assert self
.do
[mid
][out_i
] == out_v
# pass-through data
196 del self
.do
[mid
][out_i
]
198 # check if there's any more outputs
200 for (k
, v
) in self
.do
.items():
207 class TestPriorityMuxPipe(PriorityCombMuxInPipe
):
210 stage
= PassThroughStage()
211 PriorityCombMuxInPipe
.__init
__(self
, stage
, p_len
=self
.num_rows
)
214 if __name__
== '__main__':
215 dut
= TestPriorityMuxPipe()
216 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
217 with
open("test_inputgroup_multi.il", "w") as f
:
219 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
221 test
= InputTest(dut
)
222 run_simulation(dut
, [test
.send(1), test
.send(0),
223 test
.send(3), test
.send(2),
225 vcd_name
="test_inputgroup_multi.vcd")