lib.fifo: fix level on fifo full
authorJaro Habiger <jarohabiger@googlemail.com>
Tue, 3 Nov 2020 09:10:07 +0000 (10:10 +0100)
committerwhitequark <whitequark@whitequark.org>
Tue, 3 Nov 2020 09:20:30 +0000 (09:20 +0000)
nmigen/lib/fifo.py
tests/test_lib_fifo.py

index d0b79bd4993296cc7198eeff2396a8dd16b0b58e..bbe0e9dba984c245faeb205a6097f6db054e7d10 100644 (file)
@@ -380,8 +380,8 @@ class AsyncFIFO(Elaboratable, FIFOInterface):
             r_empty.eq(consume_r_gry == produce_r_gry),
         ]
 
-        m.d[self._w_domain] += self.w_level.eq((produce_w_bin - consume_w_bin)[:self._ctr_bits-1])
-        m.d.comb += self.r_level.eq((produce_r_bin - consume_r_bin)[:self._ctr_bits-1])
+        m.d[self._w_domain] += self.w_level.eq((produce_w_bin - consume_w_bin))
+        m.d.comb += self.r_level.eq((produce_r_bin - consume_r_bin))
 
         storage = Memory(width=self.width, depth=self.depth)
         w_port  = m.submodules.w_port = storage.write_port(domain=self._w_domain)
index 7de265f270d7d08a332e3bf037c96ea6b50711a1..c8674ba01ff31739416dc6fc36dcd86cbdd97efe 100644 (file)
@@ -302,4 +302,35 @@ class AsyncFIFOSimCase(FHDLTestCase):
         simulator = Simulator(fifo)
         simulator.add_clock(100e-6)
         simulator.add_sync_process(testbench)
-        simulator.run()
\ No newline at end of file
+        simulator.run()
+
+    def check_async_fifo_level(self, fifo, fill_in, expected_level):
+        write_done = Signal()
+
+        def write_process():
+            for i in range(fill_in):
+                yield fifo.w_data.eq(i)
+                yield fifo.w_en.eq(1)
+                yield
+            yield fifo.w_en.eq(0)
+            yield
+            yield
+            self.assertEqual((yield fifo.w_level), expected_level)
+            yield write_done.eq(1)
+
+        def read_process():
+            while not (yield write_done):
+                yield
+            self.assertEqual((yield fifo.r_level), expected_level)
+
+        simulator = Simulator(fifo)
+        simulator.add_clock(100e-6, domain="write")
+        simulator.add_sync_process(write_process, domain="write")
+        simulator.add_clock(50e-6, domain="read")
+        simulator.add_sync_process(read_process, domain="read")
+        with simulator.write_vcd("test.vcd"):
+            simulator.run()
+
+    def test_async_fifo_level_full(self):
+        fifo = AsyncFIFO(width=32, depth=8, r_domain="read", w_domain="write")
+        self.check_async_fifo_level(fifo, fill_in=10, expected_level=8)