lib.fifo.AsyncFIFOBuffered: fix FFSynchronizer latency
[nmigen.git] / tests / test_lib_fifo.py
1 # nmigen: UnusedElaboratable=no
2
3 from nmigen.hdl import *
4 from nmigen.asserts import *
5 from nmigen.sim import *
6 from nmigen.lib.fifo import *
7
8 from .utils import *
9
10
class FIFOTestCase(FHDLTestCase):
    """
    Constructor-level checks for the FIFO classes: argument validation and the effective depth
    each FIFO flavour reports for a requested depth.
    """

    def test_depth_wrong(self):
        """Negative ``width`` and negative ``depth`` are both rejected with ``TypeError``."""
        with self.assertRaisesRegex(TypeError,
                r"^FIFO width must be a non-negative integer, not -1$"):
            FIFOInterface(width=-1, depth=8, fwft=True)
        with self.assertRaisesRegex(TypeError,
                r"^FIFO depth must be a non-negative integer, not -1$"):
            FIFOInterface(width=8, depth=-1, fwft=True)

    def test_sync_depth(self):
        """``SyncFIFO`` is expected to report exactly the requested depth."""
        self.assertEqual(SyncFIFO(width=8, depth=0).depth, 0)
        self.assertEqual(SyncFIFO(width=8, depth=1).depth, 1)
        self.assertEqual(SyncFIFO(width=8, depth=2).depth, 2)

    def test_sync_buffered_depth(self):
        """``SyncFIFOBuffered`` is expected to report exactly the requested depth."""
        self.assertEqual(SyncFIFOBuffered(width=8, depth=0).depth, 0)
        self.assertEqual(SyncFIFOBuffered(width=8, depth=1).depth, 1)
        self.assertEqual(SyncFIFOBuffered(width=8, depth=2).depth, 2)

    def test_async_depth(self):
        """``AsyncFIFO`` is expected to round the requested depth up to a power of 2."""
        self.assertEqual(AsyncFIFO(width=8, depth=0 ).depth, 0)
        self.assertEqual(AsyncFIFO(width=8, depth=1 ).depth, 1)
        self.assertEqual(AsyncFIFO(width=8, depth=2 ).depth, 2)
        self.assertEqual(AsyncFIFO(width=8, depth=3 ).depth, 4)
        self.assertEqual(AsyncFIFO(width=8, depth=4 ).depth, 4)
        self.assertEqual(AsyncFIFO(width=8, depth=15).depth, 16)
        self.assertEqual(AsyncFIFO(width=8, depth=16).depth, 16)
        self.assertEqual(AsyncFIFO(width=8, depth=17).depth, 32)

    def test_async_depth_wrong(self):
        """With ``exact_depth=True`` a non-power-of-2 depth must raise ``ValueError``."""
        with self.assertRaisesRegex(ValueError,
                (r"^AsyncFIFO only supports depths that are powers of 2; "
                 r"requested exact depth 15 is not$")):
            AsyncFIFO(width=8, depth=15, exact_depth=True)

    def test_async_buffered_depth(self):
        """
        ``AsyncFIFOBuffered`` is expected to round the requested depth up to one more than
        a power of 2 (with depth 0 staying 0).
        """
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=0 ).depth, 0)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=1 ).depth, 2)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=2 ).depth, 2)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=3 ).depth, 3)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=4 ).depth, 5)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=15).depth, 17)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=16).depth, 17)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=17).depth, 17)
        self.assertEqual(AsyncFIFOBuffered(width=8, depth=18).depth, 33)

    def test_async_buffered_depth_wrong(self):
        """With ``exact_depth=True`` a depth that is not 2**n + 1 must raise ``ValueError``."""
        with self.assertRaisesRegex(ValueError,
                (r"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
                 r"requested exact depth 16 is not$")):
            AsyncFIFOBuffered(width=8, depth=16, exact_depth=True)
62
class FIFOModel(Elaboratable, FIFOInterface):
    """
    Non-synthesizable first-in first-out queue, implemented naively on top of a ``Memory`` with
    a combinational read port and a single fill-level counter shared by both sides.

    Parameters
    ----------
    width, depth, fwft
        Forwarded unchanged to ``FIFOInterface``.
    r_domain : str
        Clock domain in which the read pointer (and, if not ``fwft``, ``r_data``) is updated.
    w_domain : str
        Clock domain of the memory write port and of the write pointer.
    """
    def __init__(self, *, width, depth, fwft, r_domain, w_domain):
        super().__init__(width=width, depth=depth, fwft=fwft)

        self.r_domain = r_domain
        self.w_domain = w_domain

        # `level` is the single source of truth; `r_level`/`w_level` mirror it combinationally.
        self.level = Signal(range(self.depth + 1))
        self.r_level = Signal(range(self.depth + 1))
        self.w_level = Signal(range(self.depth + 1))

    def elaborate(self, platform):
        m = Module()

        storage = Memory(width=self.width, depth=self.depth)
        w_port  = m.submodules.w_port = storage.write_port(domain=self.w_domain)
        r_port  = m.submodules.r_port = storage.read_port (domain="comb")

        # `produce`/`consume` hold the address of the most recently written/read slot;
        # the next access always targets the following slot (modulo depth).
        produce = Signal(range(self.depth))
        consume = Signal(range(self.depth))

        # Read side.
        m.d.comb += self.r_rdy.eq(self.level > 0)
        m.d.comb += r_port.addr.eq((consume + 1) % self.depth)
        if self.fwft:
            # First-word-fall-through: the head of the queue is always visible on `r_data`.
            m.d.comb += self.r_data.eq(r_port.data)
        with m.If(self.r_en & self.r_rdy):
            if not self.fwft:
                # Without fall-through, `r_data` is registered when a read is accepted.
                m.d[self.r_domain] += self.r_data.eq(r_port.data)
            m.d[self.r_domain] += consume.eq(r_port.addr)

        # Write side.
        m.d.comb += self.w_rdy.eq(self.level < self.depth)
        m.d.comb += w_port.data.eq(self.w_data)
        with m.If(self.w_en & self.w_rdy):
            m.d.comb += w_port.addr.eq((produce + 1) % self.depth)
            m.d.comb += w_port.en.eq(1)
            m.d[self.w_domain] += produce.eq(w_port.addr)

        # The fill level is always counted in the `sync` domain, regardless of the
        # read/write domains passed in.
        with m.If(ResetSignal(self.r_domain) | ResetSignal(self.w_domain)):
            m.d.sync += self.level.eq(0)
        with m.Else():
            m.d.sync += self.level.eq(self.level
                + (self.w_rdy & self.w_en)
                - (self.r_rdy & self.r_en))

        m.d.comb += [
            self.r_level.eq(self.level),
            self.w_level.eq(self.level),
        ]
        # The model only defines behaviour when both domains are reset together.
        m.d.comb += Assert(ResetSignal(self.r_domain) == ResetSignal(self.w_domain))

        return m
117
118
class FIFOModelEquivalenceSpec(Elaboratable):
    """
    The first-in first-out queue model equivalence specification: for any inputs and control
    signals, the behavior of the implementation under test exactly matches the ideal model,
    except for behavior not defined by the model.

    Parameters
    ----------
    fifo
        The FIFO implementation under test; it is instantiated as the ``dut`` submodule.
    r_domain : str
        Read clock domain, passed to the reference ``FIFOModel`` and used for ``Past``.
    w_domain : str
        Write clock domain, passed to the reference ``FIFOModel``.
    """
    def __init__(self, fifo, r_domain, w_domain):
        self.fifo = fifo

        self.r_domain = r_domain
        self.w_domain = w_domain

    def elaborate(self, platform):
        m = Module()
        m.submodules.dut  = dut  = self.fifo
        m.submodules.gold = gold = FIFOModel(width=dut.width, depth=dut.depth, fwft=dut.fwft,
                                             r_domain=self.r_domain, w_domain=self.w_domain)

        # Drive the model from the DUT's inputs. The model is only asked to read when the DUT
        # itself is ready, so a DUT that is not ready yet does not desynchronize the two.
        m.d.comb += [
            gold.r_en.eq(dut.r_rdy & dut.r_en),
            gold.w_en.eq(dut.w_en),
            gold.w_data.eq(dut.w_data),
        ]

        # The DUT may be ready less often than the model, but never more often.
        m.d.comb += Assert(dut.r_rdy.implies(gold.r_rdy))
        m.d.comb += Assert(dut.w_rdy.implies(gold.w_rdy))
        m.d.comb += Assert(dut.r_level == gold.r_level)
        m.d.comb += Assert(dut.w_level == gold.w_level)

        if dut.fwft:
            # Fall-through: data must match whenever the DUT presents it as valid.
            m.d.comb += Assert(dut.r_rdy
                               .implies(dut.r_data == gold.r_data))
        else:
            # Registered output: data must match on the cycle after an accepted read.
            m.d.comb += Assert((Past(dut.r_rdy, domain=self.r_domain) &
                                Past(dut.r_en,  domain=self.r_domain))
                               .implies(dut.r_data == gold.r_data))

        return m
157
158
class FIFOContractSpec(Elaboratable):
    """
    The first-in first-out queue contract specification: if two elements are written to the queue
    consecutively, they must be read out consecutively at some later point, no matter all other
    circumstances, with the exception of reset.

    Parameters
    ----------
    fifo
        The FIFO implementation under test; it is instantiated as the ``dut`` submodule.
    r_domain : str
        Clock domain of the reading state machine.
    w_domain : str
        Clock domain of the writing state machine.
    bound : int
        Number of steps after which the reader is required to have seen both entries.
    """
    def __init__(self, fifo, *, r_domain, w_domain, bound):
        self.fifo     = fifo
        self.r_domain = r_domain
        self.w_domain = w_domain
        self.bound    = bound

    def elaborate(self, platform):
        m = Module()
        m.submodules.dut = fifo = self.fifo

        # Declare every domain in use and tie its reset low: the contract excludes reset.
        m.domains += ClockDomain("sync")
        m.d.comb += ResetSignal().eq(0)
        if self.w_domain != "sync":
            m.domains += ClockDomain(self.w_domain)
            m.d.comb += ResetSignal(self.w_domain).eq(0)
        if self.r_domain != "sync":
            m.domains += ClockDomain(self.r_domain)
            m.d.comb += ResetSignal(self.r_domain).eq(0)

        # Two arbitrary (solver-chosen) constants that are written back to back.
        entry_1 = AnyConst(fifo.width)
        entry_2 = AnyConst(fifo.width)

        # Writer: push entry_1, then entry_2, each as soon as the FIFO accepts it.
        with m.FSM(domain=self.w_domain) as write_fsm:
            with m.State("WRITE-1"):
                with m.If(fifo.w_rdy):
                    m.d.comb += [
                        fifo.w_data.eq(entry_1),
                        fifo.w_en.eq(1)
                    ]
                    m.next = "WRITE-2"
            with m.State("WRITE-2"):
                with m.If(fifo.w_rdy):
                    m.d.comb += [
                        fifo.w_data.eq(entry_2),
                        fifo.w_en.eq(1)
                    ]
                    m.next = "DONE"
            with m.State("DONE"):
                pass

        # Reader: keep reading and remember the last two values seen; finish once they are
        # (entry_1, entry_2) in that order.
        with m.FSM(domain=self.r_domain) as read_fsm:
            read_1 = Signal(fifo.width)
            read_2 = Signal(fifo.width)
            with m.State("READ"):
                m.d.comb += fifo.r_en.eq(1)
                if fifo.fwft:
                    r_rdy = fifo.r_rdy
                else:
                    # Without fall-through, data is valid one read-domain cycle after r_rdy.
                    r_rdy = Past(fifo.r_rdy, domain=self.r_domain)
                with m.If(r_rdy):
                    # NOTE(review): the history registers are clocked by `sync`, not by
                    # `r_domain`, although the surrounding FSM runs in `r_domain`; confirm
                    # this is intended for the multiclock case.
                    m.d.sync += [
                        read_1.eq(read_2),
                        read_2.eq(fifo.r_data),
                    ]
                with m.If((read_1 == entry_1) & (read_2 == entry_2)):
                    m.next = "DONE"
            with m.State("DONE"):
                pass

        # Both state machines start in their first state ...
        with m.If(Initial()):
            m.d.comb += Assume(write_fsm.ongoing("WRITE-1"))
            m.d.comb += Assume(read_fsm.ongoing("READ"))
        # ... and the reader must be done `bound - 1` steps after the initial one.
        with m.If(Past(Initial(), self.bound - 1)):
            m.d.comb += Assert(read_fsm.ongoing("DONE"))

        with m.If(ResetSignal(domain=self.w_domain)):
            m.d.comb += Assert(~fifo.r_rdy)

        if self.w_domain != "sync" or self.r_domain != "sync":
            # With separate clocks, require that at least one of them rises on every step.
            m.d.comb += Assume(Rose(ClockSignal(self.w_domain)) |
                               Rose(ClockSignal(self.r_domain)))

        return m
238
239
class FIFOFormalCase(FHDLTestCase):
    """Formal verification of the FIFO implementations against the specs defined in this file."""

    def check_sync_fifo(self, fifo):
        """Check a single-clock ``fifo`` against both the model and the contract spec."""
        self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="sync", w_domain="sync"),
                          mode="bmc", depth=fifo.depth + 1)
        self.assertFormal(FIFOContractSpec(fifo, r_domain="sync", w_domain="sync",
                                           bound=fifo.depth * 2 + 1),
                          mode="hybrid", depth=fifo.depth * 2 + 1)

    def test_sync_fwft_pot(self):
        self.check_sync_fifo(SyncFIFO(width=8, depth=4, fwft=True))

    def test_sync_fwft_npot(self):
        self.check_sync_fifo(SyncFIFO(width=8, depth=5, fwft=True))

    def test_sync_not_fwft_pot(self):
        self.check_sync_fifo(SyncFIFO(width=8, depth=4, fwft=False))

    def test_sync_not_fwft_npot(self):
        self.check_sync_fifo(SyncFIFO(width=8, depth=5, fwft=False))

    def test_sync_buffered_pot(self):
        self.check_sync_fifo(SyncFIFOBuffered(width=8, depth=4))

    def test_sync_buffered_potp1(self):
        self.check_sync_fifo(SyncFIFOBuffered(width=8, depth=5))

    def test_sync_buffered_potm1(self):
        self.check_sync_fifo(SyncFIFOBuffered(width=8, depth=3))

    def check_async_fifo(self, fifo):
        """Check a dual-clock ``fifo`` (domains ``read``/``write``) against the contract spec."""
        # TODO: properly doing model equivalence checking on this likely requires multiclock,
        # which is not really documented nor is it clear how to use it.
        # self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
        #                   mode="bmc", depth=fifo.depth * 3 + 1)
        self.assertFormal(FIFOContractSpec(fifo, r_domain="read", w_domain="write",
                                           bound=fifo.depth * 4 + 1),
                          mode="hybrid", depth=fifo.depth * 4 + 1)

    def test_async(self):
        self.check_async_fifo(AsyncFIFO(width=8, depth=4))

    def test_async_buffered(self):
        self.check_async_fifo(AsyncFIFOBuffered(width=8, depth=4))
283
284
# We need this test case because we can't do model equivalence checking on the async FIFOs (at the moment).
class AsyncFIFOSimCase(FHDLTestCase):
    """Simulation-based checks of the level outputs of the asynchronous FIFOs."""

    def test_async_fifo_r_level_latency(self):
        """
        With both ports in the same clock domain, ``r_level`` is expected to trail the number
        of writes by the synchronizer latency (taken to be 2 cycles here), never going below 0.
        """
        fifo = AsyncFIFO(width=32, depth=10, r_domain="sync", w_domain="sync")

        ff_synchronizer_latency = 2

        def testbench():
            for i in range(10):
                yield fifo.w_data.eq(i)
                yield fifo.w_en.eq(1)
                yield

                # Until the first writes have crossed the synchronizer, the reader sees 0.
                expected_level = max(i - ff_synchronizer_latency, 0)
                self.assertEqual((yield fifo.r_level), expected_level)

        simulator = Simulator(fifo)
        simulator.add_clock(100e-6)
        simulator.add_sync_process(testbench)
        simulator.run()

    def check_async_fifo_level(self, fifo, fill_in, expected_level):
        """
        Attempt ``fill_in`` writes into ``fifo`` and check that both ``w_level`` and ``r_level``
        end up at ``expected_level``.

        ``fifo`` must use the ``write`` and ``read`` clock domains.
        """
        write_done = Signal()

        def write_process():
            for i in range(fill_in):
                yield fifo.w_data.eq(i)
                yield fifo.w_en.eq(1)
                yield Tick("write")
            yield fifo.w_en.eq(0)
            # Two extra write-domain ticks before sampling the level.
            yield Tick("write")
            yield Tick("write")
            self.assertEqual((yield fifo.w_level), expected_level)
            yield write_done.eq(1)

        def read_process():
            # Only sample `r_level` after the writer has finished and checked its own side.
            while not (yield write_done):
                yield Tick("read")
            self.assertEqual((yield fifo.r_level), expected_level)

        simulator = Simulator(fifo)
        simulator.add_clock(100e-6, domain="write")
        simulator.add_sync_process(write_process, domain="write")
        # The read clock is given half the period of the write clock.
        simulator.add_clock(50e-6, domain="read")
        simulator.add_sync_process(read_process, domain="read")
        # NOTE(review): this writes `test.vcd` into the current working directory on every
        # run; confirm the dump is still wanted or redirect it to a temporary location.
        with simulator.write_vcd("test.vcd"):
            simulator.run()

    def test_async_fifo_level(self):
        fifo = AsyncFIFO(width=32, depth=8, r_domain="read", w_domain="write")
        self.check_async_fifo_level(fifo, fill_in=5, expected_level=5)

    def test_async_fifo_level_full(self):
        fifo = AsyncFIFO(width=32, depth=8, r_domain="read", w_domain="write")
        self.check_async_fifo_level(fifo, fill_in=10, expected_level=8)

    def test_async_buffered_fifo_level(self):
        fifo = AsyncFIFOBuffered(width=32, depth=9, r_domain="read", w_domain="write")
        self.check_async_fifo_level(fifo, fill_in=5, expected_level=5)

    def test_async_buffered_fifo_level_only_three(self):
        fifo = AsyncFIFOBuffered(width=32, depth=9, r_domain="read", w_domain="write")
        self.check_async_fifo_level(fifo, fill_in=3, expected_level=3)

    def test_async_buffered_fifo_level_full(self):
        fifo = AsyncFIFOBuffered(width=32, depth=9, r_domain="read", w_domain="write")
        self.check_async_fifo_level(fifo, fill_in=10, expected_level=9)