1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from multipipe
import CombMultiInPipeline
, InputPriorityArbiter
11 class PriorityUnbufferedPipeline(CombMultiInPipeline
):
12 def __init__(self
, stage
, p_len
=4):
13 p_mux
= InputPriorityArbiter(self
, p_len
)
14 CombMultiInPipeline
.__init
__(self
, stage
, p_len
=p_len
, p_mux
=p_mux
)
17 return self
.p_mux
.ports()
18 #return UnbufferedPipeline.ports(self) + self.p_mux.ports()
22 self
.mid
= Signal(2, reset_less
=True)
23 self
.idx
= Signal(6, reset_less
=True)
24 self
.data
= Signal(16, reset_less
=True)
27 return [self
.mid
.eq(i
.mid
), self
.idx
.eq(i
.idx
), self
.data
.eq(i
.data
)]
30 return [self
.mid
, self
.idx
, self
.data
]
32 class PassThroughStage
:
36 return self
.ispec() # same as ospec
39 return i
# pass-through
44 stb
= yield dut
.out_op
.stb
46 ack
= yield dut
.out_op
.ack
50 yield dut
.rs
[1].in_op
[0].eq(5)
51 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
52 #yield dut.rs[1].ack.eq(1)
55 # check row 1 output (should be inactive)
56 decode
= yield dut
.rs
[1].out_decode
59 op0
= yield dut
.rs
[1].out_op
[0]
60 op1
= yield dut
.rs
[1].out_op
[1]
61 assert op0
== 0 and op1
== 0
63 # output should be inactive
64 out_stb
= yield dut
.out_op
.stb
68 yield dut
.rs
[1].in_op
[1].eq(6)
69 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
71 # set acknowledgement of output... takes 1 cycle to respond
72 yield dut
.out_op
.ack
.eq(1)
74 yield dut
.out_op
.ack
.eq(0) # clear ack on output
75 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
77 # output strobe should be active, MID should be 0 until "ack" is set...
78 out_stb
= yield dut
.out_op
.stb
80 out_mid
= yield dut
.mid
83 # ... and output should not yet be passed through either
84 op0
= yield dut
.out_op
.v
[0]
85 op1
= yield dut
.out_op
.v
[1]
86 assert op0
== 0 and op1
== 0
88 # wait for out_op.ack to activate...
89 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
92 # *now* output should be passed through
93 op0
= yield dut
.out_op
.v
[0]
94 op1
= yield dut
.out_op
.v
[1]
95 assert op0
== 5 and op1
== 6
98 yield dut
.rs
[2].in_op
[0].eq(3)
99 yield dut
.rs
[2].in_op
[1].eq(4)
100 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicate 1st op ready
101 yield dut
.out_op
.ack
.eq(1) # set output ack
103 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
104 yield dut
.out_op
.ack
.eq(0) # set output ack
106 op0
= yield dut
.out_op
.v
[0]
107 op1
= yield dut
.out_op
.v
[1]
108 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
109 out_mid
= yield dut
.mid
112 # set row 0 and 3 input
113 yield dut
.rs
[0].in_op
[0].eq(9)
114 yield dut
.rs
[0].in_op
[1].eq(8)
115 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicate 1st op ready
116 yield dut
.rs
[3].in_op
[0].eq(1)
117 yield dut
.rs
[3].in_op
[1].eq(2)
118 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicate 1st op ready
120 # set acknowledgement of output... takes 1 cycle to respond
121 yield dut
.out_op
.ack
.eq(1)
123 yield dut
.rs
[0].stb
.eq(0) # clear row 1 strobe
125 out_mid
= yield dut
.mid
126 assert out_mid
== 0, "out mid %d" % out_mid
129 yield dut
.rs
[3].stb
.eq(0) # clear row 1 strobe
130 yield dut
.out_op
.ack
.eq(0) # clear ack on output
132 out_mid
= yield dut
.mid
133 assert out_mid
== 3, "out mid %d" % out_mid
137 def __init__(self
, dut
):
142 for mid
in range(dut
.num_rows
):
145 for i
in range(self
.tlen
):
146 self
.di
[mid
][i
] = randint(0, 100) + (mid
<<8)
147 self
.do
[mid
][i
] = self
.di
[mid
][i
]
150 for i
in range(self
.tlen
):
151 op2
= self
.di
[mid
][i
]
153 yield rs
.i_valid
.eq(1)
154 yield rs
.i_data
.data
.eq(op2
)
155 yield rs
.i_data
.idx
.eq(i
)
156 yield rs
.i_data
.mid
.eq(mid
)
158 o_p_ready
= yield rs
.o_ready
161 o_p_ready
= yield rs
.o_ready
163 print ("send", mid
, i
, hex(op2
))
165 yield rs
.i_valid
.eq(0)
166 # wait random period of time before queueing another value
167 for i
in range(randint(0, 3)):
170 yield rs
.i_valid
.eq(0)
171 ## wait random period of time before queueing another value
172 #for i in range(randint(0, 3)):
175 #send_range = randint(0, 3)
179 # send = randint(0, send_range) != 0
183 #stall_range = randint(0, 3)
184 #for j in range(randint(1,10)):
185 # stall = randint(0, stall_range) != 0
186 # yield self.dut.n[0].i_ready.eq(stall)
189 yield n
.i_ready
.eq(1)
191 o_n_valid
= yield n
.o_valid
192 i_n_ready
= yield n
.i_ready
193 if not o_n_valid
or not i_n_ready
:
196 mid
= yield n
.o_data
.mid
197 out_i
= yield n
.o_data
.idx
198 out_v
= yield n
.o_data
.data
200 print ("recv", mid
, out_i
, hex(out_v
))
202 # see if this output has occurred already, delete it if it has
203 assert out_i
in self
.do
[mid
], "out_i %d not in array %s" % \
204 (out_i
, repr(self
.do
[mid
]))
205 assert self
.do
[mid
][out_i
] == out_v
# pass-through data
206 del self
.do
[mid
][out_i
]
208 # check if there's any more outputs
210 for (k
, v
) in self
.do
.items():
217 class TestPriorityMuxPipe(PriorityUnbufferedPipeline
):
220 stage
= PassThroughStage()
221 PriorityUnbufferedPipeline
.__init
__(self
, stage
, p_len
=self
.num_rows
)
225 for i
in range(len(self
.p
)):
226 res
+= [self
.p
[i
].i_valid
, self
.p
[i
].o_ready
] + \
227 self
.p
[i
].i_data
.ports()
228 res
+= [self
.n
.i_ready
, self
.n
.o_valid
] + \
229 self
.n
.o_data
.ports()
233 if __name__
== '__main__':
234 dut
= TestPriorityMuxPipe()
235 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
236 with
open("test_inputgroup_multi.il", "w") as f
:
238 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
240 test
= InputTest(dut
)
241 run_simulation(dut
, [test
.send(1), test
.send(0),
242 #test.send(3), test.send(2),
244 vcd_name
="test_inputgroup_multi.vcd")