83dc825313ca6d153f100f88c3d6973aa631afd7
1 # nmigen: UnusedElaboratable=no
3 from nmigen
.hdl
import *
4 from nmigen
.asserts
import *
5 from nmigen
.back
.pysim
import *
6 from nmigen
.lib
.fifo
import *
11 class FIFOTestCase(FHDLTestCase
):
12 def test_depth_wrong(self
):
13 with self
.assertRaisesRegex(TypeError,
14 r
"^FIFO width must be a non-negative integer, not -1$"):
15 FIFOInterface(width
=-1, depth
=8, fwft
=True)
16 with self
.assertRaisesRegex(TypeError,
17 r
"^FIFO depth must be a non-negative integer, not -1$"):
18 FIFOInterface(width
=8, depth
=-1, fwft
=True)
20 def test_sync_depth(self
):
21 self
.assertEqual(SyncFIFO(width
=8, depth
=0).depth
, 0)
22 self
.assertEqual(SyncFIFO(width
=8, depth
=1).depth
, 1)
23 self
.assertEqual(SyncFIFO(width
=8, depth
=2).depth
, 2)
25 def test_sync_buffered_depth(self
):
26 self
.assertEqual(SyncFIFOBuffered(width
=8, depth
=0).depth
, 0)
27 self
.assertEqual(SyncFIFOBuffered(width
=8, depth
=1).depth
, 1)
28 self
.assertEqual(SyncFIFOBuffered(width
=8, depth
=2).depth
, 2)
30 def test_async_depth(self
):
31 self
.assertEqual(AsyncFIFO(width
=8, depth
=0 ).depth
, 0)
32 self
.assertEqual(AsyncFIFO(width
=8, depth
=1 ).depth
, 1)
33 self
.assertEqual(AsyncFIFO(width
=8, depth
=2 ).depth
, 2)
34 self
.assertEqual(AsyncFIFO(width
=8, depth
=3 ).depth
, 4)
35 self
.assertEqual(AsyncFIFO(width
=8, depth
=4 ).depth
, 4)
36 self
.assertEqual(AsyncFIFO(width
=8, depth
=15).depth
, 16)
37 self
.assertEqual(AsyncFIFO(width
=8, depth
=16).depth
, 16)
38 self
.assertEqual(AsyncFIFO(width
=8, depth
=17).depth
, 32)
40 def test_async_depth_wrong(self
):
41 with self
.assertRaisesRegex(ValueError,
42 (r
"^AsyncFIFO only supports depths that are powers of 2; "
43 r
"requested exact depth 15 is not$")):
44 AsyncFIFO(width
=8, depth
=15, exact_depth
=True)
46 def test_async_buffered_depth(self
):
47 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=0 ).depth
, 0)
48 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=1 ).depth
, 2)
49 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=2 ).depth
, 2)
50 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=3 ).depth
, 3)
51 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=4 ).depth
, 5)
52 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=15).depth
, 17)
53 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=16).depth
, 17)
54 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=17).depth
, 17)
55 self
.assertEqual(AsyncFIFOBuffered(width
=8, depth
=18).depth
, 33)
57 def test_async_buffered_depth_wrong(self
):
58 with self
.assertRaisesRegex(ValueError,
59 (r
"^AsyncFIFOBuffered only supports depths that are one higher than powers of 2; "
60 r
"requested exact depth 16 is not$")):
61 AsyncFIFOBuffered(width
=8, depth
=16, exact_depth
=True)
63 class FIFOModel(Elaboratable
, FIFOInterface
):
65 Non-synthesizable first-in first-out queue, implemented naively as a chain of registers.
67 def __init__(self
, *, width
, depth
, fwft
, r_domain
, w_domain
):
68 super().__init
__(width
=width
, depth
=depth
, fwft
=fwft
)
70 self
.r_domain
= r_domain
71 self
.w_domain
= w_domain
73 self
.level
= Signal(range(self
.depth
+ 1))
74 self
.r_level
= Signal(range(self
.depth
+ 1))
75 self
.w_level
= Signal(range(self
.depth
+ 1))
77 def elaborate(self
, platform
):
80 storage
= Memory(width
=self
.width
, depth
=self
.depth
)
81 w_port
= m
.submodules
.w_port
= storage
.write_port(domain
=self
.w_domain
)
82 r_port
= m
.submodules
.r_port
= storage
.read_port (domain
="comb")
84 produce
= Signal(range(self
.depth
))
85 consume
= Signal(range(self
.depth
))
87 m
.d
.comb
+= self
.r_rdy
.eq(self
.level
> 0)
88 m
.d
.comb
+= r_port
.addr
.eq((consume
+ 1) % self
.depth
)
90 m
.d
.comb
+= self
.r_data
.eq(r_port
.data
)
91 with m
.If(self
.r_en
& self
.r_rdy
):
93 m
.d
[self
.r_domain
] += self
.r_data
.eq(r_port
.data
)
94 m
.d
[self
.r_domain
] += consume
.eq(r_port
.addr
)
96 m
.d
.comb
+= self
.w_rdy
.eq(self
.level
< self
.depth
)
97 m
.d
.comb
+= w_port
.data
.eq(self
.w_data
)
98 with m
.If(self
.w_en
& self
.w_rdy
):
99 m
.d
.comb
+= w_port
.addr
.eq((produce
+ 1) % self
.depth
)
100 m
.d
.comb
+= w_port
.en
.eq(1)
101 m
.d
[self
.w_domain
] += produce
.eq(w_port
.addr
)
103 with m
.If(ResetSignal(self
.r_domain
) |
ResetSignal(self
.w_domain
)):
104 m
.d
.sync
+= self
.level
.eq(0)
106 m
.d
.sync
+= self
.level
.eq(self
.level
107 + (self
.w_rdy
& self
.w_en
)
108 - (self
.r_rdy
& self
.r_en
))
111 self
.r_level
.eq(self
.level
),
112 self
.w_level
.eq(self
.level
),
114 m
.d
.comb
+= Assert(ResetSignal(self
.r_domain
) == ResetSignal(self
.w_domain
))
119 class FIFOModelEquivalenceSpec(Elaboratable
):
121 The first-in first-out queue model equivalence specification: for any inputs and control
122 signals, the behavior of the implementation under test exactly matches the ideal model,
123 except for behavior not defined by the model.
125 def __init__(self
, fifo
, r_domain
, w_domain
):
128 self
.r_domain
= r_domain
129 self
.w_domain
= w_domain
131 def elaborate(self
, platform
):
133 m
.submodules
.dut
= dut
= self
.fifo
134 m
.submodules
.gold
= gold
= FIFOModel(width
=dut
.width
, depth
=dut
.depth
, fwft
=dut
.fwft
,
135 r_domain
=self
.r_domain
, w_domain
=self
.w_domain
)
138 gold
.r_en
.eq(dut
.r_rdy
& dut
.r_en
),
139 gold
.w_en
.eq(dut
.w_en
),
140 gold
.w_data
.eq(dut
.w_data
),
143 m
.d
.comb
+= Assert(dut
.r_rdy
.implies(gold
.r_rdy
))
144 m
.d
.comb
+= Assert(dut
.w_rdy
.implies(gold
.w_rdy
))
145 m
.d
.comb
+= Assert(dut
.r_level
== gold
.r_level
)
146 m
.d
.comb
+= Assert(dut
.w_level
== gold
.w_level
)
149 m
.d
.comb
+= Assert(dut
.r_rdy
150 .implies(dut
.r_data
== gold
.r_data
))
152 m
.d
.comb
+= Assert((Past(dut
.r_rdy
, domain
=self
.r_domain
) &
153 Past(dut
.r_en
, domain
=self
.r_domain
))
154 .implies(dut
.r_data
== gold
.r_data
))
159 class FIFOContractSpec(Elaboratable
):
161 The first-in first-out queue contract specification: if two elements are written to the queue
162 consecutively, they must be read out consecutively at some later point, no matter all other
163 circumstances, with the exception of reset.
165 def __init__(self
, fifo
, *, r_domain
, w_domain
, bound
):
167 self
.r_domain
= r_domain
168 self
.w_domain
= w_domain
171 def elaborate(self
, platform
):
173 m
.submodules
.dut
= fifo
= self
.fifo
175 m
.domains
+= ClockDomain("sync")
176 m
.d
.comb
+= ResetSignal().eq(0)
177 if self
.w_domain
!= "sync":
178 m
.domains
+= ClockDomain(self
.w_domain
)
179 m
.d
.comb
+= ResetSignal(self
.w_domain
).eq(0)
180 if self
.r_domain
!= "sync":
181 m
.domains
+= ClockDomain(self
.r_domain
)
182 m
.d
.comb
+= ResetSignal(self
.r_domain
).eq(0)
184 entry_1
= AnyConst(fifo
.width
)
185 entry_2
= AnyConst(fifo
.width
)
187 with m
.FSM(domain
=self
.w_domain
) as write_fsm
:
188 with m
.State("WRITE-1"):
189 with m
.If(fifo
.w_rdy
):
191 fifo
.w_data
.eq(entry_1
),
195 with m
.State("WRITE-2"):
196 with m
.If(fifo
.w_rdy
):
198 fifo
.w_data
.eq(entry_2
),
202 with m
.State("DONE"):
205 with m
.FSM(domain
=self
.r_domain
) as read_fsm
:
206 read_1
= Signal(fifo
.width
)
207 read_2
= Signal(fifo
.width
)
208 with m
.State("READ"):
209 m
.d
.comb
+= fifo
.r_en
.eq(1)
213 r_rdy
= Past(fifo
.r_rdy
, domain
=self
.r_domain
)
217 read_2
.eq(fifo
.r_data
),
219 with m
.If((read_1
== entry_1
) & (read_2
== entry_2
)):
221 with m
.State("DONE"):
224 with m
.If(Initial()):
225 m
.d
.comb
+= Assume(write_fsm
.ongoing("WRITE-1"))
226 m
.d
.comb
+= Assume(read_fsm
.ongoing("READ"))
227 with m
.If(Past(Initial(), self
.bound
- 1)):
228 m
.d
.comb
+= Assert(read_fsm
.ongoing("DONE"))
230 with m
.If(ResetSignal(domain
=self
.w_domain
)):
231 m
.d
.comb
+= Assert(~fifo
.r_rdy
)
233 if self
.w_domain
!= "sync" or self
.r_domain
!= "sync":
234 m
.d
.comb
+= Assume(Rose(ClockSignal(self
.w_domain
)) |
235 Rose(ClockSignal(self
.r_domain
)))
240 class FIFOFormalCase(FHDLTestCase
):
241 def check_sync_fifo(self
, fifo
):
242 self
.assertFormal(FIFOModelEquivalenceSpec(fifo
, r_domain
="sync", w_domain
="sync"),
243 mode
="bmc", depth
=fifo
.depth
+ 1)
244 self
.assertFormal(FIFOContractSpec(fifo
, r_domain
="sync", w_domain
="sync",
245 bound
=fifo
.depth
* 2 + 1),
246 mode
="hybrid", depth
=fifo
.depth
* 2 + 1)
248 def test_sync_fwft_pot(self
):
249 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=4, fwft
=True))
251 def test_sync_fwft_npot(self
):
252 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=5, fwft
=True))
254 def test_sync_not_fwft_pot(self
):
255 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=4, fwft
=False))
257 def test_sync_not_fwft_npot(self
):
258 self
.check_sync_fifo(SyncFIFO(width
=8, depth
=5, fwft
=False))
260 def test_sync_buffered_pot(self
):
261 self
.check_sync_fifo(SyncFIFOBuffered(width
=8, depth
=4))
263 def test_sync_buffered_potp1(self
):
264 self
.check_sync_fifo(SyncFIFOBuffered(width
=8, depth
=5))
266 def test_sync_buffered_potm1(self
):
267 self
.check_sync_fifo(SyncFIFOBuffered(width
=8, depth
=3))
269 def check_async_fifo(self
, fifo
):
270 # TODO: properly doing model equivalence checking on this likely requires multiclock,
271 # which is not really documented nor is it clear how to use it.
272 # self.assertFormal(FIFOModelEquivalenceSpec(fifo, r_domain="read", w_domain="write"),
273 # mode="bmc", depth=fifo.depth * 3 + 1)
274 self
.assertFormal(FIFOContractSpec(fifo
, r_domain
="read", w_domain
="write",
275 bound
=fifo
.depth
* 4 + 1),
276 mode
="hybrid", depth
=fifo
.depth
* 4 + 1)
278 def test_async(self
):
279 self
.check_async_fifo(AsyncFIFO(width
=8, depth
=4))
281 def test_async_buffered(self
):
282 self
.check_async_fifo(AsyncFIFOBuffered(width
=8, depth
=4))