test/axi: add shared AXI Lite interconnect tests
authorJędrzej Boczar <jboczar@antmicro.com>
Wed, 22 Jul 2020 12:01:02 +0000 (14:01 +0200)
committerJędrzej Boczar <jboczar@antmicro.com>
Wed, 22 Jul 2020 15:16:33 +0000 (17:16 +0200)
litex/soc/interconnect/axi.py
test/test_axi.py

index e76abe301facb173a4484c7de0eebea7863ba553..71040ac7de261bd3cc1fc4258c04a4653aeae754 100644 (file)
@@ -134,16 +134,16 @@ def r_lite_description(data_width):
     ]
 
 class AXILiteInterface:
-    def __init__(self, data_width=32, address_width=32, clock_domain="sys"):
+    def __init__(self, data_width=32, address_width=32, clock_domain="sys", name=None):
         self.data_width    = data_width
         self.address_width = address_width
         self.clock_domain  = clock_domain
 
-        self.aw = stream.Endpoint(ax_lite_description(address_width))
-        self.w  = stream.Endpoint(w_lite_description(data_width))
-        self.b  = stream.Endpoint(b_lite_description())
-        self.ar = stream.Endpoint(ax_lite_description(address_width))
-        self.r  = stream.Endpoint(r_lite_description(data_width))
+        self.aw = stream.Endpoint(ax_lite_description(address_width), name=name)
+        self.w  = stream.Endpoint(w_lite_description(data_width), name=name)
+        self.b  = stream.Endpoint(b_lite_description(), name=name)
+        self.ar = stream.Endpoint(ax_lite_description(address_width), name=name)
+        self.r  = stream.Endpoint(r_lite_description(data_width), name=name)
 
     def get_ios(self, bus_name="wb"):
         subsignals = []
@@ -1116,11 +1116,11 @@ class AXILiteInterconnectShared(Module):
     {slaves}
     """.format(slaves=AXILiteDecoder._doc_slaves)
 
-    def __init__(self, masters, slaves, register=False, timeout_cycles=1e6):
+    def __init__(self, masters, slaves, timeout_cycles=1e6):
         # TODO: data width
         shared = AXILiteInterface()
         self.submodules.arbiter = AXILiteArbiter(masters, shared)
-        self.submodules.decoder = AXILiteDecoder(shared, slaves, register)
+        self.submodules.decoder = AXILiteDecoder(shared, slaves)
         if timeout_cycles is not None:
             self.submodules.timeout = AXILiteTimeout(shared, timeout_cycles)
 
@@ -1132,7 +1132,7 @@ class AXILiteCrossbar(Module):
     {slaves}
     """.format(slaves=AXILiteDecoder._doc_slaves)
 
-    def __init__(self, masters, slaves, register=False):
+    def __init__(self, masters, slaves, timeout_cycles=1e6):
         matches, busses = zip(*slaves)
         access_m_s = [[AXILiteInterface() for j in slaves] for i in masters]  # a[master][slave]
         access_s_m = list(zip(*access_m_s))  # a[slave][master]
index 1c8345569460ce1960f7ee59ccc9b493129abfb7..f76e48b06989e85aa4d818a380de6ea48727042f 100644 (file)
@@ -329,16 +329,21 @@ class TestAXI(unittest.TestCase):
 
 # TestAXILite --------------------------------------------------------------------------------------
 
+def _int_or_call(int_or_func):
+    if callable(int_or_func):
+        return int_or_func()
+    return int_or_func
+
 class AXILiteChecker:
-    def __init__(self, ready_latency=None, response_latency=None, rdata_generator=None):
-        self.ready_latency = ready_latency or (lambda: 0)
-        self.response_latency = response_latency or (lambda: 0)
+    def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
+        self.ready_latency = ready_latency
+        self.response_latency = response_latency
         self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
         self.writes = []  # (addr, data, strb)
         self.reads = []  # (addr, data)
 
     def delay(self, latency):
-        for _ in range(latency()):
+        for _ in range(_int_or_call(latency)):
             yield
 
     def handle_write(self, axi_lite):
@@ -401,8 +406,11 @@ class AXILiteChecker:
             yield
 
 @passive
-def timeout(ticks):
-    for _ in range(ticks):
+def timeout_generator(ticks):
+    import os
+    for i in range(ticks):
+        if os.environ.get("TIMEOUT_DEBUG", "") == "1":
+            print("tick {}".format(i))
         yield
     raise TimeoutError("Timeout after %d ticks" % ticks)
 
@@ -655,22 +663,35 @@ class TestAXILite(unittest.TestCase):
 
 # TestAXILiteInterconnet ---------------------------------------------------------------------------
 
-class TestAXILiteInterconnect(unittest.TestCase):
-    def axilite_pattern_generator(self, axi_lite, pattern, delay=0):
-        for rw, addr, data in pattern:
+class AXILitePatternGenerator:
+    def __init__(self, axi_lite, pattern, delay=0):
+        self.axi_lite = axi_lite
+        self.pattern = pattern
+        self.delay = delay
+        self.errors = 0
+        self.read_errors = []
+        self.resp_errors = {"w": 0, "r": 0}
+
+    def handler(self):
+        for rw, addr, data in self.pattern:
             assert rw in ["w", "r"]
             if rw == "w":
-                resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1))
-                self.assertEqual(resp, RESP_OKAY)
+                strb = 2**len(self.axi_lite.w.strb) - 1
+                resp = (yield from self.axi_lite.write(addr, data, strb))
             else:
-                rdata, resp = (yield from axi_lite.read(addr))
-                self.assertEqual(resp, RESP_OKAY)
-                self.assertEqual(rdata, data)
-            for _ in range(delay):
+                rdata, resp = (yield from self.axi_lite.read(addr))
+                if rdata != data:
+                    self.read_errors.append((rdata, data))
+                    self.errors += 1
+            if resp != RESP_OKAY:
+                self.resp_errors[rw] += 1
+                self.errors += 1
+            for _ in range(_int_or_call(self.delay)):
                 yield
         for _ in range(16):
             yield
 
+class TestAXILiteInterconnect(unittest.TestCase):
     def test_interconnect_p2p(self):
         class DUT(Module):
             def __init__(self):
@@ -694,7 +715,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
         dut = DUT()
         checker = AXILiteChecker(rdata_generator=rdata_generator)
         generators = [
-            self.axilite_pattern_generator(dut.master, pattern),
+            AXILitePatternGenerator(dut.master, pattern).handler(),
             checker.handler(dut.slave),
         ]
         run_simulation(dut, generators)
@@ -737,7 +758,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
         generators = [
             generator(dut.master),
             checker(dut.slave),
-            timeout(300),
+            timeout_generator(300),
         ]
         run_simulation(dut, generators)
 
@@ -772,7 +793,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
             dut = DUT(n_masters)
             checker = AXILiteChecker()
             generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
-            generators += [timeout(300), checker.handler(dut.slave)]
+            generators += [timeout_generator(300), checker.handler(dut.slave)]
             run_simulation(dut, generators)
             order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
             self.assertEqual([addr for addr, data, strb in checker.writes], order)
@@ -783,7 +804,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
             dut = DUT(n_masters)
             checker = AXILiteChecker()
             generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
-            generators += [timeout(300), checker.handler(dut.slave)]
+            generators += [timeout_generator(300), checker.handler(dut.slave)]
             run_simulation(dut, generators)
             order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
             self.assertEqual([addr for addr, data, strb in checker.writes], order)
@@ -820,7 +841,7 @@ class TestAXILiteInterconnect(unittest.TestCase):
             dut = DUT(n_masters)
             checker = AXILiteChecker(response_latency=lambda: 3)
             generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
-            generators += [timeout(300), checker.handler(dut.slave)]
+            generators += [timeout_generator(300), checker.handler(dut.slave)]
             run_simulation(dut, generators)
             order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
             self.assertEqual([addr for addr, data, strb in checker.writes], order)
@@ -831,12 +852,22 @@ class TestAXILiteInterconnect(unittest.TestCase):
             dut = DUT(n_masters)
             checker = AXILiteChecker(response_latency=lambda: 3)
             generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
-            generators += [timeout(300), checker.handler(dut.slave)]
+            generators += [timeout_generator(300), checker.handler(dut.slave)]
             run_simulation(dut, generators)
             order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
             self.assertEqual([addr for addr, data, strb in checker.writes], order)
             self.assertEqual([addr for addr, data in checker.reads], order)
 
+    def address_decoder(self, i, size=0x100, python=False):
+        # bytes to 32-bit words aligned
+        _size   = (size) >> 2
+        _origin = (size * i) >> 2
+        if python:  # for python integers
+            shift = log2_int(_size)
+            return lambda a: ((a >> shift) == (_origin >> shift))
+        # for migen signals
+        return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size)))
+
     def decoder_test(self, n_slaves, pattern, generator_delay=0):
         class DUT(Module):
             def __init__(self, decoders):
@@ -845,25 +876,19 @@ class TestAXILiteInterconnect(unittest.TestCase):
                 slaves = list(zip(decoders, self.slaves))
                 self.submodules.decoder = AXILiteDecoder(self.master, slaves)
 
-        def decoder(i):
-            # bytes to 32-bit words aligned
-            size   = (0x100) >> 2
-            origin = (0x100 * i) >> 2
-            return lambda a: (a[log2_int(size):] == (origin >> log2_int(size)))
-
         def rdata_generator(adr):
             for rw, a, v in pattern:
                 if rw == "r" and a == adr:
                     return v
             return 0xbaadc0de
 
-        dut = DUT([decoder(i) for i in range(n_slaves)])
+        dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
         checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves]
 
-        generators = [self.axilite_pattern_generator(dut.master, pattern, delay=generator_delay)]
+        generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
         generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
-        generators += [timeout(300)]
-        run_simulation(dut, generators, vcd_name='sim.vcd')
+        generators += [timeout_generator(300)]
+        run_simulation(dut, generators)
 
         return checkers
 
@@ -946,3 +971,136 @@ class TestAXILiteInterconnect(unittest.TestCase):
             self.decoder_test(n_slaves=3, pattern=[
                 ("r", 0x300, 1),
             ])
+
+    def interconnect_shared_test(self, master_patterns, slave_decoders,
+                                 master_delay=0, slave_ready_latency=0, slave_response_latency=0,
+                                 timeout=300, **kwargs):
+        # number of masters/slaves is defined by the number of patterns/decoders
+        # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
+        # slave_decoders: list of address decoders per slave
+        class DUT(Module):
+            def __init__(self, n_masters, decoders, **kwargs):
+                self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
+                self.slaves  = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
+                slaves = list(zip(decoders, self.slaves))
+                self.submodules.interconnect = AXILiteInterconnectShared(self.masters, slaves, **kwargs)
+
+        class ReadDataGenerator:
+            # Generates data based on decoded addresses and data defined in master_patterns
+            def __init__(self, patterns):
+                self.mem = {}
+                for pattern in patterns:
+                    for rw, addr, val in pattern:
+                        if rw == "r":
+                            assert addr not in self.mem
+                            self.mem[addr] = val
+
+            def getter(self, n):
+                # on miss will give default data depending on slave n
+                return lambda addr: self.mem.get(addr, 0xbaad0000 + n)
+
+        def new_checker(rdata_generator):
+            return AXILiteChecker(ready_latency=slave_ready_latency,
+                                  response_latency=slave_response_latency,
+                                  rdata_generator=rdata_generator)
+
+        # perpare test
+        dut = DUT(len(master_patterns), slave_decoders, **kwargs)
+        rdata_generator = ReadDataGenerator(master_patterns)
+        checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(master_patterns)]
+        pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
+                              for i, pattern in enumerate(master_patterns)]
+
+        # run simulator
+        generators = [gen.handler() for gen in pattern_generators]
+        generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
+        generators += [timeout_generator(timeout)]
+        run_simulation(dut, generators)
+
+        return pattern_generators, checkers
+
+    def test_interconnect_shared_basic(self):
+        master_patterns = [
+            [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
+            [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
+            [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
+        ]
+        slave_decoders = [self.address_decoder(i) for i in range(3)]
+
+        generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders,
+                                                             master_delay=1)
+
+        for gen in generators:
+            self.assertEqual(gen.errors, 0)
+
+        def addr(checker_list):
+            return [entry[0] for entry in checker_list]
+
+        self.assertEqual(addr(checkers[0].writes), [0x000, 0x010])
+        self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112])
+        self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
+        self.assertEqual(addr(checkers[0].reads), [])
+        self.assertEqual(addr(checkers[1].reads), [])
+        self.assertEqual(addr(checkers[2].reads), [])
+
+    def interconnect_shared_stress_test(self, timeout=1000, **kwargs):
+        prng = random.Random(42)
+
+        n_masters = 3
+        n_slaves = 3
+        pattern_length = 64
+        slave_region_size = 0x10000000
+        # for testing purpose each master will access only its own region of a slave
+        master_region_size = 0x1000
+        assert n_masters*master_region_size < slave_region_size
+
+        def gen_pattern(n, length):
+            assert length < master_region_size
+            for i_access in range(length):
+                rw = "w" if prng.randint(0, 1) == 0 else "r"
+                i_slave = prng.randrange(n_slaves)
+                addr = i_slave*slave_region_size + n*master_region_size + i_access
+                data = addr
+                yield rw, addr, data
+
+        master_patterns   = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
+        slave_decoders    = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
+        slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
+                             for i in range(n_slaves)]
+
+        generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders,
+                                                             timeout=timeout, **kwargs)
+
+        for gen in generators:
+            read_errors = ["  0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
+            msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
+                gen.resp_errors, "\n".join(read_errors))
+            self.assertEqual(gen.errors, 0, msg=msg)
+
+        # make sure all the accesses at slave side are in correct address region
+        for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)):
+            for addr in (entry[0] for entry in checker.writes + checker.reads):
+                # compensate for the fact that decoders work on word-aligned addresses
+                self.assertNotEqual(decoder(addr >> 2), 0)
+
+    def test_interconnect_shared_stress_no_delay(self):
+        self.interconnect_shared_stress_test(timeout=1000,
+                                             master_delay=0,
+                                             slave_ready_latency=0,
+                                             slave_response_latency=0)
+
+    def test_interconnect_shared_stress_rand_short(self):
+        prng = random.Random(42)
+        rand = lambda: prng.randrange(4)
+        self.interconnect_shared_stress_test(timeout=2000,
+                                             master_delay=rand,
+                                             slave_ready_latency=rand,
+                                             slave_response_latency=rand)
+
+    def test_interconnect_shared_stress_rand_long(self):
+        prng = random.Random(42)
+        rand = lambda: prng.randrange(16)
+        self.interconnect_shared_stress_test(timeout=4000,
+                                             master_delay=rand,
+                                             slave_ready_latency=rand,
+                                             slave_response_latency=rand)