tests: remove unnecessary workaround for some unittest assertions.
[nmigen.git] / nmigen / test / test_lib_fifo.py
1 # nmigen: UnusedElaboratable=no
2
3 from .utils import *
4 from ..hdl import *
5 from ..asserts import *
6 from ..back.pysim import *
7 from ..lib.fifo import *
8
9
class FIFOTestCase(FHDLTestCase):
    """
    Constructor-level checks for the FIFO classes: rejection of invalid arguments and the
    ``depth`` attribute each FIFO reports for a given requested depth.
    """

    def test_depth_wrong(self):
        """A negative ``width`` or ``depth`` makes ``FIFOInterface`` raise ``TypeError``."""
        width_pattern = r"^FIFO width must be a non-negative integer, not -1$"
        with self.assertRaisesRegex(TypeError, width_pattern):
            FIFOInterface(width=-1, depth=8, fwft=True)

        depth_pattern = r"^FIFO depth must be a non-negative integer, not -1$"
        with self.assertRaisesRegex(TypeError, depth_pattern):
            FIFOInterface(width=8, depth=-1, fwft=True)

    def test_sync_depth(self):
        """``SyncFIFO`` reports the requested depth unchanged for the depths tried here."""
        for requested, reported in ((0, 0), (1, 1), (2, 2)):
            self.assertEqual(SyncFIFO(width=8, depth=requested).depth, reported)

    def test_sync_buffered_depth(self):
        """``SyncFIFOBuffered`` reports the requested depth unchanged for the depths tried here."""
        for requested, reported in ((0, 0), (1, 1), (2, 2)):
            self.assertEqual(SyncFIFOBuffered(width=8, depth=requested).depth, reported)

    def test_async_depth(self):
        """``AsyncFIFO`` reports these depths for requests that are not forced to be exact."""
        expectations = (
            (0, 0),
            (1, 1),
            (2, 2),
            (3, 4),
            (4, 4),
            (15, 16),
            (16, 16),
            (17, 32),
        )
        for requested, reported in expectations:
            self.assertEqual(AsyncFIFO(width=8, depth=requested).depth, reported)

    def test_async_depth_wrong(self):
        """With ``exact_depth=True``, ``AsyncFIFO`` rejects a depth of 15 with ``ValueError``."""
        pattern = (r"^AsyncFIFO only supports depths that are powers of 2; "
                   r"requested exact depth 15 is not$")
        with self.assertRaisesRegex(ValueError, pattern):
            AsyncFIFO(width=8, depth=15, exact_depth=True)

    def test_async_buffered_depth(self):
        """``AsyncFIFOBuffered`` reports these depths for requests that are not forced to be exact."""
        expectations = (
            (0, 0),
            (1, 2),
            (2, 2),
            (3, 3),
            (4, 5),
            (15, 17),
            (16, 17),
            (17, 17),
            (18, 33),
        )
        for requested, reported in expectations:
            self.assertEqual(AsyncFIFOBuffered(width=8, depth=requested).depth, reported)

    def test_async_buffered_depth_wrong(self):
        """With ``exact_depth=True``, ``AsyncFIFOBuffered`` rejects a depth of 16 with ``ValueError``."""
        pattern = (r"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
                   r"requested exact depth 16 is not$")
        with self.assertRaisesRegex(ValueError, pattern):
            AsyncFIFOBuffered(width=8, depth=16, exact_depth=True)
61
class FIFOModel(Elaboratable, FIFOInterface):
    """
    Non-synthesizable reference first-in first-out queue.

    Data is kept in a ``Memory`` addressed through ``produce`` (write) and ``consume`` (read)
    pointers, while ``level`` counts the entries currently stored and drives ``r_rdy``/``w_rdy``.
    """
    def __init__(self, *, width, depth, fwft, r_domain, w_domain):
        super().__init__(width=width, depth=depth, fwft=fwft)

        self.r_domain, self.w_domain = r_domain, w_domain

        # One more value than ``depth`` so that a completely full queue is representable.
        self.level = Signal(range(self.depth + 1))

    def elaborate(self, platform):
        m = Module()

        storage = Memory(width=self.width, depth=self.depth)
        w_port = storage.write_port(domain=self.w_domain)
        m.submodules.w_port = w_port
        r_port = storage.read_port(domain="comb")
        m.submodules.r_port = r_port

        produce = Signal(range(self.depth))
        consume = Signal(range(self.depth))

        # Both pointers advance by one and wrap around at ``depth``.
        next_consume = (consume + 1) % self.depth
        next_produce = (produce + 1) % self.depth

        # A transfer happens only when it is both requested and possible.
        read_strobe = self.r_en & self.r_rdy
        write_strobe = self.w_en & self.w_rdy

        # Read side.
        m.d.comb += [
            self.r_rdy.eq(self.level > 0),
            r_port.addr.eq(next_consume),
        ]
        if self.fwft:
            # First-word-fallthrough: the memory output is exposed combinatorially.
            m.d.comb += self.r_data.eq(r_port.data)
        with m.If(read_strobe):
            if not self.fwft:
                # Otherwise the data is registered in the read domain when a read is accepted.
                m.d[self.r_domain] += self.r_data.eq(r_port.data)
            m.d[self.r_domain] += consume.eq(r_port.addr)

        # Write side.
        m.d.comb += [
            self.w_rdy.eq(self.level < self.depth),
            w_port.data.eq(self.w_data),
        ]
        with m.If(write_strobe):
            m.d.comb += [
                w_port.addr.eq(next_produce),
                w_port.en.eq(1),
            ]
            m.d[self.w_domain] += produce.eq(w_port.addr)

        # Occupancy: cleared while either domain is in reset, otherwise adjusted by the
        # accepted write and the accepted read.
        pushed = self.w_rdy & self.w_en
        popped = self.r_rdy & self.r_en
        with m.If(ResetSignal(self.r_domain) | ResetSignal(self.w_domain)):
            m.d.sync += self.level.eq(0)
        with m.Else():
            m.d.sync += self.level.eq(self.level + pushed - popped)

        # The model requires both domains to be reset together.
        m.d.comb += Assert(ResetSignal(self.r_domain) == ResetSignal(self.w_domain))

        return m
110
111
class FIFOModelEquivalenceSpec(Elaboratable):
    """
    Model equivalence specification for a first-in first-out queue: whatever the inputs and
    control signals are, the implementation under test must behave exactly like the ideal
    model (``FIFOModel``), except where the model leaves the behavior undefined.
    """
    def __init__(self, fifo, r_domain, w_domain):
        self.fifo = fifo

        self.r_domain, self.w_domain = r_domain, w_domain

    def elaborate(self, platform):
        m = Module()

        dut = self.fifo
        m.submodules.dut = dut
        gold = FIFOModel(width=dut.width, depth=dut.depth, fwft=dut.fwft,
                         r_domain=self.r_domain, w_domain=self.w_domain)
        m.submodules.gold = gold

        # Feed the model with the same stimulus as the implementation; a read is forwarded
        # to the model only when the implementation is able to serve it.
        m.d.comb += [
            gold.r_en.eq(dut.r_rdy & dut.r_en),
            gold.w_en.eq(dut.w_en),
            gold.w_data.eq(dut.w_data),
        ]

        # The implementation may be ready only when the model is ready as well.
        m.d.comb += [
            Assert(dut.r_rdy.implies(gold.r_rdy)),
            Assert(dut.w_rdy.implies(gold.w_rdy)),
        ]
        if hasattr(dut, "level"):
            # Only implementations that expose a level are compared on it.
            m.d.comb += Assert(dut.level == gold.level)

        if dut.fwft:
            # First-word-fallthrough: data is compared whenever the implementation is readable.
            data_is_checked = dut.r_rdy
        else:
            # Otherwise data is compared after a read that was requested while readable,
            # as sampled one step earlier in the read domain.
            data_is_checked = (Past(dut.r_rdy, domain=self.r_domain) &
                               Past(dut.r_en, domain=self.r_domain))
        m.d.comb += Assert(data_is_checked.implies(dut.r_data == gold.r_data))

        return m
150
151
class FIFOContractSpec(Elaboratable):
    """
    Contract specification for a first-in first-out queue: two elements written back to back
    must be read out back to back at some later point, whatever else is going on, with the
    exception of reset.
    """
    def __init__(self, fifo, *, r_domain, w_domain, bound):
        self.fifo = fifo
        self.r_domain, self.w_domain = r_domain, w_domain
        self.bound = bound

    def elaborate(self, platform):
        m = Module()
        fifo = self.fifo
        m.submodules.dut = fifo

        # Declare every clock domain in use and hold its reset constantly low.
        m.domains += ClockDomain("sync")
        m.d.comb += ResetSignal().eq(0)
        for extra_domain in (self.w_domain, self.r_domain):
            if extra_domain == "sync":
                continue
            m.domains += ClockDomain(extra_domain)
            m.d.comb += ResetSignal(extra_domain).eq(0)

        # The two entries whose ordering is tracked through the queue.
        entry_1 = AnyConst(fifo.width)
        entry_2 = AnyConst(fifo.width)

        # Writer: push entry_1, then entry_2, each as soon as the queue accepts it.
        write_steps = (
            ("WRITE-1", entry_1, "WRITE-2"),
            ("WRITE-2", entry_2, "DONE"),
        )
        with m.FSM(domain=self.w_domain) as write_fsm:
            for state, entry, successor in write_steps:
                with m.State(state):
                    with m.If(fifo.w_rdy):
                        m.d.comb += [
                            fifo.w_data.eq(entry),
                            fifo.w_en.eq(1),
                        ]
                        m.next = successor
            with m.State("DONE"):
                pass

        # Reader: keep reading and remember the last two values seen; finish once they
        # equal entry_1 followed by entry_2.
        with m.FSM(domain=self.r_domain) as read_fsm:
            read_1 = Signal(fifo.width)
            read_2 = Signal(fifo.width)
            with m.State("READ"):
                m.d.comb += fifo.r_en.eq(1)
                # Without first-word-fallthrough the readiness sampled one step earlier in
                # the read domain is used instead of the current one.
                r_rdy = fifo.r_rdy if fifo.fwft else Past(fifo.r_rdy, domain=self.r_domain)
                with m.If(r_rdy):
                    m.d.sync += [
                        read_1.eq(read_2),
                        read_2.eq(fifo.r_data),
                    ]
                both_seen = (read_1 == entry_1) & (read_2 == entry_2)
                with m.If(both_seen):
                    m.next = "DONE"
            with m.State("DONE"):
                pass

        # Both state machines start in their first state ...
        with m.If(Initial()):
            m.d.comb += [
                Assume(write_fsm.ongoing("WRITE-1")),
                Assume(read_fsm.ongoing("READ")),
            ]
        # ... and the reader must be done once `Initial()` was true `bound - 1` samples ago.
        with m.If(Past(Initial(), self.bound - 1)):
            m.d.comb += Assert(read_fsm.ongoing("DONE"))

        # While the write domain reset is asserted, the queue must not claim to be readable.
        with m.If(ResetSignal(domain=self.w_domain)):
            m.d.comb += Assert(~fifo.r_rdy)

        if self.w_domain == "sync" and self.r_domain == "sync":
            return m

        # With a non-"sync" domain involved, assume that a rising edge is seen on the
        # write clock or on the read clock.
        m.d.comb += Assume(Rose(ClockSignal(self.w_domain)) |
                           Rose(ClockSignal(self.r_domain)))

        return m
231
232
class FIFOFormalCase(FHDLTestCase):
    """Formal checks of the FIFO implementations against the specifications defined above."""

    def check_sync_fifo(self, fifo):
        """Check a single-domain FIFO against both the model equivalence and contract specs."""
        equivalence_spec = FIFOModelEquivalenceSpec(fifo, r_domain="sync", w_domain="sync")
        self.assertFormal(equivalence_spec, mode="bmc", depth=fifo.depth + 1)

        # The contract bound and the proof depth are deliberately the same number.
        bound = fifo.depth * 2 + 1
        contract_spec = FIFOContractSpec(fifo, r_domain="sync", w_domain="sync", bound=bound)
        self.assertFormal(contract_spec, mode="hybrid", depth=bound)

    def test_sync_fwft_pot(self):
        fifo = SyncFIFO(width=8, depth=4, fwft=True)
        self.check_sync_fifo(fifo)

    def test_sync_fwft_npot(self):
        fifo = SyncFIFO(width=8, depth=5, fwft=True)
        self.check_sync_fifo(fifo)

    def test_sync_not_fwft_pot(self):
        fifo = SyncFIFO(width=8, depth=4, fwft=False)
        self.check_sync_fifo(fifo)

    def test_sync_not_fwft_npot(self):
        fifo = SyncFIFO(width=8, depth=5, fwft=False)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_pot(self):
        fifo = SyncFIFOBuffered(width=8, depth=4)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_potp1(self):
        fifo = SyncFIFOBuffered(width=8, depth=5)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_potm1(self):
        fifo = SyncFIFOBuffered(width=8, depth=3)
        self.check_sync_fifo(fifo)

    def check_async_fifo(self, fifo):
        """Check a FIFO with "read"/"write" domains against the contract spec only."""
        # TODO: model equivalence checking is disabled here; doing it properly likely
        # requires multiclock, which is not really documented nor is it clear how to use it.
        # self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
        #                   mode="bmc", depth=fifo.depth * 3 + 1)
        bound = fifo.depth * 4 + 1
        contract_spec = FIFOContractSpec(fifo, r_domain="read", w_domain="write", bound=bound)
        self.assertFormal(contract_spec, mode="hybrid", depth=bound)

    def test_async(self):
        fifo = AsyncFIFO(width=8, depth=4)
        self.check_async_fifo(fifo)

    def test_async_buffered(self):
        fifo = AsyncFIFOBuffered(width=8, depth=4)
        self.check_async_fifo(fifo)