1 # This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
9 from litex
.soc
.interconnect
.axi
import *
10 from litex
.soc
.interconnect
import wishbone
, csr_bus
12 # Helpers ------------------------------------------------------------------------------------------
14 def _int_or_call(int_or_func
):
15 if callable(int_or_func
):
20 def timeout_generator(ticks
):
22 for i
in range(ticks
):
23 if os
.environ
.get("TIMEOUT_DEBUG", "") == "1":
24 print("tick {}".format(i
))
26 raise TimeoutError("Timeout after %d ticks" % ticks
)
def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
    """Store the latency settings and read-data source, and start empty logs.

    ready_latency, response_latency: kept as given. The sibling `delay`
        helper resolves a latency through `_int_or_call`, so presumably each
        may be an int or a zero-argument callable -- confirm against callers.
    rdata_generator: callable mapping an address to the data returned for a
        read. Any falsy value (including the default None) selects a
        generator that answers 0xbaadc0de for every address.
    """
    # Transaction logs, filled in as transfers complete.
    self.writes = []  # (addr, data, strb)
    self.reads = []  # (addr, data)

    # Fall back to a constant marker value when no generator is supplied.
    self.rdata_generator = (
        rdata_generator if rdata_generator else (lambda adr: 0xbaadc0de)
    )

    self.response_latency = response_latency
    self.ready_latency = ready_latency
36 def delay(self
, latency
):
37 for _
in range(_int_or_call(latency
)):
40 def handle_write(self
, axi_lite
):
42 while not (yield axi_lite
.aw
.valid
):
44 yield from self
.delay(self
.ready_latency
)
45 addr
= (yield axi_lite
.aw
.addr
)
46 yield axi_lite
.aw
.ready
.eq(1)
48 yield axi_lite
.aw
.ready
.eq(0)
49 while not (yield axi_lite
.w
.valid
):
51 yield from self
.delay(self
.ready_latency
)
53 data
= (yield axi_lite
.w
.data
)
54 strb
= (yield axi_lite
.w
.strb
)
55 yield axi_lite
.w
.ready
.eq(1)
57 yield axi_lite
.w
.ready
.eq(0)
58 yield from self
.delay(self
.response_latency
)
60 yield axi_lite
.b
.valid
.eq(1)
61 yield axi_lite
.b
.resp
.eq(RESP_OKAY
)
63 while not (yield axi_lite
.b
.ready
):
65 yield axi_lite
.b
.valid
.eq(0)
66 self
.writes
.append((addr
, data
, strb
))
68 def handle_read(self
, axi_lite
):
70 while not (yield axi_lite
.ar
.valid
):
72 yield from self
.delay(self
.ready_latency
)
73 addr
= (yield axi_lite
.ar
.addr
)
74 yield axi_lite
.ar
.ready
.eq(1)
76 yield axi_lite
.ar
.ready
.eq(0)
77 yield from self
.delay(self
.response_latency
)
79 data
= self
.rdata_generator(addr
)
80 yield axi_lite
.r
.valid
.eq(1)
81 yield axi_lite
.r
.resp
.eq(RESP_OKAY
)
82 yield axi_lite
.r
.data
.eq(data
)
84 while not (yield axi_lite
.r
.ready
):
86 yield axi_lite
.r
.valid
.eq(0)
87 yield axi_lite
.r
.data
.eq(0)
88 self
.reads
.append((addr
, data
))
91 def handler(self
, axi_lite
):
93 if (yield axi_lite
.aw
.valid
):
94 yield from self
.handle_write(axi_lite
)
95 if (yield axi_lite
.ar
.valid
):
96 yield from self
.handle_read(axi_lite
)
100 def _write_handler(self
, axi_lite
):
102 yield from self
.handle_write(axi_lite
)
106 def _read_handler(self
, axi_lite
):
108 yield from self
.handle_read(axi_lite
)
def parallel_handlers(self, axi_lite):
    """Return a (write handler, read handler) pair bound to `axi_lite`.

    The pair comes from `self._write_handler` and `self._read_handler`,
    called in that order with the same interface. Callers in this file
    append the pair to the list of generators passed to the simulation.
    """
    write_side = self._write_handler(axi_lite)
    read_side = self._read_handler(axi_lite)
    return write_side, read_side
114 class AXILitePatternGenerator
:
115 def __init__(self
, axi_lite
, pattern
, delay
=0):
# pattern: (rw, addr, data)
117 self
.axi_lite
= axi_lite
118 self
.pattern
= pattern
121 self
.read_errors
= []
122 self
.resp_errors
= {"w": 0, "r": 0}
125 for rw
, addr
, data
in self
.pattern
:
126 assert rw
in ["w", "r"]
128 strb
= 2**len(self
.axi_lite
.w
.strb
) - 1
129 resp
= (yield from self
.axi_lite
.write(addr
, data
, strb
))
131 rdata
, resp
= (yield from self
.axi_lite
.read(addr
))
133 self
.read_errors
.append((rdata
, data
))
135 if resp
!= RESP_OKAY
:
136 self
.resp_errors
[rw
] += 1
138 for _
in range(_int_or_call(self
.delay
)):
143 # TestAXILite --------------------------------------------------------------------------------------
145 class TestAXILite(unittest
.TestCase
):
146 def test_wishbone2axi2wishbone(self
):
149 self
.wishbone
= wishbone
.Interface(data_width
=32, adr_width
=30)
153 axi
= AXILiteInterface(data_width
=32, address_width
=32)
154 wb
= wishbone
.Interface(data_width
=32, adr_width
=30)
156 wishbone2axi
= Wishbone2AXILite(self
.wishbone
, axi
)
157 axi2wishbone
= AXILite2Wishbone(axi
, wb
)
158 self
.submodules
+= wishbone2axi
, axi2wishbone
160 sram
= wishbone
.SRAM(1024, init
=[0x12345678, 0xa55aa55a])
161 self
.submodules
+= sram
162 self
.comb
+= wb
.connect(sram
.bus
)
166 if (yield from dut
.wishbone
.read(0)) != 0x12345678:
168 if (yield from dut
.wishbone
.read(1)) != 0xa55aa55a:
171 yield from dut
.wishbone
.write(i
, i
)
173 if (yield from dut
.wishbone
.read(i
)) != i
:
177 run_simulation(dut
, [generator(dut
)])
178 self
.assertEqual(dut
.errors
, 0)
180 def test_axilite2axi2mem(self
):
182 def __init__(self
, mem_bus
="wishbone"):
183 self
.axi_lite
= AXILiteInterface()
186 self
.submodules
.axil2axi
= AXILite2AXI(self
.axi_lite
, axi
)
188 interface_cls
, converter_cls
, sram_cls
= {
189 "wishbone": (wishbone
.Interface
, AXI2Wishbone
, wishbone
.SRAM
),
190 "axi_lite": (AXILiteInterface
, AXI2AXILite
, AXILiteSRAM
),
193 bus_kwargs
= {"adr_width" : 30} if mem_bus
== "wishbone" else {}
194 bus
= interface_cls(**bus_kwargs
)
195 self
.submodules
+= converter_cls(axi
, bus
)
196 sram
= sram_cls(1024, init
=[0x12345678, 0xa55aa55a])
197 self
.submodules
+= sram
198 self
.comb
+= bus
.connect(sram
.bus
)
200 def generator(axi_lite
, datas
, resps
):
201 data
, resp
= (yield from axi_lite
.read(0x00))
202 resps
.append((resp
, RESP_OKAY
))
203 datas
.append((data
, 0x12345678))
204 data
, resp
= (yield from axi_lite
.read(0x04))
205 resps
.append((resp
, RESP_OKAY
))
206 datas
.append((data
, 0xa55aa55a))
208 resp
= (yield from axi_lite
.write(4*i
, i
))
209 resps
.append((resp
, RESP_OKAY
))
211 data
, resp
= (yield from axi_lite
.read(4*i
))
212 resps
.append((resp
, RESP_OKAY
))
213 datas
.append((data
, i
))
215 for mem_bus
in ["wishbone", "axi_lite"]:
216 with self
.subTest(mem_bus
=mem_bus
):
217 # to have more verbose error messages store errors in list((actual, expected))
221 def actual_expected(results
): # split into (list(actual), list(expected))
222 return list(zip(*results
))
225 run_simulation(dut
, [generator(dut
.axi_lite
, datas
, resps
)])
226 self
.assertEqual(*actual_expected(resps
))
227 msg
= "\n".join("0x{:08x} vs 0x{:08x}".format(actual
, expected
) for actual
, expected
in datas
)
228 self
.assertEqual(*actual_expected(datas
), msg
="actual vs expected:\n" + msg
)
230 def test_axilite2csr(self
):
232 def csr_mem_handler(csr
, mem
):
234 adr
= (yield csr
.adr
)
235 yield csr
.dat_r
.eq(mem
[adr
])
237 mem
[adr
] = (yield csr
.dat_w
)
242 self
.axi_lite
= AXILiteInterface()
243 self
.csr
= csr_bus
.Interface()
244 self
.submodules
.axilite2csr
= AXILite2CSR(self
.axi_lite
, self
.csr
)
247 prng
= random
.Random(42)
248 mem_ref
= [prng
.randrange(255) for i
in range(100)]
253 for adr
, ref
in enumerate(mem_ref
):
255 data
, resp
= (yield from dut
.axi_lite
.read(adr
))
256 self
.assertEqual(resp
, 0b00)
260 write_data
= [prng
.randrange(255) for _
in mem_ref
]
262 for adr
, wdata
in enumerate(write_data
):
264 resp
= (yield from dut
.axi_lite
.write(adr
, wdata
))
265 self
.assertEqual(resp
, 0b00)
266 rdata
, resp
= (yield from dut
.axi_lite
.read(adr
))
267 self
.assertEqual(resp
, 0b00)
272 mem
= [v
for v
in mem_ref
]
273 run_simulation(dut
, [generator(dut
), csr_mem_handler(dut
.csr
, mem
)])
274 self
.assertEqual(dut
.errors
, 0)
276 def test_axilite_sram(self
):
278 def __init__(self
, size
, init
):
279 self
.axi_lite
= AXILiteInterface()
280 self
.submodules
.sram
= AXILiteSRAM(size
, init
=init
, bus
=self
.axi_lite
)
283 def generator(dut
, ref_init
):
284 for adr
, ref
in enumerate(ref_init
):
286 data
, resp
= (yield from dut
.axi_lite
.read(adr
))
287 self
.assertEqual(resp
, 0b00)
291 write_data
= [prng
.randrange(255) for _
in ref_init
]
293 for adr
, wdata
in enumerate(write_data
):
295 resp
= (yield from dut
.axi_lite
.write(adr
, wdata
))
296 self
.assertEqual(resp
, 0b00)
297 rdata
, resp
= (yield from dut
.axi_lite
.read(adr
))
298 self
.assertEqual(resp
, 0b00)
302 prng
= random
.Random(42)
303 init
= [prng
.randrange(2**32) for i
in range(100)]
305 dut
= DUT(size
=len(init
)*4, init
=[v
for v
in init
])
306 run_simulation(dut
, [generator(dut
, init
)])
307 self
.assertEqual(dut
.errors
, 0)
309 def converter_test(self
, width_from
, width_to
, parallel_rw
=False,
310 write_pattern
=None, write_expected
=None,
311 read_pattern
=None, read_expected
=None):
312 assert not (write_pattern
is None and read_pattern
is None)
314 if write_pattern
is None:
317 elif len(write_pattern
[0]) == 2:
319 write_pattern
= [(adr
, data
, 2**(width_from
//8)-1) for adr
, data
in write_pattern
]
321 if read_pattern
is None:
326 def __init__(self
, width_from
, width_to
):
327 self
.master
= AXILiteInterface(data_width
=width_from
)
328 self
.slave
= AXILiteInterface(data_width
=width_to
)
329 self
.submodules
.converter
= AXILiteConverter(self
.master
, self
.slave
)
331 prng
= random
.Random(42)
333 def write_generator(axi_lite
):
334 for addr
, data
, strb
in write_pattern
or []:
335 resp
= (yield from axi_lite
.write(addr
, data
, strb
))
336 self
.assertEqual(resp
, RESP_OKAY
)
337 for _
in range(prng
.randrange(3)):
342 def read_generator(axi_lite
):
343 for addr
, refdata
in read_pattern
or []:
344 data
, resp
= (yield from axi_lite
.read(addr
))
345 self
.assertEqual(resp
, RESP_OKAY
)
346 self
.assertEqual(data
, refdata
)
347 for _
in range(prng
.randrange(3)):
352 def sequential_generator(axi_lite
):
353 yield from write_generator(axi_lite
)
354 yield from read_generator(axi_lite
)
356 def rdata_generator(adr
):
357 for a
, v
in read_expected
:
365 _latency
= (_latency
+ 1) % 3
368 dut
= DUT(width_from
=width_from
, width_to
=width_to
)
369 checker
= AXILiteChecker(ready_latency
=latency
, rdata_generator
=rdata_generator
)
371 generators
= [write_generator(dut
.master
), read_generator(dut
.master
)]
373 generators
= [sequential_generator(dut
.master
)]
374 generators
+= checker
.parallel_handlers(dut
.slave
)
375 run_simulation(dut
, generators
)
376 self
.assertEqual(checker
.writes
, write_expected
)
377 self
.assertEqual(checker
.reads
, read_expected
)
379 def test_axilite_down_converter_32to16(self
):
381 (0x00000000, 0x22221111),
382 (0x00000004, 0x44443333),
383 (0x00000008, 0x66665555),
384 (0x00000100, 0x88887777),
387 (0x00000000, 0x1111, 0b11),
388 (0x00000002, 0x2222, 0b11),
389 (0x00000004, 0x3333, 0b11),
390 (0x00000006, 0x4444, 0b11),
391 (0x00000008, 0x5555, 0b11),
392 (0x0000000a, 0x6666, 0b11),
393 (0x00000100, 0x7777, 0b11),
394 (0x00000102, 0x8888, 0b11),
396 read_pattern
= write_pattern
397 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
398 for parallel
in [False, True]:
399 with self
.subTest(parallel
=parallel
):
400 self
.converter_test(width_from
=32, width_to
=16, parallel_rw
=parallel
,
401 write_pattern
=write_pattern
, write_expected
=write_expected
,
402 read_pattern
=read_pattern
, read_expected
=read_expected
)
404 def test_axilite_down_converter_32to8(self
):
406 (0x00000000, 0x44332211),
407 (0x00000004, 0x88776655),
410 (0x00000000, 0x11, 0b1),
411 (0x00000001, 0x22, 0b1),
412 (0x00000002, 0x33, 0b1),
413 (0x00000003, 0x44, 0b1),
414 (0x00000004, 0x55, 0b1),
415 (0x00000005, 0x66, 0b1),
416 (0x00000006, 0x77, 0b1),
417 (0x00000007, 0x88, 0b1),
419 read_pattern
= write_pattern
420 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
421 for parallel
in [False, True]:
422 with self
.subTest(parallel
=parallel
):
423 self
.converter_test(width_from
=32, width_to
=8, parallel_rw
=parallel
,
424 write_pattern
=write_pattern
, write_expected
=write_expected
,
425 read_pattern
=read_pattern
, read_expected
=read_expected
)
427 def test_axilite_down_converter_64to32(self
):
429 (0x00000000, 0x2222222211111111),
430 (0x00000008, 0x4444444433333333),
433 (0x00000000, 0x11111111, 0b1111),
434 (0x00000004, 0x22222222, 0b1111),
435 (0x00000008, 0x33333333, 0b1111),
436 (0x0000000c, 0x44444444, 0b1111),
438 read_pattern
= write_pattern
439 read_expected
= [(adr
, data
) for (adr
, data
, _
) in write_expected
]
440 for parallel
in [False, True]:
441 with self
.subTest(parallel
=parallel
):
442 self
.converter_test(width_from
=64, width_to
=32, parallel_rw
=parallel
,
443 write_pattern
=write_pattern
, write_expected
=write_expected
,
444 read_pattern
=read_pattern
, read_expected
=read_expected
)
446 def test_axilite_down_converter_strb(self
):
448 (0x00000000, 0x22221111, 0b1100),
449 (0x00000004, 0x44443333, 0b1111),
450 (0x00000008, 0x66665555, 0b1011),
451 (0x00000100, 0x88887777, 0b0011),
454 (0x00000002, 0x2222, 0b11),
455 (0x00000004, 0x3333, 0b11),
456 (0x00000006, 0x4444, 0b11),
457 (0x00000008, 0x5555, 0b11),
458 (0x0000000a, 0x6666, 0b10),
459 (0x00000100, 0x7777, 0b11),
461 self
.converter_test(width_from
=32, width_to
=16,
462 write_pattern
=write_pattern
, write_expected
=write_expected
)
464 def test_axilite_up_converter_16to32(self
):
466 (0x00000000, 0x1111),
467 (0x00000002, 0x2222),
468 (0x00000006, 0x3333),
469 (0x00000004, 0x4444),
470 (0x00000102, 0x5555),
473 (0x00000000, 0x00001111, 0b0011),
474 (0x00000000, 0x22220000, 0b1100),
475 (0x00000004, 0x33330000, 0b1100),
476 (0x00000004, 0x00004444, 0b0011),
477 (0x00000100, 0x55550000, 0b1100),
479 read_pattern
= write_pattern
481 (0x00000000, 0x22221111),
482 (0x00000000, 0x22221111),
483 (0x00000004, 0x33334444),
484 (0x00000004, 0x33334444),
485 (0x00000100, 0x55550000),
487 for parallel
in [False, True]:
488 with self
.subTest(parallel
=parallel
):
489 self
.converter_test(width_from
=16, width_to
=32, parallel_rw
=parallel
,
490 write_pattern
=write_pattern
, write_expected
=write_expected
,
491 read_pattern
=read_pattern
, read_expected
=read_expected
)
493 def test_axilite_up_converter_8to32(self
):
502 (0x00000000, 0x00000011, 0b0001),
503 (0x00000000, 0x00002200, 0b0010),
504 (0x00000000, 0x33000000, 0b1000),
505 (0x00000000, 0x00440000, 0b0100),
506 (0x00000100, 0x00005500, 0b0010),
508 read_pattern
= write_pattern
510 (0x00000000, 0x33442211),
511 (0x00000000, 0x33442211),
512 (0x00000000, 0x33442211),
513 (0x00000000, 0x33442211),
514 (0x00000100, 0x00005500),
516 for parallel
in [False, True]:
517 with self
.subTest(parallel
=parallel
):
518 self
.converter_test(width_from
=8, width_to
=32, parallel_rw
=parallel
,
519 write_pattern
=write_pattern
, write_expected
=write_expected
,
520 read_pattern
=read_pattern
, read_expected
=read_expected
)
522 def test_axilite_up_converter_strb(self
):
524 (0x00000000, 0x1111, 0b10),
525 (0x00000002, 0x2222, 0b11),
526 (0x00000006, 0x3333, 0b11),
527 (0x00000004, 0x4444, 0b01),
528 (0x00000102, 0x5555, 0b01),
531 (0x00000000, 0x00001111, 0b0010),
532 (0x00000000, 0x22220000, 0b1100),
533 (0x00000004, 0x33330000, 0b1100),
534 (0x00000004, 0x00004444, 0b0001),
535 (0x00000100, 0x55550000, 0b0100),
537 self
.converter_test(width_from
=16, width_to
=32,
538 write_pattern
=write_pattern
, write_expected
=write_expected
)
# TestAXILiteInterconnect --------------------------------------------------------------------------
542 class TestAXILiteInterconnect(unittest
.TestCase
):
543 def test_interconnect_p2p(self
):
546 self
.master
= master
= AXILiteInterface()
547 self
.slave
= slave
= AXILiteInterface()
548 self
.submodules
.interconnect
= AXILiteInterconnectPointToPoint(master
, slave
)
551 ("w", 0x00000004, 0x11111111),
552 ("w", 0x0000000c, 0x22222222),
553 ("r", 0x00000010, 0x33333333),
554 ("r", 0x00000018, 0x44444444),
557 def rdata_generator(adr
):
558 for rw
, a
, v
in pattern
:
559 if rw
== "r" and a
== adr
:
564 checker
= AXILiteChecker(rdata_generator
=rdata_generator
)
566 AXILitePatternGenerator(dut
.master
, pattern
).handler(),
567 checker
.handler(dut
.slave
),
569 run_simulation(dut
, generators
)
570 self
.assertEqual(checker
.writes
, [(addr
, data
, 0b1111) for rw
, addr
, data
in pattern
if rw
== "w"])
571 self
.assertEqual(checker
.reads
, [(addr
, data
) for rw
, addr
, data
in pattern
if rw
== "r"])
573 def test_timeout(self
):
576 self
.master
= master
= AXILiteInterface()
577 self
.slave
= slave
= AXILiteInterface()
578 self
.submodules
.interconnect
= AXILiteInterconnectPointToPoint(master
, slave
)
579 self
.submodules
.timeout
= AXILiteTimeout(master
, 16)
581 def generator(axi_lite
):
582 resp
= (yield from axi_lite
.write(0x00001000, 0x11111111))
583 self
.assertEqual(resp
, RESP_OKAY
)
584 resp
= (yield from axi_lite
.write(0x00002000, 0x22222222))
585 self
.assertEqual(resp
, RESP_SLVERR
)
586 data
, resp
= (yield from axi_lite
.read(0x00003000))
587 self
.assertEqual(resp
, RESP_SLVERR
)
588 self
.assertEqual(data
, 0xffffffff)
591 def checker(axi_lite
):
594 yield axi_lite
.aw
.ready
.eq(1)
595 yield axi_lite
.w
.ready
.eq(1)
597 yield axi_lite
.aw
.ready
.eq(0)
598 yield axi_lite
.w
.ready
.eq(0)
599 yield axi_lite
.b
.valid
.eq(1)
601 while not (yield axi_lite
.b
.ready
):
603 yield axi_lite
.b
.valid
.eq(0)
607 generator(dut
.master
),
609 timeout_generator(300),
611 run_simulation(dut
, generators
)
613 def test_arbiter_order(self
):
615 def __init__(self
, n_masters
):
616 self
.masters
= [AXILiteInterface() for _
in range(n_masters
)]
617 self
.slave
= AXILiteInterface()
618 self
.submodules
.arbiter
= AXILiteArbiter(self
.masters
, self
.slave
)
620 def generator(n
, axi_lite
, delay
=0):
625 resp
= (yield from axi_lite
.write(gen(i
), gen(i
)))
626 self
.assertEqual(resp
, RESP_OKAY
)
627 for _
in range(delay
):
630 data
, resp
= (yield from axi_lite
.read(gen(i
)))
631 self
.assertEqual(resp
, RESP_OKAY
)
632 for _
in range(delay
):
639 # with no delay each master will do all transfers at once
640 with self
.subTest(delay
=0):
642 checker
= AXILiteChecker()
643 generators
= [generator(i
, master
, delay
=0) for i
, master
in enumerate(dut
.masters
)]
644 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
645 run_simulation(dut
, generators
)
646 order
= [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
647 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
648 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
650 # with some delay, the round-robin arbiter will iterate over masters
651 with self
.subTest(delay
=1):
653 checker
= AXILiteChecker()
654 generators
= [generator(i
, master
, delay
=1) for i
, master
in enumerate(dut
.masters
)]
655 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
656 run_simulation(dut
, generators
)
657 order
= [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
658 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
659 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
661 def test_arbiter_holds_grant_until_response(self
):
663 def __init__(self
, n_masters
):
664 self
.masters
= [AXILiteInterface() for _
in range(n_masters
)]
665 self
.slave
= AXILiteInterface()
666 self
.submodules
.arbiter
= AXILiteArbiter(self
.masters
, self
.slave
)
668 def generator(n
, axi_lite
, delay
=0):
673 resp
= (yield from axi_lite
.write(gen(i
), gen(i
)))
674 self
.assertEqual(resp
, RESP_OKAY
)
675 for _
in range(delay
):
678 data
, resp
= (yield from axi_lite
.read(gen(i
)))
679 self
.assertEqual(resp
, RESP_OKAY
)
680 for _
in range(delay
):
687 # with no delay each master will do all transfers at once
688 with self
.subTest(delay
=0):
690 checker
= AXILiteChecker(response_latency
=lambda: 3)
691 generators
= [generator(i
, master
, delay
=0) for i
, master
in enumerate(dut
.masters
)]
692 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
693 run_simulation(dut
, generators
)
694 order
= [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
695 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
696 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
698 # with some delay, the round-robin arbiter will iterate over masters
699 with self
.subTest(delay
=1):
701 checker
= AXILiteChecker(response_latency
=lambda: 3)
702 generators
= [generator(i
, master
, delay
=1) for i
, master
in enumerate(dut
.masters
)]
703 generators
+= [timeout_generator(300), checker
.handler(dut
.slave
)]
704 run_simulation(dut
, generators
)
705 order
= [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
706 self
.assertEqual([addr
for addr
, data
, strb
in checker
.writes
], order
)
707 self
.assertEqual([addr
for addr
, data
in checker
.reads
], order
)
709 def address_decoder(self
, i
, size
=0x100, python
=False):
710 # bytes to 32-bit words aligned
712 _origin
= (size
* i
) >> 2
713 if python
: # for python integers
714 shift
= log2_int(_size
)
715 return lambda a
: ((a
>> shift
) == (_origin
>> shift
))
717 return lambda a
: (a
[log2_int(_size
):] == (_origin
>> log2_int(_size
)))
719 def decoder_test(self
, n_slaves
, pattern
, generator_delay
=0):
721 def __init__(self
, decoders
):
722 self
.master
= AXILiteInterface()
723 self
.slaves
= [AXILiteInterface() for _
in range(len(decoders
))]
724 slaves
= list(zip(decoders
, self
.slaves
))
725 self
.submodules
.decoder
= AXILiteDecoder(self
.master
, slaves
)
727 def rdata_generator(adr
):
728 for rw
, a
, v
in pattern
:
729 if rw
== "r" and a
== adr
:
733 dut
= DUT([self
.address_decoder(i
) for i
in range(n_slaves
)])
734 checkers
= [AXILiteChecker(rdata_generator
=rdata_generator
) for _
in dut
.slaves
]
736 generators
= [AXILitePatternGenerator(dut
.master
, pattern
, delay
=generator_delay
).handler()]
737 generators
+= [checker
.handler(slave
) for (slave
, checker
) in zip(dut
.slaves
, checkers
)]
738 generators
+= [timeout_generator(300)]
739 run_simulation(dut
, generators
)
743 def test_decoder_write(self
):
744 for delay
in [0, 1, 0]:
745 with self
.subTest(delay
=delay
):
746 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
756 ], generator_delay
=delay
)
758 def addr(checker_list
):
759 return [entry
[0] for entry
in checker_list
]
761 self
.assertEqual(addr(slaves
[0].writes
), [0x010, 0x011, 0x012])
762 self
.assertEqual(addr(slaves
[1].writes
), [0x110, 0x111, 0x112])
763 self
.assertEqual(addr(slaves
[2].writes
), [0x210, 0x211, 0x212])
765 self
.assertEqual(slave
.reads
, [])
767 def test_decoder_read(self
):
769 with self
.subTest(delay
=delay
):
770 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
780 ], generator_delay
=delay
)
782 def addr(checker_list
):
783 return [entry
[0] for entry
in checker_list
]
785 self
.assertEqual(addr(slaves
[0].reads
), [0x010, 0x011, 0x012])
786 self
.assertEqual(addr(slaves
[1].reads
), [0x110, 0x111, 0x112])
787 self
.assertEqual(addr(slaves
[2].reads
), [0x210, 0x211, 0x212])
789 self
.assertEqual(slave
.writes
, [])
791 def test_decoder_read_write(self
):
793 with self
.subTest(delay
=delay
):
794 slaves
= self
.decoder_test(n_slaves
=3, pattern
=[
801 ], generator_delay
=delay
)
803 def addr(checker_list
):
804 return [entry
[0] for entry
in checker_list
]
806 self
.assertEqual(addr(slaves
[0].writes
), [0x010])
807 self
.assertEqual(addr(slaves
[0].reads
), [0x011])
808 self
.assertEqual(addr(slaves
[1].writes
), [0x110])
809 self
.assertEqual(addr(slaves
[1].reads
), [0x111])
810 self
.assertEqual(addr(slaves
[2].writes
), [0x210])
811 self
.assertEqual(addr(slaves
[2].reads
), [0x211])
813 def test_decoder_stall(self
):
814 with self
.assertRaises(TimeoutError
):
815 self
.decoder_test(n_slaves
=3, pattern
=[
818 with self
.assertRaises(TimeoutError
):
819 self
.decoder_test(n_slaves
=3, pattern
=[
823 def interconnect_test(self
, master_patterns
, slave_decoders
,
824 master_delay
=0, slave_ready_latency
=0, slave_response_latency
=0,
825 disconnected_slaves
=None, timeout
=300, interconnect
=AXILiteInterconnectShared
,
827 # number of masters/slaves is defined by the number of patterns/decoders
828 # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
829 # slave_decoders: list of address decoders per slave
830 # delay/latency: control the speed of masters/slaves
831 # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
833 def __init__(self
, n_masters
, decoders
, **kwargs
):
834 self
.masters
= [AXILiteInterface(name
="master") for _
in range(n_masters
)]
835 self
.slaves
= [AXILiteInterface(name
="slave") for _
in range(len(decoders
))]
836 slaves
= list(zip(decoders
, self
.slaves
))
837 self
.submodules
.interconnect
= interconnect(self
.masters
, slaves
, **kwargs
)
839 class ReadDataGenerator
:
840 # Generates data based on decoded addresses and data defined in master_patterns
841 def __init__(self
, patterns
):
843 for pattern
in patterns
:
844 for rw
, addr
, val
in pattern
:
846 assert addr
not in self
.mem
850 # on miss will give default data depending on slave n
851 return lambda addr
: self
.mem
.get(addr
, 0xbaad0000 + n
)
853 def new_checker(rdata_generator
):
854 return AXILiteChecker(ready_latency
=slave_ready_latency
,
855 response_latency
=slave_response_latency
,
856 rdata_generator
=rdata_generator
)
859 dut
= DUT(len(master_patterns
), slave_decoders
, **kwargs
)
860 rdata_generator
= ReadDataGenerator(master_patterns
)
861 checkers
= [new_checker(rdata_generator
.getter(i
)) for i
, _
in enumerate(master_patterns
)]
862 pattern_generators
= [AXILitePatternGenerator(dut
.masters
[i
], pattern
, delay
=master_delay
)
863 for i
, pattern
in enumerate(master_patterns
)]
866 generators
= [gen
.handler() for gen
in pattern_generators
]
867 generators
+= [checker
.handler(slave
)
868 for i
, (slave
, checker
) in enumerate(zip(dut
.slaves
, checkers
))
869 if i
not in (disconnected_slaves
or [])]
870 generators
+= [timeout_generator(timeout
)]
871 run_simulation(dut
, generators
)
873 return pattern_generators
, checkers
875 def test_interconnect_shared_basic(self
):
877 [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
878 [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
879 [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
881 slave_decoders
= [self
.address_decoder(i
) for i
in range(3)]
883 generators
, checkers
= self
.interconnect_test(master_patterns
, slave_decoders
,
886 for gen
in generators
:
887 self
.assertEqual(gen
.errors
, 0)
889 def addr(checker_list
):
890 return [entry
[0] for entry
in checker_list
]
892 self
.assertEqual(addr(checkers
[0].writes
), [0x000, 0x010])
893 self
.assertEqual(addr(checkers
[1].writes
), [0x101, 0x111, 0x112])
894 self
.assertEqual(addr(checkers
[2].writes
), [0x220, 0x221, 0x202, 0x222])
895 self
.assertEqual(addr(checkers
[0].reads
), [])
896 self
.assertEqual(addr(checkers
[1].reads
), [])
897 self
.assertEqual(addr(checkers
[2].reads
), [])
899 def interconnect_stress_test(self
, timeout
=1000, **kwargs
):
900 prng
= random
.Random(42)
905 slave_region_size
= 0x10000000
906 # for testing purpose each master will access only its own region of a slave
907 master_region_size
= 0x1000
908 assert n_masters
*master_region_size
< slave_region_size
910 def gen_pattern(n
, length
):
911 assert length
< master_region_size
912 for i_access
in range(length
):
913 rw
= "w" if prng
.randint(0, 1) == 0 else "r"
914 i_slave
= prng
.randrange(n_slaves
)
915 addr
= i_slave
*slave_region_size
+ n
*master_region_size
+ i_access
919 master_patterns
= [list(gen_pattern(i
, pattern_length
)) for i
in range(n_masters
)]
920 slave_decoders
= [self
.address_decoder(i
, size
=slave_region_size
) for i
in range(n_slaves
)]
921 slave_decoders_py
= [self
.address_decoder(i
, size
=slave_region_size
, python
=True)
922 for i
in range(n_slaves
)]
924 generators
, checkers
= self
.interconnect_test(master_patterns
, slave_decoders
,
925 timeout
=timeout
, **kwargs
)
927 for gen
in generators
:
928 read_errors
= [" 0x{:08x} vs 0x{:08x}".format(v
, ref
) for v
, ref
in gen
.read_errors
]
929 msg
= "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
930 gen
.resp_errors
, "\n".join(read_errors
))
931 if not kwargs
.get("disconnected_slaves", None):
932 self
.assertEqual(gen
.errors
, 0, msg
=msg
)
933 else: # when some slaves are disconnected we should have some errors
934 self
.assertNotEqual(gen
.errors
, 0, msg
=msg
)
936 # make sure all the accesses at slave side are in correct address region
937 for i_slave
, (checker
, decoder
) in enumerate(zip(checkers
, slave_decoders_py
)):
938 for addr
in (entry
[0] for entry
in checker
.writes
+ checker
.reads
):
939 # compensate for the fact that decoders work on word-aligned addresses
940 self
.assertNotEqual(decoder(addr
>> 2), 0)
942 def test_interconnect_shared_stress_no_delay(self
):
943 self
.interconnect_stress_test(timeout
=1000,
945 slave_ready_latency
=0,
946 slave_response_latency
=0)
948 def test_interconnect_shared_stress_rand_short(self
):
949 prng
= random
.Random(42)
950 rand
= lambda: prng
.randrange(4)
951 self
.interconnect_stress_test(timeout
=2000,
953 slave_ready_latency
=rand
,
954 slave_response_latency
=rand
)
956 def test_interconnect_shared_stress_rand_long(self
):
957 prng
= random
.Random(42)
958 rand
= lambda: prng
.randrange(16)
959 self
.interconnect_stress_test(timeout
=4000,
961 slave_ready_latency
=rand
,
962 slave_response_latency
=rand
)
964 def test_interconnect_shared_stress_timeout(self
):
965 self
.interconnect_stress_test(timeout
=4000,
966 disconnected_slaves
=[1],
969 def test_crossbar_stress_no_delay(self
):
970 self
.interconnect_stress_test(timeout
=1000,
972 slave_ready_latency
=0,
973 slave_response_latency
=0,
974 interconnect
=AXILiteCrossbar
)
976 def test_crossbar_stress_rand(self
):
977 prng
= random
.Random(42)
978 rand
= lambda: prng
.randrange(4)
979 self
.interconnect_stress_test(timeout
=2000,
981 slave_ready_latency
=rand
,
982 slave_response_latency
=rand
,
983 interconnect
=AXILiteCrossbar
)