lib.fifo.AsyncFIFO: fix incorrect latency of r_level.
[nmigen.git] / tests / test_lib_fifo.py
1 # nmigen: UnusedElaboratable=no
2
3 from nmigen.hdl import *
4 from nmigen.asserts import *
5 from nmigen.sim import *
6 from nmigen.lib.fifo import *
7
8 from .utils import *
9
10
class FIFOTestCase(FHDLTestCase):
    """Constructor-level checks: argument validation and the reported ``depth``."""

    def test_depth_wrong(self):
        """A negative width or a negative depth is rejected with ``TypeError``."""
        width_error = r"^FIFO width must be a non-negative integer, not -1$"
        with self.assertRaisesRegex(TypeError, width_error):
            FIFOInterface(width=-1, depth=8, fwft=True)

        depth_error = r"^FIFO depth must be a non-negative integer, not -1$"
        with self.assertRaisesRegex(TypeError, depth_error):
            FIFOInterface(width=8, depth=-1, fwft=True)

    def test_sync_depth(self):
        """``SyncFIFO`` reports exactly the depth it was asked for."""
        for requested in (0, 1, 2):
            self.assertEqual(SyncFIFO(width=8, depth=requested).depth, requested)

    def test_sync_buffered_depth(self):
        """``SyncFIFOBuffered`` reports exactly the depth it was asked for."""
        for requested in (0, 1, 2):
            self.assertEqual(SyncFIFOBuffered(width=8, depth=requested).depth, requested)

    def test_async_depth(self):
        """``AsyncFIFO`` reports the expected (power-of-2) depth for each request."""
        # (requested depth, depth the FIFO is expected to report)
        expectations = [
            (0, 0),
            (1, 1),
            (2, 2),
            (3, 4),
            (4, 4),
            (15, 16),
            (16, 16),
            (17, 32),
        ]
        for requested, reported in expectations:
            self.assertEqual(AsyncFIFO(width=8, depth=requested).depth, reported)

    def test_async_depth_wrong(self):
        """With ``exact_depth=True`` a non-power-of-2 depth raises ``ValueError``."""
        expected_error = (
            r"^AsyncFIFO only supports depths that are powers of 2; "
            r"requested exact depth 15 is not$"
        )
        with self.assertRaisesRegex(ValueError, expected_error):
            AsyncFIFO(width=8, depth=15, exact_depth=True)

    def test_async_buffered_depth(self):
        """``AsyncFIFOBuffered`` reports the expected depth for each request."""
        # (requested depth, depth the FIFO is expected to report)
        expectations = [
            (0, 0),
            (1, 2),
            (2, 2),
            (3, 3),
            (4, 5),
            (15, 17),
            (16, 17),
            (17, 17),
            (18, 33),
        ]
        for requested, reported in expectations:
            self.assertEqual(AsyncFIFOBuffered(width=8, depth=requested).depth, reported)

    def test_async_buffered_depth_wrong(self):
        """With ``exact_depth=True`` a depth that is not 2**n + 1 raises ``ValueError``."""
        expected_error = (
            r"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
            r"requested exact depth 16 is not$"
        )
        with self.assertRaisesRegex(ValueError, expected_error):
            AsyncFIFOBuffered(width=8, depth=16, exact_depth=True)
62
class FIFOModel(Elaboratable, FIFOInterface):
    """
    Non-synthesizable first-in first-out queue, implemented naively as a chain of registers.

    NOTE(review): the body below stores data in a ``Memory`` with a combinational read port
    rather than a literal register chain; the sentence above is kept from the original.

    Parameters
    ----------
    width, depth, fwft
        Forwarded unchanged to ``FIFOInterface.__init__``.
    r_domain, w_domain
        Names of the clock domains used for the read-side and write-side registers.
    """
    def __init__(self, *, width, depth, fwft, r_domain, w_domain):
        super().__init__(width=width, depth=depth, fwft=fwft)

        self.r_domain = r_domain
        self.w_domain = w_domain

        # A single occupancy counter; r_level and w_level are combinational copies of it
        # (see the end of elaborate()).
        self.level = Signal(range(self.depth + 1))
        self.r_level = Signal(range(self.depth + 1))
        self.w_level = Signal(range(self.depth + 1))

    def elaborate(self, platform):
        """Build and return the model's ``Module``; ``platform`` is not used."""
        m = Module()

        storage = Memory(width=self.width, depth=self.depth)
        w_port = m.submodules.w_port = storage.write_port(domain=self.w_domain)
        r_port = m.submodules.r_port = storage.read_port (domain="comb")

        # Indices of the most recently written / most recently read slot.
        produce = Signal(range(self.depth))
        consume = Signal(range(self.depth))

        # Read side: readable whenever the queue is non-empty. The read port always
        # addresses the slot after `consume`, i.e. the next word to be read.
        m.d.comb += self.r_rdy.eq(self.level > 0)
        m.d.comb += r_port.addr.eq((consume + 1) % self.depth)
        if self.fwft:
            # First-word-fall-through: r_data follows the read port combinationally.
            m.d.comb += self.r_data.eq(r_port.data)
        with m.If(self.r_en & self.r_rdy):
            if not self.fwft:
                # Otherwise r_data is registered in the read domain on an accepted read.
                m.d[self.r_domain] += self.r_data.eq(r_port.data)
            m.d[self.r_domain] += consume.eq(r_port.addr)

        # Write side: writable whenever the queue is not full. The write address and
        # enable are only driven while a write is actually accepted.
        m.d.comb += self.w_rdy.eq(self.level < self.depth)
        m.d.comb += w_port.data.eq(self.w_data)
        with m.If(self.w_en & self.w_rdy):
            m.d.comb += w_port.addr.eq((produce + 1) % self.depth)
            m.d.comb += w_port.en.eq(1)
            m.d[self.w_domain] += produce.eq(w_port.addr)

        # Occupancy: cleared while either domain is in reset, otherwise +1 per accepted
        # write and -1 per accepted read.
        # NOTE(review): `level` is clocked by "sync", not by r_domain/w_domain; presumably
        # the model is only meant to be used with both domains equal to "sync" -- confirm.
        with m.If(ResetSignal(self.r_domain) | ResetSignal(self.w_domain)):
            m.d.sync += self.level.eq(0)
        with m.Else():
            m.d.sync += self.level.eq(self.level
                + (self.w_rdy & self.w_en)
                - (self.r_rdy & self.r_en))

        m.d.comb += [
            self.r_level.eq(self.level),
            self.w_level.eq(self.level),
        ]
        # The model requires both domains to be reset together.
        m.d.comb += Assert(ResetSignal(self.r_domain) == ResetSignal(self.w_domain))

        return m
117
118
class FIFOModelEquivalenceSpec(Elaboratable):
    """
    The first-in first-out queue model equivalence specification: for any inputs and control
    signals, the behavior of the implementation under test exactly matches the ideal model,
    except for behavior not defined by the model.

    Parameters
    ----------
    fifo
        The FIFO implementation under test; it is instantiated as the ``dut`` submodule.
    r_domain, w_domain
        Clock domain names handed to the ``FIFOModel`` reference instance and used for the
        ``Past`` samples of the read handshake.
    """
    def __init__(self, fifo, r_domain, w_domain):
        self.fifo = fifo

        self.r_domain = r_domain
        self.w_domain = w_domain

    def elaborate(self, platform):
        """Build the comparison harness: DUT, reference model and assertions."""
        m = Module()
        m.submodules.dut = dut = self.fifo
        m.submodules.gold = gold = FIFOModel(width=dut.width, depth=dut.depth, fwft=dut.fwft,
                                             r_domain=self.r_domain, w_domain=self.w_domain)

        # Drive the model from the DUT's inputs. The model's read enable is gated by the
        # DUT's r_rdy so the model only consumes a word when the DUT can deliver one.
        m.d.comb += [
            gold.r_en.eq(dut.r_rdy & dut.r_en),
            gold.w_en.eq(dut.w_en),
            gold.w_data.eq(dut.w_data),
        ]

        # Readiness is one-directional: the DUT may be ready only if the model is ready.
        m.d.comb += Assert(dut.r_rdy.implies(gold.r_rdy))
        m.d.comb += Assert(dut.w_rdy.implies(gold.w_rdy))
        # Levels must match exactly.
        m.d.comb += Assert(dut.r_level == gold.r_level)
        m.d.comb += Assert(dut.w_level == gold.w_level)

        if dut.fwft:
            # First-word-fall-through: data is compared whenever the DUT is readable.
            m.d.comb += Assert(dut.r_rdy
                               .implies(dut.r_data == gold.r_data))
        else:
            # Otherwise data is compared when r_rdy and r_en were both high in the
            # previous read-domain sample, i.e. after an accepted read.
            m.d.comb += Assert((Past(dut.r_rdy, domain=self.r_domain) &
                                Past(dut.r_en, domain=self.r_domain))
                               .implies(dut.r_data == gold.r_data))

        return m
157
158
class FIFOContractSpec(Elaboratable):
    """
    The first-in first-out queue contract specification: if two elements are written to the queue
    consecutively, they must be read out consecutively at some later point, no matter all other
    circumstances, with the exception of reset.

    Parameters
    ----------
    fifo
        The FIFO implementation under test; it is instantiated as the ``dut`` submodule.
    r_domain, w_domain
        Names of the clock domains driving the read FSM and the write FSM.
    bound
        Step count after which the read FSM is asserted to have reached ``DONE``
        (the assertion is placed ``bound - 1`` samples after ``Initial()``).
    """
    def __init__(self, fifo, *, r_domain, w_domain, bound):
        self.fifo = fifo
        self.r_domain = r_domain
        self.w_domain = w_domain
        self.bound = bound

    def elaborate(self, platform):
        """Build the contract harness: clock domains, write/read FSMs and properties."""
        m = Module()
        m.submodules.dut = fifo = self.fifo

        # Declare every clock domain in use and hold its reset at 0 for the whole run.
        m.domains += ClockDomain("sync")
        m.d.comb += ResetSignal().eq(0)
        if self.w_domain != "sync":
            m.domains += ClockDomain(self.w_domain)
            m.d.comb += ResetSignal(self.w_domain).eq(0)
        if self.r_domain != "sync":
            m.domains += ClockDomain(self.r_domain)
            m.d.comb += ResetSignal(self.r_domain).eq(0)

        # The two words whose ordering is being checked.
        # Presumably AnyConst yields arbitrary-but-fixed values picked by the solver, so
        # the property is checked for every pair -- confirm against nmigen.asserts.
        entry_1 = AnyConst(fifo.width)
        entry_2 = AnyConst(fifo.width)

        # Writer: push entry_1, then entry_2, each as soon as the FIFO is writable.
        with m.FSM(domain=self.w_domain) as write_fsm:
            with m.State("WRITE-1"):
                with m.If(fifo.w_rdy):
                    m.d.comb += [
                        fifo.w_data.eq(entry_1),
                        fifo.w_en.eq(1)
                    ]
                    m.next = "WRITE-2"
            with m.State("WRITE-2"):
                with m.If(fifo.w_rdy):
                    m.d.comb += [
                        fifo.w_data.eq(entry_2),
                        fifo.w_en.eq(1)
                    ]
                    m.next = "DONE"
            with m.State("DONE"):
                pass

        # Reader: read continuously, keeping the last two words in read_1 (older) and
        # read_2 (newer); finish once they equal entry_1 followed by entry_2.
        with m.FSM(domain=self.r_domain) as read_fsm:
            read_1 = Signal(fifo.width)
            read_2 = Signal(fifo.width)
            with m.State("READ"):
                m.d.comb += fifo.r_en.eq(1)
                if fifo.fwft:
                    r_rdy = fifo.r_rdy
                else:
                    # Without fall-through the word is captured one read-domain sample
                    # after r_rdy was seen.
                    r_rdy = Past(fifo.r_rdy, domain=self.r_domain)
                with m.If(r_rdy):
                    # NOTE(review): these registers are clocked by "sync" rather than
                    # self.r_domain -- confirm this is intended for the async case.
                    m.d.sync += [
                        read_1.eq(read_2),
                        read_2.eq(fifo.r_data),
                    ]
                with m.If((read_1 == entry_1) & (read_2 == entry_2)):
                    m.next = "DONE"
            with m.State("DONE"):
                pass

        # Start both FSMs in their first state; require the reader to be done by `bound`.
        with m.If(Initial()):
            m.d.comb += Assume(write_fsm.ongoing("WRITE-1"))
            m.d.comb += Assume(read_fsm.ongoing("READ"))
        with m.If(Past(Initial(), self.bound - 1)):
            m.d.comb += Assert(read_fsm.ongoing("DONE"))

        # NOTE(review): the write-domain reset is tied to 0 above, so this condition
        # looks like it can never be true in this harness -- confirm.
        with m.If(ResetSignal(domain=self.w_domain)):
            m.d.comb += Assert(~fifo.r_rdy)

        # With more than one clock, constrain each step to contain a rising edge of the
        # write clock or the read clock.
        if self.w_domain != "sync" or self.r_domain != "sync":
            m.d.comb += Assume(Rose(ClockSignal(self.w_domain)) |
                               Rose(ClockSignal(self.r_domain)))

        return m
238
239
class FIFOFormalCase(FHDLTestCase):
    """Formal checks of the FIFO implementations against the model and the contract."""

    def check_sync_fifo(self, fifo):
        """Run the model-equivalence and contract checks on a single-clock FIFO."""
        equivalence_spec = FIFOModelEquivalenceSpec(fifo, r_domain="sync", w_domain="sync")
        self.assertFormal(equivalence_spec, mode="bmc", depth=fifo.depth + 1)

        # The contract bound and the proof depth are the same number.
        contract_bound = fifo.depth * 2 + 1
        contract_spec = FIFOContractSpec(fifo, r_domain="sync", w_domain="sync",
                                         bound=contract_bound)
        self.assertFormal(contract_spec, mode="hybrid", depth=contract_bound)

    def test_sync_fwft_pot(self):
        fifo = SyncFIFO(width=8, depth=4, fwft=True)
        self.check_sync_fifo(fifo)

    def test_sync_fwft_npot(self):
        fifo = SyncFIFO(width=8, depth=5, fwft=True)
        self.check_sync_fifo(fifo)

    def test_sync_not_fwft_pot(self):
        fifo = SyncFIFO(width=8, depth=4, fwft=False)
        self.check_sync_fifo(fifo)

    def test_sync_not_fwft_npot(self):
        fifo = SyncFIFO(width=8, depth=5, fwft=False)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_pot(self):
        fifo = SyncFIFOBuffered(width=8, depth=4)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_potp1(self):
        fifo = SyncFIFOBuffered(width=8, depth=5)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_potm1(self):
        fifo = SyncFIFOBuffered(width=8, depth=3)
        self.check_sync_fifo(fifo)

    def check_async_fifo(self, fifo):
        """Run the contract check on a FIFO with separate "read" and "write" domains."""
        # TODO: properly doing model equivalence checking on this likely requires multiclock,
        # which is not really documented nor is it clear how to use it.
        # self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
        #                   mode="bmc", depth=fifo.depth * 3 + 1)
        contract_bound = fifo.depth * 4 + 1
        contract_spec = FIFOContractSpec(fifo, r_domain="read", w_domain="write",
                                         bound=contract_bound)
        self.assertFormal(contract_spec, mode="hybrid", depth=contract_bound)

    def test_async(self):
        fifo = AsyncFIFO(width=8, depth=4)
        self.check_async_fifo(fifo)

    def test_async_buffered(self):
        fifo = AsyncFIFOBuffered(width=8, depth=4)
        self.check_async_fifo(fifo)
283
284
class AsyncFIFOSimCase(FHDLTestCase):
    """Simulation checks for ``AsyncFIFO``."""

    def test_async_fifo_r_level_latency(self):
        """``r_level`` lags the write count by the expected number of cycles.

        Both FIFO domains are mapped to "sync", so the single simulator clock drives
        the write side and the read side alike.
        """
        fifo = AsyncFIFO(width=32, depth=10, r_domain="sync", w_domain="sync")

        # Number of steps by which r_level is expected to trail the loop index.
        synchronizer_latency = 2
        write_count = 10

        def testbench():
            for step in range(write_count):
                yield fifo.w_data.eq(step)
                yield fifo.w_en.eq(1)
                yield

                # r_level reads 0 until the latency has elapsed, then step - latency.
                expected_level = max(step - synchronizer_latency, 0)
                self.assertEqual((yield fifo.r_level), expected_level)

        sim = Simulator(fifo)
        sim.add_clock(100e-6)
        sim.add_sync_process(testbench)
        sim.run()