1 # nmigen: UnusedElaboratable=no
5 from ..asserts
import *
6 from ..back
.pysim
import *
7 from ..lib
.fifo
import *
class FIFOTestCase(FHDLTestCase):
    """Construction-time checks for the FIFO classes.

    Covers argument validation on ``FIFOInterface`` and the ``depth``
    attribute reported by each FIFO flavour for a given requested depth.
    """

    def test_depth_wrong(self):
        # A negative width is rejected with a TypeError.
        width_pattern = r"^FIFO width must be a non-negative integer, not -1$"
        with self.assertRaisesRegex(TypeError, width_pattern):
            FIFOInterface(width=-1, depth=8, fwft=True)

        # A negative depth is rejected the same way.
        depth_pattern = r"^FIFO depth must be a non-negative integer, not -1$"
        with self.assertRaisesRegex(TypeError, depth_pattern):
            FIFOInterface(width=8, depth=-1, fwft=True)

    def test_sync_depth(self):
        # The reported depth is expected to equal the requested depth.
        for requested in (0, 1, 2):
            self.assertEqual(SyncFIFO(width=8, depth=requested).depth, requested)

    def test_sync_buffered_depth(self):
        # The reported depth is expected to equal the requested depth.
        for requested in (0, 1, 2):
            self.assertEqual(SyncFIFOBuffered(width=8, depth=requested).depth, requested)

    def test_async_depth(self):
        # (requested depth, depth the FIFO is expected to report)
        expectations = (
            (0, 0),
            (1, 1),
            (2, 2),
            (3, 4),
            (4, 4),
            (15, 16),
            (16, 16),
            (17, 32),
        )
        for requested, reported in expectations:
            self.assertEqual(AsyncFIFO(width=8, depth=requested).depth, reported)

    def test_async_depth_wrong(self):
        # With exact_depth=True a depth of 15 must raise instead of being adjusted.
        pattern = (r"^AsyncFIFO only supports depths that are powers of 2; "
                   r"requested exact depth 15 is not$")
        with self.assertRaisesRegex(ValueError, pattern):
            AsyncFIFO(width=8, depth=15, exact_depth=True)

    def test_async_buffered_depth(self):
        # (requested depth, depth the FIFO is expected to report)
        expectations = (
            (0, 0),
            (1, 2),
            (2, 2),
            (3, 3),
            (4, 5),
            (15, 17),
            (16, 17),
            (17, 17),
            (18, 33),
        )
        for requested, reported in expectations:
            self.assertEqual(AsyncFIFOBuffered(width=8, depth=requested).depth, reported)

    def test_async_buffered_depth_wrong(self):
        # With exact_depth=True a depth of 16 must raise instead of being adjusted.
        pattern = (r"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
                   r"requested exact depth 16 is not$")
        with self.assertRaisesRegex(ValueError, pattern):
            AsyncFIFOBuffered(width=8, depth=16, exact_depth=True)
62 class FIFOModel(Elaboratable
, FIFOInterface
):
64 Non-synthesizable first-in first-out queue, implemented naively as a chain of registers.
def __init__(self, *, width, depth, fwft, r_domain, w_domain):
    """Set up the model FIFO.

    ``width``, ``depth`` and ``fwft`` are forwarded unchanged to the parent
    initializer; ``r_domain`` and ``w_domain`` are stored on the instance.
    """
    super().__init__(width=width, depth=depth, fwft=fwft)

    self.r_domain, self.w_domain = r_domain, w_domain

    # The counter range includes `depth` itself, hence the + 1.
    # NOTE(review): `self.depth` is presumably set by the parent
    # initializer -- confirm against FIFOInterface.
    level_range = range(self.depth + 1)
    self.level = Signal(level_range)
74 def elaborate(self
, platform
):
77 storage
= Memory(width
=self
.width
, depth
=self
.depth
)
78 w_port
= m
.submodules
.w_port
= storage
.write_port(domain
=self
.w_domain
)
79 r_port
= m
.submodules
.r_port
= storage
.read_port (domain
="comb")
81 produce
= Signal(range(self
.depth
))
82 consume
= Signal(range(self
.depth
))
84 m
.d
.comb
+= self
.r_rdy
.eq(self
.level
> 0)
85 m
.d
.comb
+= r_port
.addr
.eq((consume
+ 1) % self
.depth
)
87 m
.d
.comb
+= self
.r_data
.eq(r_port
.data
)
88 with m
.If(self
.r_en
& self
.r_rdy
):
90 m
.d
[self
.r_domain
] += self
.r_data
.eq(r_port
.data
)
91 m
.d
[self
.r_domain
] += consume
.eq(r_port
.addr
)
93 m
.d
.comb
+= self
.w_rdy
.eq(self
.level
< self
.depth
)
94 m
.d
.comb
+= w_port
.data
.eq(self
.w_data
)
95 with m
.If(self
.w_en
& self
.w_rdy
):
96 m
.d
.comb
+= w_port
.addr
.eq((produce
+ 1) % self
.depth
)
97 m
.d
.comb
+= w_port
.en
.eq(1)
98 m
.d
[self
.w_domain
] += produce
.eq(w_port
.addr
)
100 with m
.If(ResetSignal(self
.r_domain
) |
ResetSignal(self
.w_domain
)):
101 m
.d
.sync
+= self
.level
.eq(0)
103 m
.d
.sync
+= self
.level
.eq(self
.level
104 + (self
.w_rdy
& self
.w_en
)
105 - (self
.r_rdy
& self
.r_en
))
107 m
.d
.comb
+= Assert(ResetSignal(self
.r_domain
) == ResetSignal(self
.w_domain
))
112 class FIFOModelEquivalenceSpec(Elaboratable
):
114 The first-in first-out queue model equivalence specification: for any inputs and control
115 signals, the behavior of the implementation under test exactly matches the ideal model,
116 except for behavior not defined by the model.
118 def __init__(self
, fifo
, r_domain
, w_domain
):
121 self
.r_domain
= r_domain
122 self
.w_domain
= w_domain
124 def elaborate(self
, platform
):
126 m
.submodules
.dut
= dut
= self
.fifo
127 m
.submodules
.gold
= gold
= FIFOModel(width
=dut
.width
, depth
=dut
.depth
, fwft
=dut
.fwft
,
128 r_domain
=self
.r_domain
, w_domain
=self
.w_domain
)
131 gold
.r_en
.eq(dut
.r_rdy
& dut
.r_en
),
132 gold
.w_en
.eq(dut
.w_en
),
133 gold
.w_data
.eq(dut
.w_data
),
136 m
.d
.comb
+= Assert(dut
.r_rdy
.implies(gold
.r_rdy
))
137 m
.d
.comb
+= Assert(dut
.w_rdy
.implies(gold
.w_rdy
))
138 if hasattr(dut
, "level"):
139 m
.d
.comb
+= Assert(dut
.level
== gold
.level
)
142 m
.d
.comb
+= Assert(dut
.r_rdy
143 .implies(dut
.r_data
== gold
.r_data
))
145 m
.d
.comb
+= Assert((Past(dut
.r_rdy
, domain
=self
.r_domain
) &
146 Past(dut
.r_en
, domain
=self
.r_domain
))
147 .implies(dut
.r_data
== gold
.r_data
))
152 class FIFOContractSpec(Elaboratable
):
154 The first-in first-out queue contract specification: if two elements are written to the queue
155 consecutively, they must be read out consecutively at some later point, no matter all other
156 circumstances, with the exception of reset.
158 def __init__(self
, fifo
, *, r_domain
, w_domain
, bound
):
160 self
.r_domain
= r_domain
161 self
.w_domain
= w_domain
164 def elaborate(self
, platform
):
166 m
.submodules
.dut
= fifo
= self
.fifo
168 m
.domains
+= ClockDomain("sync")
169 m
.d
.comb
+= ResetSignal().eq(0)
170 if self
.w_domain
!= "sync":
171 m
.domains
+= ClockDomain(self
.w_domain
)
172 m
.d
.comb
+= ResetSignal(self
.w_domain
).eq(0)
173 if self
.r_domain
!= "sync":
174 m
.domains
+= ClockDomain(self
.r_domain
)
175 m
.d
.comb
+= ResetSignal(self
.r_domain
).eq(0)
177 entry_1
= AnyConst(fifo
.width
)
178 entry_2
= AnyConst(fifo
.width
)
180 with m
.FSM(domain
=self
.w_domain
) as write_fsm
:
181 with m
.State("WRITE-1"):
182 with m
.If(fifo
.w_rdy
):
184 fifo
.w_data
.eq(entry_1
),
188 with m
.State("WRITE-2"):
189 with m
.If(fifo
.w_rdy
):
191 fifo
.w_data
.eq(entry_2
),
195 with m
.State("DONE"):
198 with m
.FSM(domain
=self
.r_domain
) as read_fsm
:
199 read_1
= Signal(fifo
.width
)
200 read_2
= Signal(fifo
.width
)
201 with m
.State("READ"):
202 m
.d
.comb
+= fifo
.r_en
.eq(1)
206 r_rdy
= Past(fifo
.r_rdy
, domain
=self
.r_domain
)
210 read_2
.eq(fifo
.r_data
),
212 with m
.If((read_1
== entry_1
) & (read_2
== entry_2
)):
214 with m
.State("DONE"):
217 with m
.If(Initial()):
218 m
.d
.comb
+= Assume(write_fsm
.ongoing("WRITE-1"))
219 m
.d
.comb
+= Assume(read_fsm
.ongoing("READ"))
220 with m
.If(Past(Initial(), self
.bound
- 1)):
221 m
.d
.comb
+= Assert(read_fsm
.ongoing("DONE"))
223 with m
.If(ResetSignal(domain
=self
.w_domain
)):
224 m
.d
.comb
+= Assert(~fifo
.r_rdy
)
226 if self
.w_domain
!= "sync" or self
.r_domain
!= "sync":
227 m
.d
.comb
+= Assume(Rose(ClockSignal(self
.w_domain
)) |
228 Rose(ClockSignal(self
.r_domain
)))
class FIFOFormalCase(FHDLTestCase):
    """Formal checks of the FIFO implementations against the specs in this file."""

    def check_sync_fifo(self, fifo):
        """Run the model-equivalence and contract specs on a single-domain FIFO.

        Both the read and the write side use the "sync" domain.
        """
        equivalence_spec = FIFOModelEquivalenceSpec(fifo, r_domain="sync", w_domain="sync")
        self.assertFormal(equivalence_spec, mode="bmc", depth=fifo.depth + 1)

        # The contract bound and the solver depth use the same step count.
        steps = fifo.depth * 2 + 1
        contract_spec = FIFOContractSpec(fifo, r_domain="sync", w_domain="sync", bound=steps)
        self.assertFormal(contract_spec, mode="hybrid", depth=steps)

    def test_sync_fwft_pot(self):
        fifo = SyncFIFO(width=8, depth=4, fwft=True)
        self.check_sync_fifo(fifo)

    def test_sync_fwft_npot(self):
        fifo = SyncFIFO(width=8, depth=5, fwft=True)
        self.check_sync_fifo(fifo)

    def test_sync_not_fwft_pot(self):
        fifo = SyncFIFO(width=8, depth=4, fwft=False)
        self.check_sync_fifo(fifo)

    def test_sync_not_fwft_npot(self):
        fifo = SyncFIFO(width=8, depth=5, fwft=False)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_pot(self):
        fifo = SyncFIFOBuffered(width=8, depth=4)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_potp1(self):
        fifo = SyncFIFOBuffered(width=8, depth=5)
        self.check_sync_fifo(fifo)

    def test_sync_buffered_potm1(self):
        fifo = SyncFIFOBuffered(width=8, depth=3)
        self.check_sync_fifo(fifo)

    def check_async_fifo(self, fifo):
        """Run the contract spec on a FIFO with "read" and "write" domains.

        Only the contract spec is checked here; see the TODO below.
        """
        # TODO: model equivalence checking for this case likely needs multiclock
        # support, which is not really documented, and it is unclear how to use it.
        # The check that would be run is:
        #   self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
        #                     mode="bmc", depth=fifo.depth * 3 + 1)
        steps = fifo.depth * 4 + 1
        contract_spec = FIFOContractSpec(fifo, r_domain="read", w_domain="write", bound=steps)
        self.assertFormal(contract_spec, mode="hybrid", depth=steps)

    def test_async(self):
        fifo = AsyncFIFO(width=8, depth=4)
        self.check_async_fifo(fifo)

    def test_async_buffered(self):
        fifo = AsyncFIFOBuffered(width=8, depth=4)
        self.check_async_fifo(fifo)