soc/interconnect/axi: point-to-point interconnect and timeout module with tests
authorJędrzej Boczar <jboczar@antmicro.com>
Fri, 17 Jul 2020 14:48:46 +0000 (16:48 +0200)
committerJędrzej Boczar <jboczar@antmicro.com>
Wed, 22 Jul 2020 15:16:12 +0000 (17:16 +0200)
litex/soc/interconnect/axi.py
test/test_axi.py

index 75b992ef8732bfc4139bc68df81cd59409d44797..dc8a32b3a5018d991e3bf947a2f27cd8c821fa63 100644 (file)
@@ -5,10 +5,14 @@
 """AXI4 Full/Lite support for LiteX"""
 
 from migen import *
+from migen.genlib import roundrobin
+from migen.genlib.misc import split, displacer, chooser, WaitTimer
 
 from litex.soc.interconnect import stream
 from litex.build.generic_platform import *
 
+from litex.soc.interconnect import csr_bus
+
 # AXI Definition -----------------------------------------------------------------------------------
 
 BURST_FIXED    = 0b00
@@ -72,6 +76,24 @@ def _connect_axi(master, slave):
         r.extend(m.connect(s))
     return r
 
+def _axi_layout_flat(axi):
+    # yields tuples (channel, name, direction)
+    def get_dir(channel, direction):
+        if channel in ["b", "r"]:
+            return {DIR_M_TO_S: DIR_S_TO_M, DIR_S_TO_M: DIR_M_TO_S}[direction]
+        return direction
+    for ch in ["aw", "w", "b", "ar", "r"]:
+        channel = getattr(axi, ch)
+        for group in channel.layout:
+            if len(group) == 3:
+                name, _, direction = group
+                yield ch, name, get_dir(ch, direction)
+            else:
+                _, subgroups = group
+                for subgroup in subgroups:
+                    name, _, direction = subgroup
+                    yield ch, name, get_dir(ch, direction)
+
 class AXIInterface:
     def __init__(self, data_width=32, address_width=32, id_width=1, clock_domain="sys"):
         self.data_width    = data_width
@@ -88,6 +110,9 @@ class AXIInterface:
     def connect(self, slave):
         return _connect_axi(self, slave)
 
+    def layout_flat(self):
+        return list(_axi_layout_flat(self))
+
 # AXI Lite Definition ------------------------------------------------------------------------------
 
 def ax_lite_description(address_width):
@@ -161,6 +186,9 @@ class AXILiteInterface:
     def connect(self, slave):
         return _connect_axi(self, slave)
 
+    def layout_flat(self):
+        return list(_axi_layout_flat(self))
+
     def write(self, addr, data, strb=None):
         if strb is None:
             strb = 2**len(self.w.strb) - 1
@@ -621,14 +649,14 @@ def axi_lite_to_simple(axi_lite, port_adr, port_dat_r, port_dat_w=None, port_we=
     return fsm, comb
 
 class AXILite2CSR(Module):
-    def __init__(self, axi_lite=None, csr=None):
+    def __init__(self, axi_lite=None, bus_csr=None):
         if axi_lite is None:
             axi_lite = AXILiteInterface()
-        if csr is None:
-            csr = csr.bus.Interface()
+        if bus_csr is None:
+            bus_csr = csr_bus.Interface()
 
         self.axi_lite = axi_lite
-        self.csr = csr
+        self.csr = bus_csr
 
         fsm, comb = axi_lite_to_simple(self.axi_lite,
                                        port_adr=self.csr.adr, port_dat_r=self.csr.dat_r,
@@ -852,3 +880,59 @@ class AXILiteConverter(Module):
             raise NotImplementedError("AXILiteUpConverter")
         else:
             self.comb += master.connect(slave)
+
+# AXILite Timeout ----------------------------------------------------------------------------------
+
+class AXILiteTimeout(Module):
+    """Protect master against slave timeouts (master _has_ to respond correctly)"""
+    def __init__(self, master, cycles):
+        self.error = Signal()
+
+        # # #
+
+        timer = WaitTimer(int(cycles))
+        self.submodules += timer
+        is_write = Signal()
+        is_read  = Signal()
+
+        self.submodules.fsm = fsm = FSM()
+        fsm.act("WAIT",
+            is_write.eq((master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready)),
+            is_read.eq(master.ar.valid & ~master.ar.ready),
+            timer.wait.eq(is_write | is_read),
+            # done is updated in `sync`, so we must make sure that `ready` has not been issued
+            # by slave during that single cycle, by checking `timer.wait`
+            If(timer.done & timer.wait,
+                self.error.eq(1),
+                If(is_write,
+                    NextState("RESPOND-WRITE")
+                ).Else(
+                    NextState("RESPOND-READ")
+                )
+            )
+        )
+        fsm.act("RESPOND-WRITE",
+            master.aw.ready.eq(master.aw.valid),
+            master.w.ready.eq(master.w.valid),
+            master.b.valid.eq(~master.aw.valid & ~master.w.valid),
+            master.b.resp.eq(RESP_SLVERR),
+            If(master.b.valid & master.b.ready,
+                NextState("WAIT")
+            )
+        )
+        fsm.act("RESPOND-READ",
+            master.ar.ready.eq(master.ar.valid),
+            master.r.valid.eq(~master.ar.valid),
+            master.r.resp.eq(RESP_SLVERR),
+            master.r.data.eq(2**len(master.r.data) - 1),
+            If(master.r.valid & master.r.ready,
+                NextState("WAIT")
+            )
+        )
+
+# AXILite Interconnect -----------------------------------------------------------------------------
+
+class AXILiteInterconnectPointToPoint(Module):
+    def __init__(self, master, slave):
+        self.comb += master.connect(slave)
+
index 84b2b52e174b365340fc74cdaf8fb220feb3d090..6a6059e762cb9014da4a0c59482e29881fdc962d 100644 (file)
@@ -333,8 +333,8 @@ class AXILiteChecker:
     def __init__(self, latency=None, rdata_generator=None):
         self.latency = latency or (lambda: 0)
         self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
-        self.writes = []
-        self.reads = []
+        self.writes = []  # (addr, data, strb)
+        self.reads = []  # (addr, data)
 
     def delay(self):
         for _ in range(self.latency()):
@@ -555,7 +555,7 @@ class TestAXILite(unittest.TestCase):
 
         dut = DUT(width_from=width_from, width_to=width_to)
         checker = AXILiteChecker(latency, rdata_generator)
-        run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)], vcd_name='sim.vcd')
+        run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)])
         self.assertEqual(checker.writes, write_expected)
         self.assertEqual(checker.reads, read_expected)
 
@@ -637,3 +637,92 @@ class TestAXILite(unittest.TestCase):
         ]
         self.converter_test(width_from=32, width_to=16,
                             write_pattern=write_pattern, write_expected=write_expected)
+
+    def axilite_pattern_generator(self, axi_lite, pattern):
+        for rw, addr, data in pattern:
+            assert rw in ["w", "r"]
+            if rw == "w":
+                resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1))
+                self.assertEqual(resp, RESP_OKAY)
+            else:
+                rdata, resp = (yield from axi_lite.read(addr))
+                self.assertEqual(resp, RESP_OKAY)
+                self.assertEqual(rdata, data)
+        for _ in range(16):
+            yield
+
+    def test_axilite_interconnect_p2p(self):
+        class DUT(Module):
+            def __init__(self):
+                self.master = master = AXILiteInterface()
+                self.slave  = slave  = AXILiteInterface()
+                self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
+
+        pattern = [
+            ("w", 0x00000004, 0x11111111),
+            ("w", 0x0000000c, 0x22222222),
+            ("r", 0x00000010, 0x33333333),
+            ("r", 0x00000018, 0x44444444),
+        ]
+
+        def rdata_generator(adr):
+            for rw, a, v in pattern:
+                if rw == "r" and a == adr:
+                    return v
+            return 0xbaadc0de
+
+        dut = DUT()
+        checker = AXILiteChecker(rdata_generator=rdata_generator)
+        generators = [
+            self.axilite_pattern_generator(dut.master, pattern),
+            checker.handler(dut.slave),
+        ]
+        run_simulation(dut, generators)
+        self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
+        self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])
+
+    def test_axilite_timeout(self):
+        class DUT(Module):
+            def __init__(self):
+                self.master = master = AXILiteInterface()
+                self.slave  = slave  = AXILiteInterface()
+                self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
+                self.submodules.timeout = AXILiteTimeout(master, 16)
+
+        @passive
+        def timeout(ticks):
+            for _ in range(ticks):
+                yield
+            raise TimeoutError("Timeout after %d ticks" % ticks)
+
+        def generator(axi_lite):
+            resp = (yield from axi_lite.write(0x00001000, 0x11111111))
+            self.assertEqual(resp, RESP_OKAY)
+            resp = (yield from axi_lite.write(0x00002000, 0x22222222))
+            self.assertEqual(resp, RESP_SLVERR)
+            data, resp = (yield from axi_lite.read(0x00003000))
+            self.assertEqual(resp, RESP_SLVERR)
+            self.assertEqual(data, 0xffffffff)
+            yield
+
+        def checker(axi_lite):
+            for _ in range(16):
+                yield
+            yield axi_lite.aw.ready.eq(1)
+            yield axi_lite.w.ready.eq(1)
+            yield
+            yield axi_lite.aw.ready.eq(0)
+            yield axi_lite.w.ready.eq(0)
+            yield axi_lite.b.valid.eq(1)
+            yield
+            while not (yield axi_lite.b.ready):
+                yield
+            yield axi_lite.b.valid.eq(0)
+
+        dut = DUT()
+        generators = [
+            generator(dut.master),
+            checker(dut.slave),
+            timeout(300),
+        ]
+        run_simulation(dut, generators, vcd_name='sim.vcd')