1 # This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
9 from litex
.soc
.interconnect
.axi
import *
10 from litex
.soc
.interconnect
import wishbone
, csr_bus
12 # Helpers ------------------------------------------------------------------------------------------
14 def _int_or_call(int_or_func
):
15 if callable(int_or_func
):
20 def timeout_generator(ticks
):
22 for i
in range(ticks
):
23 if os
.environ
.get("TIMEOUT_DEBUG", "") == "1":
24 print("tick {}".format(i
))
26 raise TimeoutError("Timeout after %d ticks" % ticks
)
29 def __init__(self
, ready_latency
=0, response_latency
=0, rdata_generator
=None):
30 self
.ready_latency
= ready_latency
31 self
.response_latency
= response_latency
32 self
.rdata_generator
= rdata_generator
or (lambda adr
: 0xbaadc0de)
33 self
.writes
= [] # (addr, data, strb)
34 self
.reads
= [] # (addr, data)
36 def delay(self
, latency
):
37 for _
in range(_int_or_call(latency
)):
40 def handle_write(self
, axi_lite
):
42 while not (yield axi_lite
.aw
.valid
):
44 yield from self
.delay(self
.ready_latency
)
45 addr
= (yield axi_lite
.aw
.addr
)
46 yield axi_lite
.aw
.ready
.eq(1)
48 yield axi_lite
.aw
.ready
.eq(0)
49 while not (yield axi_lite
.w
.valid
):
51 yield from self
.delay(self
.ready_latency
)
53 data
= (yield axi_lite
.w
.data
)
54 strb
= (yield axi_lite
.w
.strb
)
55 yield axi_lite
.w
.ready
.eq(1)
57 yield axi_lite
.w
.ready
.eq(0)
58 yield from self
.delay(self
.response_latency
)
60 yield axi_lite
.b
.valid
.eq(1)
61 yield axi_lite
.b
.resp
.eq(RESP_OKAY
)
63 while not (yield axi_lite
.b
.ready
):
65 yield axi_lite
.b
.valid
.eq(0)
66 self
.writes
.append((addr
, data
, strb
))
68 def handle_read(self
, axi_lite
):
70 while not (yield axi_lite
.ar
.valid
):
72 yield from self
.delay(self
.ready_latency
)
73 addr
= (yield axi_lite
.ar
.addr
)
74 yield axi_lite
.ar
.ready
.eq(1)
76 yield axi_lite
.ar
.ready
.eq(0)
77 yield from self
.delay(self
.response_latency
)
79 data
= self
.rdata_generator(addr
)
80 yield axi_lite
.r
.valid
.eq(1)
81 yield axi_lite
.r
.resp
.eq(RESP_OKAY
)
82 yield axi_lite
.r
.data
.eq(data
)
84 while not (yield axi_lite
.r
.ready
):
86 yield axi_lite
.r
.valid
.eq(0)
87 yield axi_lite
.r
.data
.eq(0)
88 self
.reads
.append((addr
, data
))
91 def handler(self
, axi_lite
):
93 if (yield axi_lite
.aw
.valid
):
94 yield from self
.handle_write(axi_lite
)
95 if (yield axi_lite
.ar
.valid
):
96 yield from self
.handle_read(axi_lite
)
100 def _write_handler(self
, axi_lite
):
102 yield from self
.handle_write(axi_lite
)
106 def _read_handler(self
, axi_lite
):
108 yield from self
.handle_read(axi_lite
)
111 def parallel_handlers(self
, axi_lite
):
112 return self
._write
_handler
(axi_lite
), self
._read
_handler
(axi_lite
)
114 class AXILitePatternGenerator
:
115 def __init__(self
, axi_lite
, pattern
, delay
=0):
116 # patter: (rw, addr, data)
117 self
.axi_lite
= axi_lite
118 self
.pattern
= pattern
121 self
.read_errors
= []
122 self
.resp_errors
= {"w": 0, "r": 0}
125 for rw
, addr
, data
in self
.pattern
:
126 assert rw
in ["w", "r"]
128 strb
= 2**len(self
.axi_lite
.w
.strb
) - 1
129 resp
= (yield from self
.axi_lite
.write(addr
, data
, strb
))
131 rdata
, resp
= (yield from self
.axi_lite
.read(addr
))
133 self
.read_errors
.append((rdata
, data
))
135 if resp
!= RESP_OKAY
:
136 self
.resp_errors
[rw
] += 1
138 for _
in range(_int_or_call(self
.delay
)):
143 # TestAXILite --------------------------------------------------------------------------------------
145 class TestAXILite(unittest
.TestCase
):
146 def test_wishbone2axi2wishbone(self
):
149 self
.wishbone
= wishbone
.Interface(data_width
=32)
153 axi
= AXILiteInterface(data_width
=32, address_width
=32)
154 wb
= wishbone
.Interface(data_width
=32)
156 wishbone2axi
= Wishbone2AXILite(self
.wishbone
, axi
)
157 axi2wishbone
= AXILite2Wishbone(axi
, wb
)
158 self
.submodules
+= wishbone2axi
, axi2wishbone
160 sram
= wishbone
.SRAM(1024, init
=[0x12345678, 0xa55aa55a])
161 self
.submodules
+= sram
162 self
.comb
+= wb
.connect(sram
.bus
)
166 if (yield from dut
.wishbone
.read(0)) != 0x12345678:
168 if (yield from dut
.wishbone
.read(1)) != 0xa55aa55a:
171 yield from dut
.wishbone
.write(i
, i
)
173 if (yield from dut
.wishbone
.read(i
)) != i
:
177 run_simulation(dut
, [generator(dut
)])
178 self
.assertEqual(dut
.errors
, 0)
180 def test_axilite2axi2mem(self
):
182 def __init__(self
, mem_bus
="wishbone"):
183 self
.axi_lite
= AXILiteInterface()
186 self
.submodules
.axil2axi
= AXILite2AXI(self
.axi_lite
, axi
)
188 interface_cls
, converter_cls
, sram_cls
= {
189 "wishbone": (wishbone
.Interface
, AXI2Wishbone
, wishbone
.SRAM
),
190 "axi_lite": (AXILiteInterface
, AXI2AXILite
, AXILiteSRAM
),
193 bus
= interface_cls()
194 self
.submodules
+= converter_cls(axi
, bus
)
195 sram
= sram_cls(1024, init
=[0x12345678, 0xa55aa55a])
196 self
.submodules
+= sram
197 self
.comb
+= bus
.connect(sram
.bus
)
199 def generator(axi_lite
, datas
, resps
):
200 data
, resp
= (yield from axi_lite
.read(0x00))
201 resps
.append((resp
, RESP_OKAY
))
202 datas
.append((data
, 0x12345678))
203 data
, resp
= (yield from axi_lite
.read(0x04))
204 resps
.append((resp
, RESP_OKAY
))
205 datas
.append((data
, 0xa55aa55a))
207 resp
= (yield from axi_lite
.write(4*i
, i
))
208 resps
.append((resp
, RESP_OKAY
))
210 data
, resp
= (yield from axi_lite
.read(4*i
))
211 resps
.append((resp
, RESP_OKAY
))
212 datas
.append((data
, i
))
214 for mem_bus
in ["wishbone", "axi_lite"]:
215 with self
.subTest(mem_bus
=mem_bus
):
216 # to have more verbose error messages store errors in list((actual, expected))
220 def actual_expected(results
): # split into (list(actual), list(expected))
221 return list(zip(*results
))
224 run_simulation(dut
, [generator(dut
.axi_lite
, datas
, resps
)])
225 self
.assertEqual(*actual_expected(resps
))
226 msg
= "\n".join("0x{:08x} vs 0x{:08x}".format(actual
, expected
) for actual
, expected
in datas
)
227 self
.assertEqual(*actual_expected(datas
), msg
="actual vs expected:\n" + msg
)
229 def test_axilite2csr(self
):
231 def csr_mem_handler(csr
, mem
):
233 adr
= (yield csr
.adr
)
234 yield csr
.dat_r
.eq(mem
[adr
])
236 mem
[adr
] = (yield csr
.dat_w
)
241 self
.axi_lite
= AXILiteInterface()
242 self
.csr
= csr_bus
.Interface()
243 self
.submodules
.axilite2csr
= AXILite2CSR(self
.axi_lite
, self
.csr
)
246 prng
= random
.Random(42)
247 mem_ref
= [prng
.randrange(255) for i
in range(100)]
252 for adr
, ref
in enumerate(mem_ref
):
254 data
, resp
= (yield from dut
.axi_lite
.read(adr
))
255 self
.assertEqual(resp
, 0b00)
259 write_data
= [prng
.randrange(255) for _
in mem_ref
]
261 for adr
, wdata
in enumerate(write_data
):
263 resp
= (yield from dut
.axi_lite
.write(adr
, wdata
))
264 self
.assertEqual(resp
, 0b00)
265 rdata
, resp
= (yield from dut
.axi_lite
.read(adr
))
266 self
.assertEqual(resp
, 0b00)
271 mem
= [v
for v
in mem_ref
]
272 run_simulation(dut
, [generator(dut
), csr_mem_handler(dut
.csr
, mem
)])
273 self
.assertEqual(dut
.errors
, 0)
275 def test_axilite_sram(self
):
277 def __init__(self
, size
, init
):
278 self
.axi_lite
= AXILiteInterface()
279 self
.submodules
.sram
= AXILiteSRAM(size
, init
=init
, bus
=self
.axi_lite
)
282 def generator(dut
, ref_init
):
283 for adr
, ref
in enumerate(ref_init
):
285 data
, resp
= (yield from dut
.axi_lite
.read(adr
))
286 self
.assertEqual(resp
, 0b00)
290 write_data
= [prng
.randrange(255) for _
in ref_init
]
292 for adr
, wdata
in enumerate(write_data
):
294 resp
= (yield from dut
.axi_lite
.write(adr
, wdata
))
295 self
.assertEqual(resp
, 0b00)
296 rdata
, resp
= (yield from dut
.axi_lite
.read(adr
))
297 self
.assertEqual(resp
, 0b00)
301 prng
= random
.Random(42)
302 init
= [prng
.randrange(2**32) for i
in range(100)]
304 dut
= DUT(size
=len(init
)*4, init
=[v
for v
in init
])
305 run_simulation(dut
, [generator(dut
, init
)])
306 self
.assertEqual(dut
.errors
, 0)
308 def converter_test(self
, width_from
, width_to
, parallel_rw
=False,
309 write_pattern
=None, write_expected
=None,
310 read_pattern
=None, read_expected
=None):
311 assert not (write_pattern
is None and read_pattern
is None)
313 if write_pattern
is None:
316 elif len(write_pattern
[0]) == 2:
318 write_pattern
= [(adr
, data
, 2**(width_from
//8)-1) for adr
, data
in write_pattern
]
320 if read_pattern
is None:
325 def __init__(self
, width_from
, width_to
):
326 self
.master
= AXILiteInterface(data_width
=width_from
)
327 self
.slave
= AXILiteInterface(data_width
=width_to
)
328 self
.submodules
.converter
= AXILiteConverter(self
.master
, self
.slave
)
330 prng
= random
.Random(42)
332 def write_generator(axi_lite
):
333 for addr
, data
, strb
in write_pattern
or []:
334 resp
= (yield from axi_lite
.write(addr
, data
, strb
))
335 self
.assertEqual(resp
, RESP_OKAY
)
336 for _
in range(prng
.randrange(3)):
341 def read_generator(axi_lite
):
342 for addr
, refdata
in read_pattern
or []:
343 data
, resp
= (yield from axi_lite
.read(addr
))
344 self
.assertEqual(resp
, RESP_OKAY
)
345 self
.assertEqual(data
, refdata
)
346 for _
in range(prng
.randrange(3)):
351 def sequential_generator(axi_lite
):
352 yield from write_generator(axi_lite
)
353 yield from read_generator(axi_lite
)
355 def rdata_generator(adr
):
356 for a
, v
in read_expected
:
364 _latency
= (_latency
+ 1) % 3
367 dut
= DUT(width_from
=width_from
, width_to
=width_to
)
368 checker
= AXILiteChecker(ready_latency
=latency
, rdata_generator
=rdata_generator
)
370 generators
= [write_generator(dut
.master
), read_generator(dut
.master
)]
372 generators
= [sequential_generator(dut
.master
)]
373 generators
+= checker
.parallel_handlers(dut
.slave
)
374 run_simulation(dut
, generators
)
375 self
.assertEqual(checker
.writes
, write_expected
)
376 self
.assertEqual(checker
.reads
, read_expected
)
378 def test_axilite_down_converter_32to16(self
):
380 (0x00000000, 0x22221111),
381 (0x00000004, 0x44443333),
382 (0x00000008, 0x66665555),
383 (0x00000100, 0x88887777),
386 (0x00000000, 0x1111, 0b11),
387 (0x00000002, 0x2222, 0b11),
388 (0x00000004, 0x3333, 0b11),
389 (0x00000006, 0x4444, 0b11),
390 (0x00000008, 0x5555, 0b11),
391 (0x0000000a, 0x6666, 0b11),
392 (0x00000100, 0x7777, 0b11),
393 (0x00000102, 0x8888, 0b11),
395 read_pattern
= write_pattern
396 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
397 for parallel
in [False, True]:
398 with self
.subTest(parallel
=parallel
):
399 self
.converter_test(width_from
=32, width_to
=16, parallel_rw
=parallel
,
400 write_pattern
=write_pattern
, write_expected
=write_expected
,
401 read_pattern
=read_pattern
, read_expected
=read_expected
)
403 def test_axilite_down_converter_32to8(self
):
405 (0x00000000, 0x44332211),
406 (0x00000004, 0x88776655),
409 (0x00000000, 0x11, 0b1),
410 (0x00000001, 0x22, 0b1),
411 (0x00000002, 0x33, 0b1),
412 (0x00000003, 0x44, 0b1),
413 (0x00000004, 0x55, 0b1),
414 (0x00000005, 0x66, 0b1),
415 (0x00000006, 0x77, 0b1),
416 (0x00000007, 0x88, 0b1),
418 read_pattern
= write_pattern
419 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
420 for parallel
in [False, True]:
421 with self
.subTest(parallel
=parallel
):
422 self
.converter_test(width_from
=32, width_to
=8, parallel_rw
=parallel
,
423 write_pattern
=write_pattern
, write_expected
=write_expected
,
424 read_pattern
=read_pattern
, read_expected
=read_expected
)
426 def test_axilite_down_converter_64to32(self
):
428 (0x00000000, 0x2222222211111111),
429 (0x00000008, 0x4444444433333333),
432 (0x00000000, 0x11111111, 0b1111),
433 (0x00000004, 0x22222222, 0b1111),
434 (0x00000008, 0x33333333, 0b1111),
435 (0x0000000c, 0x44444444, 0b1111),
437 read_pattern
= write_pattern
438 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
439 for parallel
in [False, True]:
440 with self
.subTest(parallel
=parallel
):
441 self
.converter_test(width_from
=64, width_to
=32, parallel_rw
=parallel
,
442 write_pattern
=write_pattern
, write_expected
=write_expected
,
443 read_pattern
=read_pattern
, read_expected
=read_expected
)
445 def test_axilite_down_converter_strb(self
):
447 (0x00000000, 0x22221111, 0b1100),
448 (0x00000004, 0x44443333, 0b1111),
449 (0x00000008, 0x66665555, 0b1011),
450 (0x00000100, 0x88887777, 0b0011),
453 (0x00000002, 0x2222, 0b11),
454 (0x00000004, 0x3333, 0b11),
455 (0x00000006, 0x4444, 0b11),
456 (0x00000008, 0x5555, 0b11),
457 (0x0000000a, 0x6666, 0b10),
458 (0x00000100, 0x7777, 0b11),
460 self
.converter_test(width_from
=32, width_to
=16,
461 write_pattern
=write_pattern
, write_expected
=write_expected
)
463 def test_axilite_up_converter_16to32(self
):
465 (0x00000000, 0x1111),
466 (0x00000002, 0x2222),
467 (0x00000006, 0x3333),
468 (0x00000004, 0x4444),
469 (0x00000102, 0x5555),
472 (0x00000000, 0x00001111, 0b0011),
473 (0x00000000, 0x22220000, 0b1100),
474 (0x00000004, 0x33330000, 0b1100),
475 (0x00000004, 0x00004444, 0b0011),
476 (0x00000100, 0x55550000, 0b1100),
478 read_pattern
= write_pattern
480 (0x00000000, 0x22221111),
481 (0x00000000, 0x22221111),
482 (0x00000004, 0x33334444),
483 (0x00000004, 0x33334444),
484 (0x00000100, 0x55550000),
486 for parallel
in [False, True]:
487 with self
.subTest(parallel
=parallel
):
488 self
.converter_test(width_from
=16, width_to
=32, parallel_rw
=parallel
,
489 write_pattern
=write_pattern
, write_expected
=write_expected
,
490 read_pattern
=read_pattern
, read_expected
=read_expected
)
492 def test_axilite_up_converter_8to32(self
):
501 (0x00000000, 0x00000011, 0b0001),
502 (0x00000000, 0x00002200, 0b0010),
503 (0x00000000, 0x33000000, 0b1000),
504 (0x00000000, 0x00440000, 0b0100),
505 (0x00000100, 0x00005500, 0b0010),
507 read_pattern
= write_pattern
509 (0x00000000, 0x33442211),
510 (0x00000000, 0x33442211),
511 (0x00000000, 0x33442211),
512 (0x00000000, 0x33442211),
513 (0x00000100, 0x00005500),
515 for parallel
in [False, True]:
516 with self
.subTest(parallel
=parallel
):
517 self
.converter_test(width_from
=8, width_to
=32, parallel_rw
=parallel
,
518 write_pattern
=write_pattern
, write_expected
=write_expected
,
519 read_pattern
=read_pattern
, read_expected
=read_expected
)
521 def test_axilite_up_converter_strb(self
):
523 (0x00000000, 0x1111, 0b10),
524 (0x00000002, 0x2222, 0b11),
525 (0x00000006, 0x3333, 0b11),
526 (0x00000004, 0x4444, 0b01),
527 (0x00000102, 0x5555, 0b01),
530 (0x00000000, 0x00001111, 0b0010),
531 (0x00000000, 0x22220000, 0b1100),
532 (0x00000004, 0x33330000, 0b1100),
533 (0x00000004, 0x00004444, 0b0001),
534 (0x00000100, 0x55550000, 0b0100),
536 self
.converter_test(width_from
=16, width_to
=32,
537 write_pattern
=write_pattern
, write_expected
=write_expected
)
539 # TestAXILiteInterconnet ---------------------------------------------------------------------------
541 class TestAXILiteInterconnect(unittest
.TestCase
):
542 def test_interconnect_p2p(self
):
545 self
.master
= master
= AXILiteInterface()
546 self
.slave
= slave
= AXILiteInterface()
547 self
.submodules
.interconnect
= AXILiteInterconnectPointToPoint(master
, slave
)
550 ("w", 0x00000004, 0x11111111),
551 ("w", 0x0000000c, 0x22222222),
552 ("r", 0x00000010, 0x33333333),
553 ("r", 0x00000018, 0x44444444),
556 def rdata_generator(adr
):
557 for rw
, a
, v
in pattern
:
558 if rw
== "r" and a
== adr
:
563 checker
= AXILiteChecker(rdata_generator
=rdata_generator
)
565 AXILitePatternGenerator(dut
.master
, pattern
).handler(),
566 checker
.handler(dut
.slave
),
568 run_simulation(dut
, generators
)
569 self
.assertEqual(checker
.writes
, [(addr
, data
, 0b1111) for rw
, addr
, data
in pattern
if rw
== "w"])
570 self
.assertEqual(checker
.reads
, [(addr
, data
) for rw
, addr
, data
in pattern
if rw
== "r"])
572 def test_timeout(self
):
575 self
.master
= master
= AXILiteInterface()
576 self
.slave
= slave
= AXILiteInterface()
577 self
.submodules
.interconnect
= AXILiteInterconnectPointToPoint(master
, slave
)
578 self
.submodules
.timeout
= AXILiteTimeout(master
, 16)
580 def generator(axi_lite
):
581 resp
= (yield from axi_lite
.write(0x00001000, 0x11111111))
582 self
.assertEqual(resp
, RESP_OKAY
)
583 resp
= (yield from axi_lite
.write(0x00002000, 0x22222222))
584 self
.assertEqual(resp
, RESP_SLVERR
)
585 data
, resp
= (yield from axi_lite
.read(0x00003000))
586 self
.assertEqual(resp
, RESP_SLVERR
)
587 self
.assertEqual(data
, 0xffffffff)
590 def checker(axi_lite
):
593 yield axi_lite
.aw
.ready
.eq(1)
594 yield axi_lite
.w
.ready
.eq(1)
596 yield axi_lite
.aw
.ready
.eq(0)
597 yield axi_lite
.w
.ready
.eq(0)
598 yield axi_lite
.b
.valid
.eq(1)
600 while not (yield axi_lite
.b
.ready
):
602 yield axi_lite
.b
.valid
.eq(0)
606 generator(dut
.master
),
608 timeout_generator(300),
610 run_simulation(dut
, generators
)
612 def test_arbiter_order(self
):
614 def __init__(self
, n_masters
):
615 self
.masters
= [AXILiteInterface() for _
in range(n_masters
)]
616 self
.slave
= AXILiteInterface()
617 self
.submodules
.arbiter
= AXILiteArbiter(self
.masters
, self
.slave
)
619 def generator(n
, axi_lite
, delay
=0):
624 resp
= (yield from axi_lite
.write(gen(i
), gen(i
)))
625 self
.assertEqual(resp
, RESP_OKAY
)
626 for _
in range(delay
):
629 data
, resp
= (yield from axi_lite
.read(gen(i
)))
630 self
.assertEqual(resp
, RESP_OKAY
)
631 for _
in range(delay
):
638 # with no delay each master will do all transfers at once
639 with self
.subTest(delay
=0):
641 checker
= AXILiteChecker()
642 generators
= [generator(i
, master
, delay
=0) for i
, master
in enumerate(dut
.masters
)]
643 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
644 run_simulation(dut
, generators
)
645 order
= [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
646 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
647 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
649 # with some delay, the round-robin arbiter will iterate over masters
650 with self
.subTest(delay
=1):
652 checker
= AXILiteChecker()
653 generators
= [generator(i
, master
, delay
=1) for i
, master
in enumerate(dut
.masters
)]
654 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
655 run_simulation(dut
, generators
)
656 order
= [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
657 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
658 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
660 def test_arbiter_holds_grant_until_response(self
):
662 def __init__(self
, n_masters
):
663 self
.masters
= [AXILiteInterface() for _
in range(n_masters
)]
664 self
.slave
= AXILiteInterface()
665 self
.submodules
.arbiter
= AXILiteArbiter(self
.masters
, self
.slave
)
667 def generator(n
, axi_lite
, delay
=0):
672 resp
= (yield from axi_lite
.write(gen(i
), gen(i
)))
673 self
.assertEqual(resp
, RESP_OKAY
)
674 for _
in range(delay
):
677 data
, resp
= (yield from axi_lite
.read(gen(i
)))
678 self
.assertEqual(resp
, RESP_OKAY
)
679 for _
in range(delay
):
686 # with no delay each master will do all transfers at once
687 with self
.subTest(delay
=0):
689 checker
= AXILiteChecker(response_latency
=lambda: 3)
690 generators
= [generator(i
, master
, delay
=0) for i
, master
in enumerate(dut
.masters
)]
691 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
692 run_simulation(dut
, generators
)
693 order
= [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
694 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
695 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
697 # with some delay, the round-robin arbiter will iterate over masters
698 with self
.subTest(delay
=1):
700 checker
= AXILiteChecker(response_latency
=lambda: 3)
701 generators
= [generator(i
, master
, delay
=1) for i
, master
in enumerate(dut
.masters
)]
702 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
703 run_simulation(dut
, generators
)
704 order
= [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
705 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
706 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
708 def address_decoder(self
, i
, size
=0x100, python
=False):
709 # bytes to 32-bit words aligned
711 _origin
= (size
* i
) >> 2
712 if python
: # for python integers
713 shift
= log2_int(_size
)
714 return lambda a
: ((a
>> shift
) == (_origin
>> shift
))
716 return lambda a
: (a
[log2_int(_size
):] == (_origin
>> log2_int(_size
)))
718 def decoder_test(self
, n_slaves
, pattern
, generator_delay
=0):
720 def __init__(self
, decoders
):
721 self
.master
= AXILiteInterface()
722 self
.slaves
= [AXILiteInterface() for _
in range(len(decoders
))]
723 slaves
= list(zip(decoders
, self
.slaves
))
724 self
.submodules
.decoder
= AXILiteDecoder(self
.master
, slaves
)
726 def rdata_generator(adr
):
727 for rw
, a
, v
in pattern
:
728 if rw
== "r" and a
== adr
:
732 dut
= DUT([self
.address_decoder(i
) for i
in range(n_slaves
)])
733 checkers
= [AXILiteChecker(rdata_generator
=rdata_generator
) for _
in dut
.slaves
]
735 generators
= [AXILitePatternGenerator(dut
.master
, pattern
, delay
=generator_delay
).handler()]
736 generators
+= [checker
.handler(slave
) for (slave
, checker
) in zip(dut
.slaves
, checkers
)]
737 generators
+= [timeout_generator(300)]
738 run_simulation(dut
, generators
)
742 def test_decoder_write(self
):
743 for delay
in [0, 1, 0]:
744 with self
.subTest(delay
=delay
):
745 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
755 ], generator_delay
=delay
)
757 def addr(checker_list
):
758 return [entry
[0] for entry
in checker_list
]
760 self
.assertEqual(addr(slaves
[0].writes
), [0x010, 0x011, 0x012])
761 self
.assertEqual(addr(slaves
[1].writes
), [0x110, 0x111, 0x112])
762 self
.assertEqual(addr(slaves
[2].writes
), [0x210, 0x211, 0x212])
764 self
.assertEqual(slave
.reads
, [])
766 def test_decoder_read(self
):
768 with self
.subTest(delay
=delay
):
769 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
779 ], generator_delay
=delay
)
781 def addr(checker_list
):
782 return [entry
[0] for entry
in checker_list
]
784 self
.assertEqual(addr(slaves
[0].reads
), [0x010, 0x011, 0x012])
785 self
.assertEqual(addr(slaves
[1].reads
), [0x110, 0x111, 0x112])
786 self
.assertEqual(addr(slaves
[2].reads
), [0x210, 0x211, 0x212])
788 self
.assertEqual(slave
.writes
, [])
790 def test_decoder_read_write(self
):
792 with self
.subTest(delay
=delay
):
793 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
800 ], generator_delay
=delay
)
802 def addr(checker_list
):
803 return [entry
[0] for entry
in checker_list
]
805 self
.assertEqual(addr(slaves
[0].writes
), [0x010])
806 self
.assertEqual(addr(slaves
[0].reads
), [0x011])
807 self
.assertEqual(addr(slaves
[1].writes
), [0x110])
808 self
.assertEqual(addr(slaves
[1].reads
), [0x111])
809 self
.assertEqual(addr(slaves
[2].writes
), [0x210])
810 self
.assertEqual(addr(slaves
[2].reads
), [0x211])
812 def test_decoder_stall(self
):
813 with self
.assertRaises(TimeoutError
):
814 self
.decoder_test(n_slaves
=3, pattern
=[
817 with self
.assertRaises(TimeoutError
):
818 self
.decoder_test(n_slaves
=3, pattern
=[
822 def interconnect_test(self
, master_patterns
, slave_decoders
,
823 master_delay
=0, slave_ready_latency
=0, slave_response_latency
=0,
824 disconnected_slaves
=None, timeout
=300, interconnect
=AXILiteInterconnectShared
,
826 # number of masters/slaves is defined by the number of patterns/decoders
827 # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
828 # slave_decoders: list of address decoders per slave
829 # delay/latency: control the speed of masters/slaves
830 # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
832 def __init__(self
, n_masters
, decoders
, **kwargs
):
833 self
.masters
= [AXILiteInterface(name
="master") for _
in range(n_masters
)]
834 self
.slaves
= [AXILiteInterface(name
="slave") for _
in range(len(decoders
))]
835 slaves
= list(zip(decoders
, self
.slaves
))
836 self
.submodules
.interconnect
= interconnect(self
.masters
, slaves
, **kwargs
)
838 class ReadDataGenerator
:
839 # Generates data based on decoded addresses and data defined in master_patterns
840 def __init__(self
, patterns
):
842 for pattern
in patterns
:
843 for rw
, addr
, val
in pattern
:
845 assert addr
not in self
.mem
849 # on miss will give default data depending on slave n
850 return lambda addr
: self
.mem
.get(addr
, 0xbaad0000 + n
)
852 def new_checker(rdata_generator
):
853 return AXILiteChecker(ready_latency
=slave_ready_latency
,
854 response_latency
=slave_response_latency
,
855 rdata_generator
=rdata_generator
)
858 dut
= DUT(len(master_patterns
), slave_decoders
, **kwargs
)
859 rdata_generator
= ReadDataGenerator(master_patterns
)
860 checkers
= [new_checker(rdata_generator
.getter(i
)) for i
, _
in enumerate(master_patterns
)]
861 pattern_generators
= [AXILitePatternGenerator(dut
.masters
[i
], pattern
, delay
=master_delay
)
862 for i
, pattern
in enumerate(master_patterns
)]
865 generators
= [gen
.handler() for gen
in pattern_generators
]
866 generators
+= [checker
.handler(slave
)
867 for i
, (slave
, checker
) in enumerate(zip(dut
.slaves
, checkers
))
868 if i
not in (disconnected_slaves
or [])]
869 generators
+= [timeout_generator(timeout
)]
870 run_simulation(dut
, generators
)
872 return pattern_generators
, checkers
874 def test_interconnect_shared_basic(self
):
876 [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
877 [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
878 [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
880 slave_decoders
= [self
.address_decoder(i
) for i
in range(3)]
882 generators
, checkers
= self
.interconnect_test(master_patterns
, slave_decoders
,
885 for gen
in generators
:
886 self
.assertEqual(gen
.errors
, 0)
888 def addr(checker_list
):
889 return [entry
[0] for entry
in checker_list
]
891 self
.assertEqual(addr(checkers
[0].writes
), [0x000, 0x010])
892 self
.assertEqual(addr(checkers
[1].writes
), [0x101, 0x111, 0x112])
893 self
.assertEqual(addr(checkers
[2].writes
), [0x220, 0x221, 0x202, 0x222])
894 self
.assertEqual(addr(checkers
[0].reads
), [])
895 self
.assertEqual(addr(checkers
[1].reads
), [])
896 self
.assertEqual(addr(checkers
[2].reads
), [])
898 def interconnect_stress_test(self
, timeout
=1000, **kwargs
):
899 prng
= random
.Random(42)
904 slave_region_size
= 0x10000000
905 # for testing purpose each master will access only its own region of a slave
906 master_region_size
= 0x1000
907 assert n_masters
*master_region_size
< slave_region_size
909 def gen_pattern(n
, length
):
910 assert length
< master_region_size
911 for i_access
in range(length
):
912 rw
= "w" if prng
.randint(0, 1) == 0 else "r"
913 i_slave
= prng
.randrange(n_slaves
)
914 addr
= i_slave
*slave_region_size
+ n
*master_region_size
+ i_access
918 master_patterns
= [list(gen_pattern(i
, pattern_length
)) for i
in range(n_masters
)]
919 slave_decoders
= [self
.address_decoder(i
, size
=slave_region_size
) for i
in range(n_slaves
)]
920 slave_decoders_py
= [self
.address_decoder(i
, size
=slave_region_size
, python
=True)
921 for i
in range(n_slaves
)]
923 generators
, checkers
= self
.interconnect_test(master_patterns
, slave_decoders
,
924 timeout
=timeout
, **kwargs
)
926 for gen
in generators
:
927 read_errors
= [" 0x{:08x} vs 0x{:08x}".format(v
, ref
) for v
, ref
in gen
.read_errors
]
928 msg
= "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
929 gen
.resp_errors
, "\n".join(read_errors
))
930 if not kwargs
.get("disconnected_slaves", None):
931 self
.assertEqual(gen
.errors
, 0, msg
=msg
)
932 else: # when some slaves are disconnected we should have some errors
933 self
.assertNotEqual(gen
.errors
, 0, msg
=msg
)
935 # make sure all the accesses at slave side are in correct address region
936 for i_slave
, (checker
, decoder
) in enumerate(zip(checkers
, slave_decoders_py
)):
937 for addr
in (entry
[0] for entry
in checker
.writes
+ checker
.reads
):
938 # compensate for the fact that decoders work on word-aligned addresses
939 self
.assertNotEqual(decoder(addr
>> 2), 0)
941 def test_interconnect_shared_stress_no_delay(self
):
942 self
.interconnect_stress_test(timeout
=1000,
944 slave_ready_latency
=0,
945 slave_response_latency
=0)
947 def test_interconnect_shared_stress_rand_short(self
):
948 prng
= random
.Random(42)
949 rand
= lambda: prng
.randrange(4)
950 self
.interconnect_stress_test(timeout
=2000,
952 slave_ready_latency
=rand
,
953 slave_response_latency
=rand
)
955 def test_interconnect_shared_stress_rand_long(self
):
956 prng
= random
.Random(42)
957 rand
= lambda: prng
.randrange(16)
958 self
.interconnect_stress_test(timeout
=4000,
960 slave_ready_latency
=rand
,
961 slave_response_latency
=rand
)
963 def test_interconnect_shared_stress_timeout(self
):
964 self
.interconnect_stress_test(timeout
=4000,
965 disconnected_slaves
=[1],
968 def test_crossbar_stress_no_delay(self
):
969 self
.interconnect_stress_test(timeout
=1000,
971 slave_ready_latency
=0,
972 slave_response_latency
=0,
973 interconnect
=AXILiteCrossbar
)
975 def test_crossbar_stress_rand(self
):
976 prng
= random
.Random(42)
977 rand
= lambda: prng
.randrange(4)
978 self
.interconnect_stress_test(timeout
=2000,
980 slave_ready_latency
=rand
,
981 slave_response_latency
=rand
,
982 interconnect
=AXILiteCrossbar
)