formal test for PriorityPicker passes
[nmutil.git] / src / nmutil / picker.py
1 # SPDX-License-Identifier: LGPL-3-or-later
2 """ Priority Picker: optimized back-to-back PriorityEncoder and Decoder
3 and MultiPriorityPicker: cascading mutually-exclusive pickers
4
5 This work is funded through NLnet under Grant 2019-02-012
6
7 License: LGPLv3+
8
9
10 PriorityPicker: the input is N bits, the output is N bits wide and
11 only one is enabled.
12
13 MultiPriorityPicker: likewise except that there are M pickers and
14 each output is guaranteed mutually exclusive. Optionally:
15 an "index" (and enable line) is also outputted.
16
17 MultiPriorityPicker is designed for port-selection, when there are
18 multiple "things" (of width N) contending for access to M "ports".
19 When the M=0 "thing" requests a port, it gets allocated port 0
20 (always). However if the M=0 "thing" does *not* request a port,
21 this gives the M=1 "thing" the opportunity to gain access to port 0.
22
23 Given that N may potentially be much greater than M (16 bits wide
24 where M may be e.g. only 4) we can't just say, "ok so M=N therefore
25 M=0 gets access to port 0, M=1 gets access to port 1" etc.
26 """
27
28 from nmigen import Module, Signal, Cat, Elaboratable, Array, Const, Mux
29 from nmigen.cli import rtlil
30 import math
31
32
class PriorityPicker(Elaboratable):
    """ Priority picker: N request bits in, N grant bits out.

    An output bit is raised only when its input bit is set and every
    input bit examined before it is clear.

    * wid is the width (N) of both the input and the output
    * msb_mode is for a MSB-priority picker
    * reverse_i=True is for convenient reversal of the input bits
    * reverse_o=True is for convenient reversal of the output bits

    Signals: i (input, wid bits), o (output, wid bits) and
    en_o (single bit, driven from o.bool()).
    """

    def __init__(self, wid, msb_mode=False, reverse_i=False, reverse_o=False):
        # configuration
        self.wid = wid
        self.msb_mode = msb_mode
        self.reverse_i = reverse_i
        self.reverse_o = reverse_o

        # input
        self.i = Signal(wid, reset_less=True)

        # outputs
        self.o = Signal(wid, reset_less=True)
        self.en_o = Signal(reset_less=True)  # true if any output is true

    def elaborate(self, platform):
        m = Module()
        comb = m.d.comb

        # put the input bits into the order in which they are examined.
        # reverse_i and msb_mode each flip the order once.
        bits = list(self.i)
        if self.reverse_i:
            bits = bits[::-1]
        if self.msb_mode:
            bits = bits[::-1]

        # inverted copy of the (re-ordered) input
        ni = Signal(self.wid, reset_less=True)
        comb += ni.eq(~Cat(*bits))

        # order in which the per-bit results are generated and collected
        order = list(range(self.wid))
        if self.msb_mode:
            order = order[::-1]

        # works by saying, "if all previous bits were zero, we get a chance"
        picked = []
        for n in order:
            t = Signal(name="t%d" % n, reset_less=True)
            if n == 0:
                # nothing comes before bit 0: it is passed straight through
                comb += t.eq(bits[0])
            else:
                # non-zero if bit n is clear *or* any earlier bit is set,
                # i.e. whenever bit n must *not* be picked
                blocked = Cat(ni[n], *bits[:n])
                comb += t.eq(~blocked.bool())
            picked.append(t)

        if self.reverse_o:
            picked.reverse()

        # turn the list of single-bit results into the output word
        comb += self.o.eq(Cat(*picked))
        # useful "is any output enabled" signal
        comb += self.en_o.eq(self.o.bool())

        return m

    def __iter__(self):
        yield from (self.i, self.o, self.en_o)

    def ports(self):
        return list(self)
91
92
class MultiPriorityPicker(Elaboratable):
    """ implements a multi-input priority picker
        Mx inputs of N bits, Mx outputs of N bits, only one is set

        Each picker masks out the one below it, such that the first
        gets top priority, the second cannot have the same bit that
        the first has set, and so on.  To do this, a "mask" accumulates
        the output from the chain, masking the input to the next chain.

        Also outputted (optional): an index for each picked "thing".

        Parameters:

        * wid: width (N) of each input and each output
        * levels: number of chained PriorityPickers (M)
        * indices: if True, also create and drive the idx_o array
        * multiin: if True, i is an Array of `levels` separate inputs
          (one per picker); otherwise a single input i is shared by
          every picker

        Signals:

        * i: input Signal, or Array of input Signals when multiin is set
        * o: Array of `levels` outputs, each wid bits
        * en_o: `levels` bits, bit j is the en_o of picker j
        * idx_o: (only when indices is set) Array of `levels` indices
    """

    def __init__(self, wid, levels, indices=False, multiin=False):
        self.levels = levels
        self.wid = wid
        self.indices = indices
        self.multiin = multiin

        if multiin:
            # multiple inputs, multiple outputs: one input per picker
            self.i = Array([Signal(self.wid, name="i_%d" % j,
                                   reset_less=True)
                            for j in range(self.levels)])
        else:
            # only the one input, shared by every picker
            self.i = Signal(self.wid, reset_less=True)

        # create array of picker outputs (one wid-bit output per level)
        self.o = Array([Signal(self.wid, name="o_%d" % j, reset_less=True)
                        for j in range(self.levels)])

        # add an array of "enables" (one bit per level)
        self.en_o = Signal(self.levels, name="en_o", reset_less=True)

        if not self.indices:
            return

        # add an array of indices
        lidx = math.ceil(math.log2(self.levels))
        self.idx_o = Array([Signal(lidx, name="idxo_%d" % j,
                                   reset_less=True)
                            for j in range(self.levels)])

    def elaborate(self, platform):
        m = Module()
        comb = m.d.comb

        # create Priority Pickers, accumulate their outputs and prevent
        # the next one in the chain from selecting that output bit.
        # the accumulated outputs of all earlier pickers form a "mask"
        # that is removed from the input of the current picker.
        prev_pp = None
        p_mask = None
        pp_l = []
        for j in range(self.levels):
            i = self.i[j] if self.multiin else self.i
            pp = PriorityPicker(self.wid)
            pp_l.append(pp)
            setattr(m.submodules, "pp%d" % j, pp)
            comb += self.o[j].eq(pp.o)
            if prev_pp is None:
                # first picker: sees the input unmasked
                comb += pp.i.eq(i)
                p_mask = Const(0, self.wid)
            else:
                mask = Signal(self.wid, name="m_%d" % j, reset_less=True)
                comb += mask.eq(prev_pp.o | p_mask)  # accumulate output bits
                comb += pp.i.eq(i & ~mask)           # mask out input
                p_mask = mask
            prev_pp = pp

        # concat the per-picker enable bits: bit j comes from picker j
        comb += self.en_o.eq(Cat(*[pp.en_o for pp in pp_l]))

        if not self.indices:
            return m

        # cascading index: idx_o[0] is always zero; each following
        # idx_o[j] is the previous index plus one when picker j is
        # enabled, otherwise the previous index unchanged.
        lidx = math.ceil(math.log2(self.levels))
        prev_count = None
        for j in range(self.levels):
            en_o = pp_l[j].en_o
            if prev_count is None:
                comb += self.idx_o[j].eq(0)
            else:
                count1 = Signal(lidx, name="count_%d" % j, reset_less=True)
                comb += count1.eq(prev_count + Const(1, lidx))
                comb += self.idx_o[j].eq(Mux(en_o, count1, prev_count))
            prev_count = self.idx_o[j]

        return m

    def __iter__(self):
        if self.multiin:
            yield from self.i
        else:
            yield self.i
        yield from self.o
        # en_o is always created and driven, so it is always a port
        yield self.en_o
        if not self.indices:
            return
        yield from self.idx_o

    def ports(self):
        return list(self)
213
214
if __name__ == '__main__':
    # generate RTLIL for one example of each picker configuration:
    # (class, constructor arguments, output file)
    examples = (
        (PriorityPicker, (16,), "test_picker.il"),
        (MultiPriorityPicker, (5, 4, True), "test_multi_picker.il"),
        (MultiPriorityPicker, (5, 4, False, True),
         "test_multi_picker_noidx.il"),
    )
    for cls, args, fname in examples:
        dut = cls(*args)
        vl = rtlil.convert(dut, ports=dut.ports())
        with open(fname, "w") as f:
            f.write(vl)
226 with open("test_multi_picker_noidx.il", "w") as f:
227 f.write(vl)