soc/interconnect/axi: separate AXI Lite converter channels
[litex.git] / test / test_axi_lite.py
1 # This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
2 # License: BSD
3
4 import unittest
5 import random
6
7 from migen import *
8
9 from litex.soc.interconnect.axi import *
10 from litex.soc.interconnect import wishbone, csr_bus
11
12 # Helpers ------------------------------------------------------------------------------------------
13
14 def _int_or_call(int_or_func):
15 if callable(int_or_func):
16 return int_or_func()
17 return int_or_func
18
19 @passive
20 def timeout_generator(ticks):
21 import os
22 for i in range(ticks):
23 if os.environ.get("TIMEOUT_DEBUG", "") == "1":
24 print("tick {}".format(i))
25 yield
26 raise TimeoutError("Timeout after %d ticks" % ticks)
27
28 class AXILiteChecker:
29 def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
30 self.ready_latency = ready_latency
31 self.response_latency = response_latency
32 self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
33 self.writes = [] # (addr, data, strb)
34 self.reads = [] # (addr, data)
35
36 def delay(self, latency):
37 for _ in range(_int_or_call(latency)):
38 yield
39
40 def handle_write(self, axi_lite):
41 # aw
42 while not (yield axi_lite.aw.valid):
43 yield
44 yield from self.delay(self.ready_latency)
45 addr = (yield axi_lite.aw.addr)
46 yield axi_lite.aw.ready.eq(1)
47 yield
48 yield axi_lite.aw.ready.eq(0)
49 while not (yield axi_lite.w.valid):
50 yield
51 yield from self.delay(self.ready_latency)
52 # w
53 data = (yield axi_lite.w.data)
54 strb = (yield axi_lite.w.strb)
55 yield axi_lite.w.ready.eq(1)
56 yield
57 yield axi_lite.w.ready.eq(0)
58 yield from self.delay(self.response_latency)
59 # b
60 yield axi_lite.b.valid.eq(1)
61 yield axi_lite.b.resp.eq(RESP_OKAY)
62 yield
63 while not (yield axi_lite.b.ready):
64 yield
65 yield axi_lite.b.valid.eq(0)
66 self.writes.append((addr, data, strb))
67
68 def handle_read(self, axi_lite):
69 # ar
70 while not (yield axi_lite.ar.valid):
71 yield
72 yield from self.delay(self.ready_latency)
73 addr = (yield axi_lite.ar.addr)
74 yield axi_lite.ar.ready.eq(1)
75 yield
76 yield axi_lite.ar.ready.eq(0)
77 yield from self.delay(self.response_latency)
78 # r
79 data = self.rdata_generator(addr)
80 yield axi_lite.r.valid.eq(1)
81 yield axi_lite.r.resp.eq(RESP_OKAY)
82 yield axi_lite.r.data.eq(data)
83 yield
84 while not (yield axi_lite.r.ready):
85 yield
86 yield axi_lite.r.valid.eq(0)
87 yield axi_lite.r.data.eq(0)
88 self.reads.append((addr, data))
89
90 @passive
91 def handler(self, axi_lite):
92 while True:
93 if (yield axi_lite.aw.valid):
94 yield from self.handle_write(axi_lite)
95 if (yield axi_lite.ar.valid):
96 yield from self.handle_read(axi_lite)
97 yield
98
99 @passive
100 def _write_handler(self, axi_lite):
101 while True:
102 yield from self.handle_write(axi_lite)
103 yield
104
105 @passive
106 def _read_handler(self, axi_lite):
107 while True:
108 yield from self.handle_read(axi_lite)
109 yield
110
111 def parallel_handlers(self, axi_lite):
112 return self._write_handler(axi_lite), self._read_handler(axi_lite)
113
114 class AXILitePatternGenerator:
115 def __init__(self, axi_lite, pattern, delay=0):
116 # patter: (rw, addr, data)
117 self.axi_lite = axi_lite
118 self.pattern = pattern
119 self.delay = delay
120 self.errors = 0
121 self.read_errors = []
122 self.resp_errors = {"w": 0, "r": 0}
123
124 def handler(self):
125 for rw, addr, data in self.pattern:
126 assert rw in ["w", "r"]
127 if rw == "w":
128 strb = 2**len(self.axi_lite.w.strb) - 1
129 resp = (yield from self.axi_lite.write(addr, data, strb))
130 else:
131 rdata, resp = (yield from self.axi_lite.read(addr))
132 if rdata != data:
133 self.read_errors.append((rdata, data))
134 self.errors += 1
135 if resp != RESP_OKAY:
136 self.resp_errors[rw] += 1
137 self.errors += 1
138 for _ in range(_int_or_call(self.delay)):
139 yield
140 for _ in range(16):
141 yield
142
143 # TestAXILite --------------------------------------------------------------------------------------
144
145 class TestAXILite(unittest.TestCase):
146 def test_wishbone2axi2wishbone(self):
147 class DUT(Module):
148 def __init__(self):
149 self.wishbone = wishbone.Interface(data_width=32)
150
151 # # #
152
153 axi = AXILiteInterface(data_width=32, address_width=32)
154 wb = wishbone.Interface(data_width=32)
155
156 wishbone2axi = Wishbone2AXILite(self.wishbone, axi)
157 axi2wishbone = AXILite2Wishbone(axi, wb)
158 self.submodules += wishbone2axi, axi2wishbone
159
160 sram = wishbone.SRAM(1024, init=[0x12345678, 0xa55aa55a])
161 self.submodules += sram
162 self.comb += wb.connect(sram.bus)
163
164 def generator(dut):
165 dut.errors = 0
166 if (yield from dut.wishbone.read(0)) != 0x12345678:
167 dut.errors += 1
168 if (yield from dut.wishbone.read(1)) != 0xa55aa55a:
169 dut.errors += 1
170 for i in range(32):
171 yield from dut.wishbone.write(i, i)
172 for i in range(32):
173 if (yield from dut.wishbone.read(i)) != i:
174 dut.errors += 1
175
176 dut = DUT()
177 run_simulation(dut, [generator(dut)])
178 self.assertEqual(dut.errors, 0)
179
180 def test_axilite2csr(self):
181 @passive
182 def csr_mem_handler(csr, mem):
183 while True:
184 adr = (yield csr.adr)
185 yield csr.dat_r.eq(mem[adr])
186 if (yield csr.we):
187 mem[adr] = (yield csr.dat_w)
188 yield
189
190 class DUT(Module):
191 def __init__(self):
192 self.axi_lite = AXILiteInterface()
193 self.csr = csr_bus.Interface()
194 self.submodules.axilite2csr = AXILite2CSR(self.axi_lite, self.csr)
195 self.errors = 0
196
197 prng = random.Random(42)
198 mem_ref = [prng.randrange(255) for i in range(100)]
199
200 def generator(dut):
201 dut.errors = 0
202
203 for adr, ref in enumerate(mem_ref):
204 adr = adr << 2
205 data, resp = (yield from dut.axi_lite.read(adr))
206 self.assertEqual(resp, 0b00)
207 if data != ref:
208 dut.errors += 1
209
210 write_data = [prng.randrange(255) for _ in mem_ref]
211
212 for adr, wdata in enumerate(write_data):
213 adr = adr << 2
214 resp = (yield from dut.axi_lite.write(adr, wdata))
215 self.assertEqual(resp, 0b00)
216 rdata, resp = (yield from dut.axi_lite.read(adr))
217 self.assertEqual(resp, 0b00)
218 if rdata != wdata:
219 dut.errors += 1
220
221 dut = DUT()
222 mem = [v for v in mem_ref]
223 run_simulation(dut, [generator(dut), csr_mem_handler(dut.csr, mem)])
224 self.assertEqual(dut.errors, 0)
225
226 def test_axilite_sram(self):
227 class DUT(Module):
228 def __init__(self, size, init):
229 self.axi_lite = AXILiteInterface()
230 self.submodules.sram = AXILiteSRAM(size, init=init, bus=self.axi_lite)
231 self.errors = 0
232
233 def generator(dut, ref_init):
234 for adr, ref in enumerate(ref_init):
235 adr = adr << 2
236 data, resp = (yield from dut.axi_lite.read(adr))
237 self.assertEqual(resp, 0b00)
238 if data != ref:
239 dut.errors += 1
240
241 write_data = [prng.randrange(255) for _ in ref_init]
242
243 for adr, wdata in enumerate(write_data):
244 adr = adr << 2
245 resp = (yield from dut.axi_lite.write(adr, wdata))
246 self.assertEqual(resp, 0b00)
247 rdata, resp = (yield from dut.axi_lite.read(adr))
248 self.assertEqual(resp, 0b00)
249 if rdata != wdata:
250 dut.errors += 1
251
252 prng = random.Random(42)
253 init = [prng.randrange(2**32) for i in range(100)]
254
255 dut = DUT(size=len(init)*4, init=[v for v in init])
256 run_simulation(dut, [generator(dut, init)])
257 self.assertEqual(dut.errors, 0)
258
259 def converter_test(self, width_from, width_to, parallel_rw=False,
260 write_pattern=None, write_expected=None,
261 read_pattern=None, read_expected=None):
262 assert not (write_pattern is None and read_pattern is None)
263
264 if write_pattern is None:
265 write_pattern = []
266 write_expected = []
267 elif len(write_pattern[0]) == 2:
268 # add w.strb
269 write_pattern = [(adr, data, 2**(width_from//8)-1) for adr, data in write_pattern]
270
271 if read_pattern is None:
272 read_pattern = []
273 read_expected = []
274
275 class DUT(Module):
276 def __init__(self, width_from, width_to):
277 self.master = AXILiteInterface(data_width=width_from)
278 self.slave = AXILiteInterface(data_width=width_to)
279 self.submodules.converter = AXILiteConverter(self.master, self.slave)
280
281 prng = random.Random(42)
282
283 def write_generator(axi_lite):
284 for addr, data, strb in write_pattern or []:
285 resp = (yield from axi_lite.write(addr, data, strb))
286 self.assertEqual(resp, RESP_OKAY)
287 for _ in range(prng.randrange(3)):
288 yield
289 for _ in range(16):
290 yield
291
292 def read_generator(axi_lite):
293 for addr, refdata in read_pattern or []:
294 data, resp = (yield from axi_lite.read(addr))
295 self.assertEqual(resp, RESP_OKAY)
296 self.assertEqual(data, refdata)
297 for _ in range(prng.randrange(3)):
298 yield
299 for _ in range(4):
300 yield
301
302 def sequential_generator(axi_lite):
303 yield from write_generator(axi_lite)
304 yield from read_generator(axi_lite)
305
306 def rdata_generator(adr):
307 for a, v in read_expected:
308 if a == adr:
309 return v
310 return 0xbaadc0de
311
312 _latency = 0
313 def latency():
314 nonlocal _latency
315 _latency = (_latency + 1) % 3
316 return _latency
317
318 dut = DUT(width_from=width_from, width_to=width_to)
319 checker = AXILiteChecker(ready_latency=latency, rdata_generator=rdata_generator)
320 if parallel_rw:
321 generators = [write_generator(dut.master), read_generator(dut.master)]
322 else:
323 generators = [sequential_generator(dut.master)]
324 generators += checker.parallel_handlers(dut.slave)
325 run_simulation(dut, generators)
326 self.assertEqual(checker.writes, write_expected)
327 self.assertEqual(checker.reads, read_expected)
328
329 def test_axilite_down_converter_32to16(self):
330 write_pattern = [
331 (0x00000000, 0x22221111),
332 (0x00000004, 0x44443333),
333 (0x00000008, 0x66665555),
334 (0x00000100, 0x88887777),
335 ]
336 write_expected = [
337 (0x00000000, 0x1111, 0b11),
338 (0x00000002, 0x2222, 0b11),
339 (0x00000004, 0x3333, 0b11),
340 (0x00000006, 0x4444, 0b11),
341 (0x00000008, 0x5555, 0b11),
342 (0x0000000a, 0x6666, 0b11),
343 (0x00000100, 0x7777, 0b11),
344 (0x00000102, 0x8888, 0b11),
345 ]
346 read_pattern = write_pattern
347 read_expected = [(adr, data) for (adr, data, _) in write_expected]
348 for parallel in [False, True]:
349 with self.subTest(parallel=parallel):
350 self.converter_test(width_from=32, width_to=16, parallel_rw=parallel,
351 write_pattern=write_pattern, write_expected=write_expected,
352 read_pattern=read_pattern, read_expected=read_expected)
353
354 def test_axilite_down_converter_32to8(self):
355 write_pattern = [
356 (0x00000000, 0x44332211),
357 (0x00000004, 0x88776655),
358 ]
359 write_expected = [
360 (0x00000000, 0x11, 0b1),
361 (0x00000001, 0x22, 0b1),
362 (0x00000002, 0x33, 0b1),
363 (0x00000003, 0x44, 0b1),
364 (0x00000004, 0x55, 0b1),
365 (0x00000005, 0x66, 0b1),
366 (0x00000006, 0x77, 0b1),
367 (0x00000007, 0x88, 0b1),
368 ]
369 read_pattern = write_pattern
370 read_expected = [(adr, data) for (adr, data, _) in write_expected]
371 for parallel in [False, True]:
372 with self.subTest(parallel=parallel):
373 self.converter_test(width_from=32, width_to=8, parallel_rw=parallel,
374 write_pattern=write_pattern, write_expected=write_expected,
375 read_pattern=read_pattern, read_expected=read_expected)
376
377 def test_axilite_down_converter_64to32(self):
378 write_pattern = [
379 (0x00000000, 0x2222222211111111),
380 (0x00000008, 0x4444444433333333),
381 ]
382 write_expected = [
383 (0x00000000, 0x11111111, 0b1111),
384 (0x00000004, 0x22222222, 0b1111),
385 (0x00000008, 0x33333333, 0b1111),
386 (0x0000000c, 0x44444444, 0b1111),
387 ]
388 read_pattern = write_pattern
389 read_expected = [(adr, data) for (adr, data, _) in write_expected]
390 for parallel in [False, True]:
391 with self.subTest(parallel=parallel):
392 self.converter_test(width_from=64, width_to=32, parallel_rw=parallel,
393 write_pattern=write_pattern, write_expected=write_expected,
394 read_pattern=read_pattern, read_expected=read_expected)
395
396 def test_axilite_down_converter_strb(self):
397 write_pattern = [
398 (0x00000000, 0x22221111, 0b1100),
399 (0x00000004, 0x44443333, 0b1111),
400 (0x00000008, 0x66665555, 0b1011),
401 (0x00000100, 0x88887777, 0b0011),
402 ]
403 write_expected = [
404 (0x00000002, 0x2222, 0b11),
405 (0x00000004, 0x3333, 0b11),
406 (0x00000006, 0x4444, 0b11),
407 (0x00000008, 0x5555, 0b11),
408 (0x0000000a, 0x6666, 0b10),
409 (0x00000100, 0x7777, 0b11),
410 ]
411 self.converter_test(width_from=32, width_to=16,
412 write_pattern=write_pattern, write_expected=write_expected)
413
414 # TestAXILiteInterconnet ---------------------------------------------------------------------------
415
416 class TestAXILiteInterconnect(unittest.TestCase):
417 def test_interconnect_p2p(self):
418 class DUT(Module):
419 def __init__(self):
420 self.master = master = AXILiteInterface()
421 self.slave = slave = AXILiteInterface()
422 self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
423
424 pattern = [
425 ("w", 0x00000004, 0x11111111),
426 ("w", 0x0000000c, 0x22222222),
427 ("r", 0x00000010, 0x33333333),
428 ("r", 0x00000018, 0x44444444),
429 ]
430
431 def rdata_generator(adr):
432 for rw, a, v in pattern:
433 if rw == "r" and a == adr:
434 return v
435 return 0xbaadc0de
436
437 dut = DUT()
438 checker = AXILiteChecker(rdata_generator=rdata_generator)
439 generators = [
440 AXILitePatternGenerator(dut.master, pattern).handler(),
441 checker.handler(dut.slave),
442 ]
443 run_simulation(dut, generators)
444 self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
445 self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])
446
447 def test_timeout(self):
448 class DUT(Module):
449 def __init__(self):
450 self.master = master = AXILiteInterface()
451 self.slave = slave = AXILiteInterface()
452 self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
453 self.submodules.timeout = AXILiteTimeout(master, 16)
454
455 def generator(axi_lite):
456 resp = (yield from axi_lite.write(0x00001000, 0x11111111))
457 self.assertEqual(resp, RESP_OKAY)
458 resp = (yield from axi_lite.write(0x00002000, 0x22222222))
459 self.assertEqual(resp, RESP_SLVERR)
460 data, resp = (yield from axi_lite.read(0x00003000))
461 self.assertEqual(resp, RESP_SLVERR)
462 self.assertEqual(data, 0xffffffff)
463 yield
464
465 def checker(axi_lite):
466 for _ in range(16):
467 yield
468 yield axi_lite.aw.ready.eq(1)
469 yield axi_lite.w.ready.eq(1)
470 yield
471 yield axi_lite.aw.ready.eq(0)
472 yield axi_lite.w.ready.eq(0)
473 yield axi_lite.b.valid.eq(1)
474 yield
475 while not (yield axi_lite.b.ready):
476 yield
477 yield axi_lite.b.valid.eq(0)
478
479 dut = DUT()
480 generators = [
481 generator(dut.master),
482 checker(dut.slave),
483 timeout_generator(300),
484 ]
485 run_simulation(dut, generators)
486
487 def test_arbiter_order(self):
488 class DUT(Module):
489 def __init__(self, n_masters):
490 self.masters = [AXILiteInterface() for _ in range(n_masters)]
491 self.slave = AXILiteInterface()
492 self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)
493
494 def generator(n, axi_lite, delay=0):
495 def gen(i):
496 return 100*n + i
497
498 for i in range(4):
499 resp = (yield from axi_lite.write(gen(i), gen(i)))
500 self.assertEqual(resp, RESP_OKAY)
501 for _ in range(delay):
502 yield
503 for i in range(4):
504 data, resp = (yield from axi_lite.read(gen(i)))
505 self.assertEqual(resp, RESP_OKAY)
506 for _ in range(delay):
507 yield
508 for _ in range(8):
509 yield
510
511 n_masters = 3
512
513 # with no delay each master will do all transfers at once
514 with self.subTest(delay=0):
515 dut = DUT(n_masters)
516 checker = AXILiteChecker()
517 generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
518 generators += [timeout_generator(300), checker.handler(dut.slave)]
519 run_simulation(dut, generators)
520 order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
521 self.assertEqual([addr for addr, data, strb in checker.writes], order)
522 self.assertEqual([addr for addr, data in checker.reads], order)
523
524 # with some delay, the round-robin arbiter will iterate over masters
525 with self.subTest(delay=1):
526 dut = DUT(n_masters)
527 checker = AXILiteChecker()
528 generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
529 generators += [timeout_generator(300), checker.handler(dut.slave)]
530 run_simulation(dut, generators)
531 order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
532 self.assertEqual([addr for addr, data, strb in checker.writes], order)
533 self.assertEqual([addr for addr, data in checker.reads], order)
534
535 def test_arbiter_holds_grant_until_response(self):
536 class DUT(Module):
537 def __init__(self, n_masters):
538 self.masters = [AXILiteInterface() for _ in range(n_masters)]
539 self.slave = AXILiteInterface()
540 self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)
541
542 def generator(n, axi_lite, delay=0):
543 def gen(i):
544 return 100*n + i
545
546 for i in range(4):
547 resp = (yield from axi_lite.write(gen(i), gen(i)))
548 self.assertEqual(resp, RESP_OKAY)
549 for _ in range(delay):
550 yield
551 for i in range(4):
552 data, resp = (yield from axi_lite.read(gen(i)))
553 self.assertEqual(resp, RESP_OKAY)
554 for _ in range(delay):
555 yield
556 for _ in range(8):
557 yield
558
559 n_masters = 3
560
561 # with no delay each master will do all transfers at once
562 with self.subTest(delay=0):
563 dut = DUT(n_masters)
564 checker = AXILiteChecker(response_latency=lambda: 3)
565 generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
566 generators += [timeout_generator(300), checker.handler(dut.slave)]
567 run_simulation(dut, generators)
568 order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
569 self.assertEqual([addr for addr, data, strb in checker.writes], order)
570 self.assertEqual([addr for addr, data in checker.reads], order)
571
572 # with some delay, the round-robin arbiter will iterate over masters
573 with self.subTest(delay=1):
574 dut = DUT(n_masters)
575 checker = AXILiteChecker(response_latency=lambda: 3)
576 generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
577 generators += [timeout_generator(300), checker.handler(dut.slave)]
578 run_simulation(dut, generators)
579 order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
580 self.assertEqual([addr for addr, data, strb in checker.writes], order)
581 self.assertEqual([addr for addr, data in checker.reads], order)
582
583 def address_decoder(self, i, size=0x100, python=False):
584 # bytes to 32-bit words aligned
585 _size = (size) >> 2
586 _origin = (size * i) >> 2
587 if python: # for python integers
588 shift = log2_int(_size)
589 return lambda a: ((a >> shift) == (_origin >> shift))
590 # for migen signals
591 return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size)))
592
593 def decoder_test(self, n_slaves, pattern, generator_delay=0):
594 class DUT(Module):
595 def __init__(self, decoders):
596 self.master = AXILiteInterface()
597 self.slaves = [AXILiteInterface() for _ in range(len(decoders))]
598 slaves = list(zip(decoders, self.slaves))
599 self.submodules.decoder = AXILiteDecoder(self.master, slaves)
600
601 def rdata_generator(adr):
602 for rw, a, v in pattern:
603 if rw == "r" and a == adr:
604 return v
605 return 0xbaadc0de
606
607 dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
608 checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves]
609
610 generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
611 generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
612 generators += [timeout_generator(300)]
613 run_simulation(dut, generators)
614
615 return checkers
616
617 def test_decoder_write(self):
618 for delay in [0, 1, 0]:
619 with self.subTest(delay=delay):
620 slaves = self.decoder_test(n_slaves=3, pattern=[
621 ("w", 0x010, 1),
622 ("w", 0x110, 2),
623 ("w", 0x210, 3),
624 ("w", 0x011, 1),
625 ("w", 0x012, 1),
626 ("w", 0x111, 2),
627 ("w", 0x112, 2),
628 ("w", 0x211, 3),
629 ("w", 0x212, 3),
630 ], generator_delay=delay)
631
632 def addr(checker_list):
633 return [entry[0] for entry in checker_list]
634
635 self.assertEqual(addr(slaves[0].writes), [0x010, 0x011, 0x012])
636 self.assertEqual(addr(slaves[1].writes), [0x110, 0x111, 0x112])
637 self.assertEqual(addr(slaves[2].writes), [0x210, 0x211, 0x212])
638 for slave in slaves:
639 self.assertEqual(slave.reads, [])
640
641 def test_decoder_read(self):
642 for delay in [0, 1]:
643 with self.subTest(delay=delay):
644 slaves = self.decoder_test(n_slaves=3, pattern=[
645 ("r", 0x010, 1),
646 ("r", 0x110, 2),
647 ("r", 0x210, 3),
648 ("r", 0x011, 1),
649 ("r", 0x012, 1),
650 ("r", 0x111, 2),
651 ("r", 0x112, 2),
652 ("r", 0x211, 3),
653 ("r", 0x212, 3),
654 ], generator_delay=delay)
655
656 def addr(checker_list):
657 return [entry[0] for entry in checker_list]
658
659 self.assertEqual(addr(slaves[0].reads), [0x010, 0x011, 0x012])
660 self.assertEqual(addr(slaves[1].reads), [0x110, 0x111, 0x112])
661 self.assertEqual(addr(slaves[2].reads), [0x210, 0x211, 0x212])
662 for slave in slaves:
663 self.assertEqual(slave.writes, [])
664
665 def test_decoder_read_write(self):
666 for delay in [0, 1]:
667 with self.subTest(delay=delay):
668 slaves = self.decoder_test(n_slaves=3, pattern=[
669 ("w", 0x010, 1),
670 ("w", 0x110, 2),
671 ("r", 0x111, 2),
672 ("r", 0x011, 1),
673 ("r", 0x211, 3),
674 ("w", 0x210, 3),
675 ], generator_delay=delay)
676
677 def addr(checker_list):
678 return [entry[0] for entry in checker_list]
679
680 self.assertEqual(addr(slaves[0].writes), [0x010])
681 self.assertEqual(addr(slaves[0].reads), [0x011])
682 self.assertEqual(addr(slaves[1].writes), [0x110])
683 self.assertEqual(addr(slaves[1].reads), [0x111])
684 self.assertEqual(addr(slaves[2].writes), [0x210])
685 self.assertEqual(addr(slaves[2].reads), [0x211])
686
687 def test_decoder_stall(self):
688 with self.assertRaises(TimeoutError):
689 self.decoder_test(n_slaves=3, pattern=[
690 ("w", 0x300, 1),
691 ])
692 with self.assertRaises(TimeoutError):
693 self.decoder_test(n_slaves=3, pattern=[
694 ("r", 0x300, 1),
695 ])
696
697 def interconnect_test(self, master_patterns, slave_decoders,
698 master_delay=0, slave_ready_latency=0, slave_response_latency=0,
699 disconnected_slaves=None, timeout=300, interconnect=AXILiteInterconnectShared,
700 **kwargs):
701 # number of masters/slaves is defined by the number of patterns/decoders
702 # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
703 # slave_decoders: list of address decoders per slave
704 # delay/latency: control the speed of masters/slaves
705 # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
706 class DUT(Module):
707 def __init__(self, n_masters, decoders, **kwargs):
708 self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
709 self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
710 slaves = list(zip(decoders, self.slaves))
711 self.submodules.interconnect = interconnect(self.masters, slaves, **kwargs)
712
713 class ReadDataGenerator:
714 # Generates data based on decoded addresses and data defined in master_patterns
715 def __init__(self, patterns):
716 self.mem = {}
717 for pattern in patterns:
718 for rw, addr, val in pattern:
719 if rw == "r":
720 assert addr not in self.mem
721 self.mem[addr] = val
722
723 def getter(self, n):
724 # on miss will give default data depending on slave n
725 return lambda addr: self.mem.get(addr, 0xbaad0000 + n)
726
727 def new_checker(rdata_generator):
728 return AXILiteChecker(ready_latency=slave_ready_latency,
729 response_latency=slave_response_latency,
730 rdata_generator=rdata_generator)
731
732 # perpare test
733 dut = DUT(len(master_patterns), slave_decoders, **kwargs)
734 rdata_generator = ReadDataGenerator(master_patterns)
735 checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(master_patterns)]
736 pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
737 for i, pattern in enumerate(master_patterns)]
738
739 # run simulator
740 generators = [gen.handler() for gen in pattern_generators]
741 generators += [checker.handler(slave)
742 for i, (slave, checker) in enumerate(zip(dut.slaves, checkers))
743 if i not in (disconnected_slaves or [])]
744 generators += [timeout_generator(timeout)]
745 run_simulation(dut, generators)
746
747 return pattern_generators, checkers
748
749 def test_interconnect_shared_basic(self):
750 master_patterns = [
751 [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
752 [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
753 [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
754 ]
755 slave_decoders = [self.address_decoder(i) for i in range(3)]
756
757 generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
758 master_delay=1)
759
760 for gen in generators:
761 self.assertEqual(gen.errors, 0)
762
763 def addr(checker_list):
764 return [entry[0] for entry in checker_list]
765
766 self.assertEqual(addr(checkers[0].writes), [0x000, 0x010])
767 self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112])
768 self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
769 self.assertEqual(addr(checkers[0].reads), [])
770 self.assertEqual(addr(checkers[1].reads), [])
771 self.assertEqual(addr(checkers[2].reads), [])
772
773 def interconnect_stress_test(self, timeout=1000, **kwargs):
774 prng = random.Random(42)
775
776 n_masters = 3
777 n_slaves = 3
778 pattern_length = 64
779 slave_region_size = 0x10000000
780 # for testing purpose each master will access only its own region of a slave
781 master_region_size = 0x1000
782 assert n_masters*master_region_size < slave_region_size
783
784 def gen_pattern(n, length):
785 assert length < master_region_size
786 for i_access in range(length):
787 rw = "w" if prng.randint(0, 1) == 0 else "r"
788 i_slave = prng.randrange(n_slaves)
789 addr = i_slave*slave_region_size + n*master_region_size + i_access
790 data = addr
791 yield rw, addr, data
792
793 master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
794 slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
795 slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
796 for i in range(n_slaves)]
797
798 generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
799 timeout=timeout, **kwargs)
800
801 for gen in generators:
802 read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
803 msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
804 gen.resp_errors, "\n".join(read_errors))
805 if not kwargs.get("disconnected_slaves", None):
806 self.assertEqual(gen.errors, 0, msg=msg)
807 else: # when some slaves are disconnected we should have some errors
808 self.assertNotEqual(gen.errors, 0, msg=msg)
809
810 # make sure all the accesses at slave side are in correct address region
811 for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)):
812 for addr in (entry[0] for entry in checker.writes + checker.reads):
813 # compensate for the fact that decoders work on word-aligned addresses
814 self.assertNotEqual(decoder(addr >> 2), 0)
815
816 def test_interconnect_shared_stress_no_delay(self):
817 self.interconnect_stress_test(timeout=1000,
818 master_delay=0,
819 slave_ready_latency=0,
820 slave_response_latency=0)
821
822 def test_interconnect_shared_stress_rand_short(self):
823 prng = random.Random(42)
824 rand = lambda: prng.randrange(4)
825 self.interconnect_stress_test(timeout=2000,
826 master_delay=rand,
827 slave_ready_latency=rand,
828 slave_response_latency=rand)
829
830 def test_interconnect_shared_stress_rand_long(self):
831 prng = random.Random(42)
832 rand = lambda: prng.randrange(16)
833 self.interconnect_stress_test(timeout=4000,
834 master_delay=rand,
835 slave_ready_latency=rand,
836 slave_response_latency=rand)
837
838 def test_interconnect_shared_stress_timeout(self):
839 self.interconnect_stress_test(timeout=4000,
840 disconnected_slaves=[1],
841 timeout_cycles=50)
842
843 def test_crossbar_stress_no_delay(self):
844 self.interconnect_stress_test(timeout=1000,
845 master_delay=0,
846 slave_ready_latency=0,
847 slave_response_latency=0,
848 interconnect=AXILiteCrossbar)
849
850 def test_crossbar_stress_rand(self):
851 prng = random.Random(42)
852 rand = lambda: prng.randrange(4)
853 self.interconnect_stress_test(timeout=2000,
854 master_delay=rand,
855 slave_ready_latency=rand,
856 slave_response_latency=rand,
857 interconnect=AXILiteCrossbar)