lib.fifo: add `r_level` and `w_level` to all FIFOs
[nmigen.git] / nmigen / test / test_lib_fifo.py
1 # nmigen: UnusedElaboratable=no
2
3 from .utils import *
4 from ..hdl import *
5 from ..asserts import *
6 from ..back.pysim import *
7 from ..lib.fifo import *
8
9
10 class FIFOTestCase(FHDLTestCase):
11 def test_depth_wrong(self):
12 with self.assertRaisesRegex(TypeError,
13 r"^FIFO width must be a non-negative integer, not -1$"):
14 FIFOInterface(width=-1, depth=8, fwft=True)
15 with self.assertRaisesRegex(TypeError,
16 r"^FIFO depth must be a non-negative integer, not -1$"):
17 FIFOInterface(width=8, depth=-1, fwft=True)
18
19 def test_sync_depth(self):
20 self.assertEqual(SyncFIFO(width=8, depth=0).depth, 0)
21 self.assertEqual(SyncFIFO(width=8, depth=1).depth, 1)
22 self.assertEqual(SyncFIFO(width=8, depth=2).depth, 2)
23
24 def test_sync_buffered_depth(self):
25 self.assertEqual(SyncFIFOBuffered(width=8, depth=0).depth, 0)
26 self.assertEqual(SyncFIFOBuffered(width=8, depth=1).depth, 1)
27 self.assertEqual(SyncFIFOBuffered(width=8, depth=2).depth, 2)
28
29 def test_async_depth(self):
30 self.assertEqual(AsyncFIFO(width=8, depth=0 ).depth, 0)
31 self.assertEqual(AsyncFIFO(width=8, depth=1 ).depth, 1)
32 self.assertEqual(AsyncFIFO(width=8, depth=2 ).depth, 2)
33 self.assertEqual(AsyncFIFO(width=8, depth=3 ).depth, 4)
34 self.assertEqual(AsyncFIFO(width=8, depth=4 ).depth, 4)
35 self.assertEqual(AsyncFIFO(width=8, depth=15).depth, 16)
36 self.assertEqual(AsyncFIFO(width=8, depth=16).depth, 16)
37 self.assertEqual(AsyncFIFO(width=8, depth=17).depth, 32)
38
39 def test_async_depth_wrong(self):
40 with self.assertRaisesRegex(ValueError,
41 (r"^AsyncFIFO only supports depths that are powers of 2; "
42 r"requested exact depth 15 is not$")):
43 AsyncFIFO(width=8, depth=15, exact_depth=True)
44
45 def test_async_buffered_depth(self):
46 self.assertEqual(AsyncFIFOBuffered(width=8, depth=0 ).depth, 0)
47 self.assertEqual(AsyncFIFOBuffered(width=8, depth=1 ).depth, 2)
48 self.assertEqual(AsyncFIFOBuffered(width=8, depth=2 ).depth, 2)
49 self.assertEqual(AsyncFIFOBuffered(width=8, depth=3 ).depth, 3)
50 self.assertEqual(AsyncFIFOBuffered(width=8, depth=4 ).depth, 5)
51 self.assertEqual(AsyncFIFOBuffered(width=8, depth=15).depth, 17)
52 self.assertEqual(AsyncFIFOBuffered(width=8, depth=16).depth, 17)
53 self.assertEqual(AsyncFIFOBuffered(width=8, depth=17).depth, 17)
54 self.assertEqual(AsyncFIFOBuffered(width=8, depth=18).depth, 33)
55
56 def test_async_buffered_depth_wrong(self):
57 with self.assertRaisesRegex(ValueError,
58 (r"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
59 r"requested exact depth 16 is not$")):
60 AsyncFIFOBuffered(width=8, depth=16, exact_depth=True)
61
62 class FIFOModel(Elaboratable, FIFOInterface):
63 """
64 Non-synthesizable first-in first-out queue, implemented naively as a chain of registers.
65 """
66 def __init__(self, *, width, depth, fwft, r_domain, w_domain):
67 super().__init__(width=width, depth=depth, fwft=fwft)
68
69 self.r_domain = r_domain
70 self.w_domain = w_domain
71
72 self.level = Signal(range(self.depth + 1))
73 self.r_level = Signal(range(self.depth + 1))
74 self.w_level = Signal(range(self.depth + 1))
75
76 def elaborate(self, platform):
77 m = Module()
78
79 storage = Memory(width=self.width, depth=self.depth)
80 w_port = m.submodules.w_port = storage.write_port(domain=self.w_domain)
81 r_port = m.submodules.r_port = storage.read_port (domain="comb")
82
83 produce = Signal(range(self.depth))
84 consume = Signal(range(self.depth))
85
86 m.d.comb += self.r_rdy.eq(self.level > 0)
87 m.d.comb += r_port.addr.eq((consume + 1) % self.depth)
88 if self.fwft:
89 m.d.comb += self.r_data.eq(r_port.data)
90 with m.If(self.r_en & self.r_rdy):
91 if not self.fwft:
92 m.d[self.r_domain] += self.r_data.eq(r_port.data)
93 m.d[self.r_domain] += consume.eq(r_port.addr)
94
95 m.d.comb += self.w_rdy.eq(self.level < self.depth)
96 m.d.comb += w_port.data.eq(self.w_data)
97 with m.If(self.w_en & self.w_rdy):
98 m.d.comb += w_port.addr.eq((produce + 1) % self.depth)
99 m.d.comb += w_port.en.eq(1)
100 m.d[self.w_domain] += produce.eq(w_port.addr)
101
102 with m.If(ResetSignal(self.r_domain) | ResetSignal(self.w_domain)):
103 m.d.sync += self.level.eq(0)
104 with m.Else():
105 m.d.sync += self.level.eq(self.level
106 + (self.w_rdy & self.w_en)
107 - (self.r_rdy & self.r_en))
108
109 m.d.comb += [
110 self.r_level.eq(self.level),
111 self.w_level.eq(self.level),
112 ]
113 m.d.comb += Assert(ResetSignal(self.r_domain) == ResetSignal(self.w_domain))
114
115 return m
116
117
118 class FIFOModelEquivalenceSpec(Elaboratable):
119 """
120 The first-in first-out queue model equivalence specification: for any inputs and control
121 signals, the behavior of the implementation under test exactly matches the ideal model,
122 except for behavior not defined by the model.
123 """
124 def __init__(self, fifo, r_domain, w_domain):
125 self.fifo = fifo
126
127 self.r_domain = r_domain
128 self.w_domain = w_domain
129
130 def elaborate(self, platform):
131 m = Module()
132 m.submodules.dut = dut = self.fifo
133 m.submodules.gold = gold = FIFOModel(width=dut.width, depth=dut.depth, fwft=dut.fwft,
134 r_domain=self.r_domain, w_domain=self.w_domain)
135
136 m.d.comb += [
137 gold.r_en.eq(dut.r_rdy & dut.r_en),
138 gold.w_en.eq(dut.w_en),
139 gold.w_data.eq(dut.w_data),
140 ]
141
142 m.d.comb += Assert(dut.r_rdy.implies(gold.r_rdy))
143 m.d.comb += Assert(dut.w_rdy.implies(gold.w_rdy))
144 m.d.comb += Assert(dut.r_level == gold.r_level)
145 m.d.comb += Assert(dut.w_level == gold.w_level)
146
147 if dut.fwft:
148 m.d.comb += Assert(dut.r_rdy
149 .implies(dut.r_data == gold.r_data))
150 else:
151 m.d.comb += Assert((Past(dut.r_rdy, domain=self.r_domain) &
152 Past(dut.r_en, domain=self.r_domain))
153 .implies(dut.r_data == gold.r_data))
154
155 return m
156
157
158 class FIFOContractSpec(Elaboratable):
159 """
160 The first-in first-out queue contract specification: if two elements are written to the queue
161 consecutively, they must be read out consecutively at some later point, no matter all other
162 circumstances, with the exception of reset.
163 """
164 def __init__(self, fifo, *, r_domain, w_domain, bound):
165 self.fifo = fifo
166 self.r_domain = r_domain
167 self.w_domain = w_domain
168 self.bound = bound
169
170 def elaborate(self, platform):
171 m = Module()
172 m.submodules.dut = fifo = self.fifo
173
174 m.domains += ClockDomain("sync")
175 m.d.comb += ResetSignal().eq(0)
176 if self.w_domain != "sync":
177 m.domains += ClockDomain(self.w_domain)
178 m.d.comb += ResetSignal(self.w_domain).eq(0)
179 if self.r_domain != "sync":
180 m.domains += ClockDomain(self.r_domain)
181 m.d.comb += ResetSignal(self.r_domain).eq(0)
182
183 entry_1 = AnyConst(fifo.width)
184 entry_2 = AnyConst(fifo.width)
185
186 with m.FSM(domain=self.w_domain) as write_fsm:
187 with m.State("WRITE-1"):
188 with m.If(fifo.w_rdy):
189 m.d.comb += [
190 fifo.w_data.eq(entry_1),
191 fifo.w_en.eq(1)
192 ]
193 m.next = "WRITE-2"
194 with m.State("WRITE-2"):
195 with m.If(fifo.w_rdy):
196 m.d.comb += [
197 fifo.w_data.eq(entry_2),
198 fifo.w_en.eq(1)
199 ]
200 m.next = "DONE"
201 with m.State("DONE"):
202 pass
203
204 with m.FSM(domain=self.r_domain) as read_fsm:
205 read_1 = Signal(fifo.width)
206 read_2 = Signal(fifo.width)
207 with m.State("READ"):
208 m.d.comb += fifo.r_en.eq(1)
209 if fifo.fwft:
210 r_rdy = fifo.r_rdy
211 else:
212 r_rdy = Past(fifo.r_rdy, domain=self.r_domain)
213 with m.If(r_rdy):
214 m.d.sync += [
215 read_1.eq(read_2),
216 read_2.eq(fifo.r_data),
217 ]
218 with m.If((read_1 == entry_1) & (read_2 == entry_2)):
219 m.next = "DONE"
220 with m.State("DONE"):
221 pass
222
223 with m.If(Initial()):
224 m.d.comb += Assume(write_fsm.ongoing("WRITE-1"))
225 m.d.comb += Assume(read_fsm.ongoing("READ"))
226 with m.If(Past(Initial(), self.bound - 1)):
227 m.d.comb += Assert(read_fsm.ongoing("DONE"))
228
229 with m.If(ResetSignal(domain=self.w_domain)):
230 m.d.comb += Assert(~fifo.r_rdy)
231
232 if self.w_domain != "sync" or self.r_domain != "sync":
233 m.d.comb += Assume(Rose(ClockSignal(self.w_domain)) |
234 Rose(ClockSignal(self.r_domain)))
235
236 return m
237
238
239 class FIFOFormalCase(FHDLTestCase):
240 def check_sync_fifo(self, fifo):
241 self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="sync", w_domain="sync"),
242 mode="bmc", depth=fifo.depth + 1)
243 self.assertFormal(FIFOContractSpec(fifo, r_domain="sync", w_domain="sync",
244 bound=fifo.depth * 2 + 1),
245 mode="hybrid", depth=fifo.depth * 2 + 1)
246
247 def test_sync_fwft_pot(self):
248 self.check_sync_fifo(SyncFIFO(width=8, depth=4, fwft=True))
249
250 def test_sync_fwft_npot(self):
251 self.check_sync_fifo(SyncFIFO(width=8, depth=5, fwft=True))
252
253 def test_sync_not_fwft_pot(self):
254 self.check_sync_fifo(SyncFIFO(width=8, depth=4, fwft=False))
255
256 def test_sync_not_fwft_npot(self):
257 self.check_sync_fifo(SyncFIFO(width=8, depth=5, fwft=False))
258
259 def test_sync_buffered_pot(self):
260 self.check_sync_fifo(SyncFIFOBuffered(width=8, depth=4))
261
262 def test_sync_buffered_potp1(self):
263 self.check_sync_fifo(SyncFIFOBuffered(width=8, depth=5))
264
265 def test_sync_buffered_potm1(self):
266 self.check_sync_fifo(SyncFIFOBuffered(width=8, depth=3))
267
268 def check_async_fifo(self, fifo):
269 # TODO: properly doing model equivalence checking on this likely requires multiclock,
270 # which is not really documented nor is it clear how to use it.
271 # self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
272 # mode="bmc", depth=fifo.depth * 3 + 1)
273 self.assertFormal(FIFOContractSpec(fifo, r_domain="read", w_domain="write",
274 bound=fifo.depth * 4 + 1),
275 mode="hybrid", depth=fifo.depth * 4 + 1)
276
277 def test_async(self):
278 self.check_async_fifo(AsyncFIFO(width=8, depth=4))
279
280 def test_async_buffered(self):
281 self.check_async_fifo(AsyncFIFOBuffered(width=8, depth=4))