1 # nmigen: UnusedElaboratable=no
5 from ..asserts
import *
6 from ..back
.pysim
import *
7 from ..lib
.fifo
import *
10 class FIFOTestCase(FHDLTestCase
):
11 def test_depth_wrong(self
):
12 with self
.assertRaisesRegex(TypeError,
13 r
"^FIFO width must be a non-negative integer, not -1$"):
14 FIFOInterface(width
=-1, depth
=8, fwft
=True)
15 with self
.assertRaisesRegex(TypeError,
16 r
"^FIFO depth must be a non-negative integer, not -1$"):
17 FIFOInterface(width
=8, depth
=-1, fwft
=True)
19 def test_sync_depth(self
):
20 self
.assertEqual(SyncFIFO(width
=8, depth
=0).depth
, 0)
21 self
.assertEqual(SyncFIFO(width
=8, depth
=1).depth
, 1)
22 self
.assertEqual(SyncFIFO(width
=8, depth
=2).depth
, 2)
24 def test_sync_buffered_depth(self
):
25 self
.assertEqual(SyncFIFOBuffered(width
=8, depth
=0).depth
, 0)
26 self
.assertEqual(SyncFIFOBuffered(width
=8, depth
=1).depth
, 1)
27 self
.assertEqual(SyncFIFOBuffered(width
=8, depth
=2).depth
, 2)
29 def test_async_depth(self
):
30 self
.assertEqual(AsyncFIFO(width
=8, depth
=0 ).depth
, 0)
31 self
.assertEqual(AsyncFIFO(width
=8, depth
=1 ).depth
, 1)
32 self
.assertEqual(AsyncFIFO(width
=8, depth
=2 ).depth
, 2)
33 self
.assertEqual(AsyncFIFO(width
=8, depth
=3 ).depth
, 4)
34 self
.assertEqual(AsyncFIFO(width
=8, depth
=4 ).depth
, 4)
35 self
.assertEqual(AsyncFIFO(width
=8, depth
=15).depth
, 16)
36 self
.assertEqual(AsyncFIFO(width
=8, depth
=16).depth
, 16)
37 self
.assertEqual(AsyncFIFO(width
=8, depth
=17).depth
, 32)
39 def test_async_depth_wrong(self
):
40 with self
.assertRaisesRegex(ValueError,
41 (r
"^AsyncFIFO only supports depths that are powers of 2; "
42 r
"requested exact depth 15 is not$")):
43 AsyncFIFO(width
=8, depth
=15, exact_depth
=True)
45 def test_async_buffered_depth(self
):
46 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=0 ).depth
, 0)
47 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=1 ).depth
, 2)
48 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=2 ).depth
, 2)
49 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=3 ).depth
, 3)
50 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=4 ).depth
, 5)
51 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=15).depth
, 17)
52 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=16).depth
, 17)
53 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=17).depth
, 17)
54 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=18).depth
, 33)
56 def test_async_buffered_depth_wrong(self
):
57 with self
.assertRaisesRegex(ValueError,
58 (r
"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
59 r
"requested exact depth 16 is not$")):
60 AsyncFIFOBuffered(width
=8, depth
=16, exact_depth
=True)
62 class FIFOModel(Elaboratable
, FIFOInterface
):
64 Non-synthesizable first-in first-out queue, implemented naively as a chain of registers.
66 def __init__(self
, *, width
, depth
, fwft
, r_domain
, w_domain
):
67 super().__init
__(width
=width
, depth
=depth
, fwft
=fwft
)
69 self
.r_domain
= r_domain
70 self
.w_domain
= w_domain
72 self
.level
= Signal(range(self
.depth
+ 1))
73 self
.r_level
= Signal(range(self
.depth
+ 1))
74 self
.w_level
= Signal(range(self
.depth
+ 1))
76 def elaborate(self
, platform
):
79 storage
= Memory(width
=self
.width
, depth
=self
.depth
)
80 w_port
= m
.submodules
.w_port
= storage
.write_port(domain
=self
.w_domain
)
81 r_port
= m
.submodules
.r_port
= storage
.read_port (domain
="comb")
83 produce
= Signal(range(self
.depth
))
84 consume
= Signal(range(self
.depth
))
86 m
.d
.comb
+= self
.r_rdy
.eq(self
.level
> 0)
87 m
.d
.comb
+= r_port
.addr
.eq((consume
+ 1) % self
.depth
)
89 m
.d
.comb
+= self
.r_data
.eq(r_port
.data
)
90 with m
.If(self
.r_en
& self
.r_rdy
):
92 m
.d
[self
.r_domain
] += self
.r_data
.eq(r_port
.data
)
93 m
.d
[self
.r_domain
] += consume
.eq(r_port
.addr
)
95 m
.d
.comb
+= self
.w_rdy
.eq(self
.level
< self
.depth
)
96 m
.d
.comb
+= w_port
.data
.eq(self
.w_data
)
97 with m
.If(self
.w_en
& self
.w_rdy
):
98 m
.d
.comb
+= w_port
.addr
.eq((produce
+ 1) % self
.depth
)
99 m
.d
.comb
+= w_port
.en
.eq(1)
100 m
.d
[self
.w_domain
] += produce
.eq(w_port
.addr
)
102 with m
.If(ResetSignal(self
.r_domain
) |
ResetSignal(self
.w_domain
)):
103 m
.d
.sync
+= self
.level
.eq(0)
105 m
.d
.sync
+= self
.level
.eq(self
.level
106 + (self
.w_rdy
& self
.w_en
)
107 - (self
.r_rdy
& self
.r_en
))
110 self
.r_level
.eq(self
.level
),
111 self
.w_level
.eq(self
.level
),
113 m
.d
.comb
+= Assert(ResetSignal(self
.r_domain
) == ResetSignal(self
.w_domain
))
118 class FIFOModelEquivalenceSpec(Elaboratable
):
120 The first-in first-out queue model equivalence specification: for any inputs and control
121 signals, the behavior of the implementation under test exactly matches the ideal model,
122 except for behavior not defined by the model.
124 def __init__(self
, fifo
, r_domain
, w_domain
):
127 self
.r_domain
= r_domain
128 self
.w_domain
= w_domain
130 def elaborate(self
, platform
):
132 m
.submodules
.dut
= dut
= self
.fifo
133 m
.submodules
.gold
= gold
= FIFOModel(width
=dut
.width
, depth
=dut
.depth
, fwft
=dut
.fwft
,
134 r_domain
=self
.r_domain
, w_domain
=self
.w_domain
)
137 gold
.r_en
.eq(dut
.r_rdy
& dut
.r_en
),
138 gold
.w_en
.eq(dut
.w_en
),
139 gold
.w_data
.eq(dut
.w_data
),
142 m
.d
.comb
+= Assert(dut
.r_rdy
.implies(gold
.r_rdy
))
143 m
.d
.comb
+= Assert(dut
.w_rdy
.implies(gold
.w_rdy
))
144 m
.d
.comb
+= Assert(dut
.r_level
== gold
.r_level
)
145 m
.d
.comb
+= Assert(dut
.w_level
== gold
.w_level
)
148 m
.d
.comb
+= Assert(dut
.r_rdy
149 .implies(dut
.r_data
== gold
.r_data
))
151 m
.d
.comb
+= Assert((Past(dut
.r_rdy
, domain
=self
.r_domain
) &
152 Past(dut
.r_en
, domain
=self
.r_domain
))
153 .implies(dut
.r_data
== gold
.r_data
))
158 class FIFOContractSpec(Elaboratable
):
160 The first-in first-out queue contract specification: if two elements are written to the queue
161 consecutively, they must be read out consecutively at some later point, no matter all other
162 circumstances, with the exception of reset.
164 def __init__(self
, fifo
, *, r_domain
, w_domain
, bound
):
166 self
.r_domain
= r_domain
167 self
.w_domain
= w_domain
170 def elaborate(self
, platform
):
172 m
.submodules
.dut
= fifo
= self
.fifo
174 m
.domains
+= ClockDomain("sync")
175 m
.d
.comb
+= ResetSignal().eq(0)
176 if self
.w_domain
!= "sync":
177 m
.domains
+= ClockDomain(self
.w_domain
)
178 m
.d
.comb
+= ResetSignal(self
.w_domain
).eq(0)
179 if self
.r_domain
!= "sync":
180 m
.domains
+= ClockDomain(self
.r_domain
)
181 m
.d
.comb
+= ResetSignal(self
.r_domain
).eq(0)
183 entry_1
= AnyConst(fifo
.width
)
184 entry_2
= AnyConst(fifo
.width
)
186 with m
.FSM(domain
=self
.w_domain
) as write_fsm
:
187 with m
.State("WRITE-1"):
188 with m
.If(fifo
.w_rdy
):
190 fifo
.w_data
.eq(entry_1
),
194 with m
.State("WRITE-2"):
195 with m
.If(fifo
.w_rdy
):
197 fifo
.w_data
.eq(entry_2
),
201 with m
.State("DONE"):
204 with m
.FSM(domain
=self
.r_domain
) as read_fsm
:
205 read_1
= Signal(fifo
.width
)
206 read_2
= Signal(fifo
.width
)
207 with m
.State("READ"):
208 m
.d
.comb
+= fifo
.r_en
.eq(1)
212 r_rdy
= Past(fifo
.r_rdy
, domain
=self
.r_domain
)
216 read_2
.eq(fifo
.r_data
),
218 with m
.If((read_1
== entry_1
) & (read_2
== entry_2
)):
220 with m
.State("DONE"):
223 with m
.If(Initial()):
224 m
.d
.comb
+= Assume(write_fsm
.ongoing("WRITE-1"))
225 m
.d
.comb
+= Assume(read_fsm
.ongoing("READ"))
226 with m
.If(Past(Initial(), self
.bound
- 1)):
227 m
.d
.comb
+= Assert(read_fsm
.ongoing("DONE"))
229 with m
.If(ResetSignal(domain
=self
.w_domain
)):
230 m
.d
.comb
+= Assert(~fifo
.r_rdy
)
232 if self
.w_domain
!= "sync" or self
.r_domain
!= "sync":
233 m
.d
.comb
+= Assume(Rose(ClockSignal(self
.w_domain
)) |
234 Rose(ClockSignal(self
.r_domain
)))
239 class FIFOFormalCase(FHDLTestCase
):
240 def check_sync_fifo(self
, fifo
):
241 self
.assertFormal(FIFOModelEquivalenceSpec(fifo
, r_domain
="sync", w_domain
="sync"),
242 mode
="bmc", depth
=fifo
.depth
+ 1)
243 self
.assertFormal(FIFOContractSpec(fifo
, r_domain
="sync", w_domain
="sync",
244 bound
=fifo
.depth
* 2 + 1),
245 mode
="hybrid", depth
=fifo
.depth
* 2 + 1)
247 def test_sync_fwft_pot(self
):
248 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=4, fwft
=True))
250 def test_sync_fwft_npot(self
):
251 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=5, fwft
=True))
253 def test_sync_not_fwft_pot(self
):
254 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=4, fwft
=False))
256 def test_sync_not_fwft_npot(self
):
257 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=5, fwft
=False))
259 def test_sync_buffered_pot(self
):
260 self
.check_sync_fifo(SyncFIFOBuffered(width
=8, depth
=4))
262 def test_sync_buffered_potp1(self
):
263 self
.check_sync_fifo(SyncFIFOBuffered(width
=8, depth
=5))
265 def test_sync_buffered_potm1(self
):
266 self
.check_sync_fifo(SyncFIFOBuffered(width
=8, depth
=3))
268 def check_async_fifo(self
, fifo
):
269 # TODO: properly doing model equivalence checking on this likely requires multiclock,
270 # which is not really documented nor is it clear how to use it.
271 # self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
272 # mode="bmc", depth=fifo.depth * 3 + 1)
273 self
.assertFormal(FIFOContractSpec(fifo
, r_domain
="read", w_domain
="write",
274 bound
=fifo
.depth
* 4 + 1),
275 mode
="hybrid", depth
=fifo
.depth
* 4 + 1)
277 def test_async(self
):
278 self
.check_async_fifo(AsyncFIFO(width
=8, depth
=4))
280 def test_async_buffered(self
):
281 self
.check_async_fifo(AsyncFIFOBuffered(width
=8, depth
=4))