soc/integration/csr_bridge: use registered version only when SDRAM is present.
[litex.git] / test / test_axi_lite.py
1 # This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
2 # License: BSD
3
4 import unittest
5 import random
6
7 from migen import *
8
9 from litex.soc.interconnect.axi import *
10 from litex.soc.interconnect import wishbone, csr_bus
11
12 # Helpers ------------------------------------------------------------------------------------------
13
14 def _int_or_call(int_or_func):
15 if callable(int_or_func):
16 return int_or_func()
17 return int_or_func
18
@passive
def timeout_generator(ticks):
    """Simulation watchdog: idle for `ticks` cycles, then fail the run.

    Yields once per cycle and raises `TimeoutError` once `ticks` cycles have
    elapsed. When the environment variable ``TIMEOUT_DEBUG`` equals ``"1"``
    the current tick number is printed every cycle.

    Decorated with migen's `passive`, so (per migen's simulator semantics)
    this generator is not expected to keep the simulation alive on its own.
    """
    import os

    # The debug switch cannot change during a run: read it once instead of
    # querying the environment on every simulated cycle.
    debug = os.environ.get("TIMEOUT_DEBUG", "") == "1"
    for i in range(ticks):
        if debug:
            print("tick {}".format(i))
        yield
    raise TimeoutError("Timeout after %d ticks" % ticks)
27
class AXILiteChecker:
    """Simulation model of an AXI-Lite slave that logs every access it serves.

    ready_latency    -- cycles to wait before asserting ``ready`` on the
                        aw/w/ar channels (int or zero-argument callable).
    response_latency -- cycles to wait before driving the b/r response
                        (int or zero-argument callable).
    rdata_generator  -- callable mapping a read address to the data to return;
                        defaults to a constant 0xbaadc0de.

    Completed accesses are appended to ``writes`` and ``reads``.
    """

    def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
        self.ready_latency = ready_latency
        self.response_latency = response_latency
        self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
        self.writes = []  # (addr, data, strb)
        self.reads = []  # (addr, data)

    def delay(self, latency):
        """Idle for `latency` cycles (`latency` may be a callable)."""
        cycles = _int_or_call(latency)
        for _ in range(cycles):
            yield

    @staticmethod
    def _wait_until_set(signal):
        """Idle one cycle at a time until `signal` reads as non-zero."""
        while not (yield signal):
            yield

    def handle_write(self, axi_lite):
        """Serve a single write: accept aw, then w, then answer on b with OKAY."""
        aw, w, b = axi_lite.aw, axi_lite.w, axi_lite.b

        # aw
        yield from self._wait_until_set(aw.valid)
        yield from self.delay(self.ready_latency)
        addr = (yield aw.addr)
        yield aw.ready.eq(1)
        yield
        yield aw.ready.eq(0)

        # w
        yield from self._wait_until_set(w.valid)
        yield from self.delay(self.ready_latency)
        data = (yield w.data)
        strb = (yield w.strb)
        yield w.ready.eq(1)
        yield
        yield w.ready.eq(0)
        yield from self.delay(self.response_latency)

        # b
        yield b.valid.eq(1)
        yield b.resp.eq(RESP_OKAY)
        yield
        yield from self._wait_until_set(b.ready)
        yield b.valid.eq(0)
        # The access is only logged once the response has been accepted.
        self.writes.append((addr, data, strb))

    def handle_read(self, axi_lite):
        """Serve a single read: accept ar, then answer on r with generated data."""
        ar, r = axi_lite.ar, axi_lite.r

        # ar
        yield from self._wait_until_set(ar.valid)
        yield from self.delay(self.ready_latency)
        addr = (yield ar.addr)
        yield ar.ready.eq(1)
        yield
        yield ar.ready.eq(0)
        yield from self.delay(self.response_latency)

        # r
        data = self.rdata_generator(addr)
        yield r.valid.eq(1)
        yield r.resp.eq(RESP_OKAY)
        yield r.data.eq(data)
        yield
        yield from self._wait_until_set(r.ready)
        yield r.valid.eq(0)
        yield r.data.eq(0)
        # The access is only logged once the response has been accepted.
        self.reads.append((addr, data))

    @passive
    def handler(self, axi_lite):
        """Serve writes and reads from one process, one transaction at a time."""
        while True:
            if (yield axi_lite.aw.valid):
                yield from self.handle_write(axi_lite)
            if (yield axi_lite.ar.valid):
                yield from self.handle_read(axi_lite)
            yield

    @passive
    def _write_handler(self, axi_lite):
        """Endless write-only server, used by `parallel_handlers`."""
        while True:
            yield from self.handle_write(axi_lite)
            yield

    @passive
    def _read_handler(self, axi_lite):
        """Endless read-only server, used by `parallel_handlers`."""
        while True:
            yield from self.handle_read(axi_lite)
            yield

    def parallel_handlers(self, axi_lite):
        """Return a (write, read) pair of generators that run independently."""
        return self._write_handler(axi_lite), self._read_handler(axi_lite)
113
class AXILitePatternGenerator:
    """Drive an AXI-Lite master interface with a scripted access pattern.

    axi_lite -- interface whose ``write``/``read`` generators are used.
    pattern  -- iterable of ``(rw, addr, data)`` with ``rw`` being "w" or "r";
                for reads, ``data`` is the value expected back.
    delay    -- idle cycles after every access (int or zero-argument callable).

    Mismatches are counted in ``errors``; details are kept in ``read_errors``
    (``(actual, expected)`` pairs) and ``resp_errors`` (count per direction).
    """

    def __init__(self, axi_lite, pattern, delay=0):
        # pattern: (rw, addr, data)
        self.axi_lite = axi_lite
        self.pattern = pattern
        self.delay = delay
        self.errors = 0
        self.read_errors = []
        self.resp_errors = {"w": 0, "r": 0}

    def handler(self):
        """Generator executing the whole pattern, followed by 16 idle cycles."""
        bus = self.axi_lite
        for rw, addr, data in self.pattern:
            assert rw in ["w", "r"]
            if rw == "r":
                rdata, resp = (yield from bus.read(addr))
                if rdata != data:
                    self.read_errors.append((rdata, data))
                    self.errors += 1
            else:
                # Full-width strobe: every byte lane enabled.
                strb = (1 << len(bus.w.strb)) - 1
                resp = (yield from bus.write(addr, data, strb))
            if resp != RESP_OKAY:
                self.resp_errors[rw] += 1
                self.errors += 1
            idle = _int_or_call(self.delay)
            for _ in range(idle):
                yield
        for _ in range(16):
            yield
142
143 # TestAXILite --------------------------------------------------------------------------------------
144
class TestAXILite(unittest.TestCase):
    """Simulation tests for AXI-Lite bridges, AXI-Lite SRAM and width converters."""

    def test_wishbone2axi2wishbone(self):
        """Wishbone -> AXI-Lite -> Wishbone round trip in front of a Wishbone SRAM."""
        class DUT(Module):
            def __init__(self):
                self.wishbone = wishbone.Interface(data_width=32, adr_width=30)

                # # #

                axi = AXILiteInterface(data_width=32, address_width=32)
                wb = wishbone.Interface(data_width=32, adr_width=30)

                wishbone2axi = Wishbone2AXILite(self.wishbone, axi)
                axi2wishbone = AXILite2Wishbone(axi, wb)
                self.submodules += wishbone2axi, axi2wishbone

                sram = wishbone.SRAM(1024, init=[0x12345678, 0xa55aa55a])
                self.submodules += sram
                self.comb += wb.connect(sram.bus)

        def generator(dut):
            dut.errors = 0
            # The SRAM init values must be readable through both bridges.
            for adr, expected in enumerate([0x12345678, 0xa55aa55a]):
                if (yield from dut.wishbone.read(adr)) != expected:
                    dut.errors += 1
            # Write a ramp, then read it back.
            for i in range(32):
                yield from dut.wishbone.write(i, i)
            for i in range(32):
                if (yield from dut.wishbone.read(i)) != i:
                    dut.errors += 1

        dut = DUT()
        run_simulation(dut, [generator(dut)])
        self.assertEqual(dut.errors, 0)

    def test_axilite2axi2mem(self):
        """AXI-Lite -> AXI -> (Wishbone | AXI-Lite) SRAM, one sub-test per memory bus."""
        class DUT(Module):
            def __init__(self, mem_bus="wishbone"):
                self.axi_lite = AXILiteInterface()

                axi = AXIInterface()
                self.submodules.axil2axi = AXILite2AXI(self.axi_lite, axi)

                interface_cls, converter_cls, sram_cls = {
                    "wishbone": (wishbone.Interface, AXI2Wishbone, wishbone.SRAM),
                    "axi_lite": (AXILiteInterface, AXI2AXILite, AXILiteSRAM),
                }[mem_bus]

                bus_kwargs = {"adr_width": 30} if mem_bus == "wishbone" else {}
                bus = interface_cls(**bus_kwargs)
                self.submodules += converter_cls(axi, bus)
                sram = sram_cls(1024, init=[0x12345678, 0xa55aa55a])
                self.submodules += sram
                self.comb += bus.connect(sram.bus)

        def generator(axi_lite, datas, resps):
            # Initial memory contents first.
            for adr, expected in [(0x00, 0x12345678), (0x04, 0xa55aa55a)]:
                data, resp = (yield from axi_lite.read(adr))
                resps.append((resp, RESP_OKAY))
                datas.append((data, expected))
            # Then a write ramp followed by a read-back.
            for i in range(32):
                resp = (yield from axi_lite.write(4*i, i))
                resps.append((resp, RESP_OKAY))
            for i in range(32):
                data, resp = (yield from axi_lite.read(4*i))
                resps.append((resp, RESP_OKAY))
                datas.append((data, i))

        def actual_expected(results):  # split into (list(actual), list(expected))
            return list(zip(*results))

        for mem_bus in ["wishbone", "axi_lite"]:
            with self.subTest(mem_bus=mem_bus):
                # to have more verbose error messages store errors in list((actual, expected))
                datas = []
                resps = []

                dut = DUT(mem_bus)
                run_simulation(dut, [generator(dut.axi_lite, datas, resps)])
                self.assertEqual(*actual_expected(resps))
                msg = "\n".join("0x{:08x} vs 0x{:08x}".format(actual, expected)
                                for actual, expected in datas)
                self.assertEqual(*actual_expected(datas), msg="actual vs expected:\n" + msg)

    def test_axilite2csr(self):
        """AXI-Lite -> CSR bridge checked against a Python list acting as CSR memory."""
        @passive
        def csr_mem_handler(csr, mem):
            # Minimal CSR slave: present mem[adr] on dat_r, store dat_w on we.
            while True:
                adr = (yield csr.adr)
                yield csr.dat_r.eq(mem[adr])
                if (yield csr.we):
                    mem[adr] = (yield csr.dat_w)
                yield

        class DUT(Module):
            def __init__(self):
                self.axi_lite = AXILiteInterface()
                self.csr = csr_bus.Interface()
                self.submodules.axilite2csr = AXILite2CSR(self.axi_lite, self.csr)
                self.errors = 0

        prng = random.Random(42)
        mem_ref = [prng.randrange(255) for i in range(100)]

        def generator(dut):
            dut.errors = 0

            # Read back the reference contents (word index -> byte address).
            for index, ref in enumerate(mem_ref):
                adr = index << 2
                data, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if data != ref:
                    dut.errors += 1

            write_data = [prng.randrange(255) for _ in mem_ref]

            # Every write is immediately verified with a read of the same address.
            for index, wdata in enumerate(write_data):
                adr = index << 2
                resp = (yield from dut.axi_lite.write(adr, wdata))
                self.assertEqual(resp, 0b00)
                rdata, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if rdata != wdata:
                    dut.errors += 1

        dut = DUT()
        mem = list(mem_ref)
        run_simulation(dut, [generator(dut), csr_mem_handler(dut.csr, mem)])
        self.assertEqual(dut.errors, 0)

    def test_axilite_sram(self):
        """AXILiteSRAM: initial contents are readable and writes read back correctly."""
        class DUT(Module):
            def __init__(self, size, init):
                self.axi_lite = AXILiteInterface()
                self.submodules.sram = AXILiteSRAM(size, init=init, bus=self.axi_lite)
                self.errors = 0

        def generator(dut, ref_init):
            # Read back the initial contents (word index -> byte address).
            for index, ref in enumerate(ref_init):
                adr = index << 2
                data, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if data != ref:
                    dut.errors += 1

            # `prng` is bound below, before the simulation starts running.
            write_data = [prng.randrange(255) for _ in ref_init]

            for index, wdata in enumerate(write_data):
                adr = index << 2
                resp = (yield from dut.axi_lite.write(adr, wdata))
                self.assertEqual(resp, 0b00)
                rdata, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if rdata != wdata:
                    dut.errors += 1

        prng = random.Random(42)
        init = [prng.randrange(2**32) for i in range(100)]

        dut = DUT(size=len(init)*4, init=list(init))
        run_simulation(dut, [generator(dut, init)])
        self.assertEqual(dut.errors, 0)

    def converter_test(self, width_from, width_to, parallel_rw=False,
                       write_pattern=None, write_expected=None,
                       read_pattern=None, read_expected=None):
        """Run master-side patterns through an AXILiteConverter and check the slave side.

        write_pattern  -- list of (addr, data) or (addr, data, strb) issued by the
                          master; 2-tuples get a full-width strobe added.
        write_expected -- list of (addr, data, strb) the slave-side checker must log.
        read_pattern   -- list of (addr, data) the master reads and expects.
        read_expected  -- list of (addr, data) the checker must log; it is also the
                          source of the data returned to the converter.
        parallel_rw    -- run writes and reads as two concurrent generators
                          instead of one after the other.

        At least one of write_pattern/read_pattern must be given.
        """
        assert not (write_pattern is None and read_pattern is None)

        if write_pattern is None:
            write_pattern = []
            write_expected = []
        elif len(write_pattern[0]) == 2:
            # add w.strb
            full_strb = 2**(width_from//8) - 1
            write_pattern = [(adr, data, full_strb) for adr, data in write_pattern]

        if read_pattern is None:
            read_pattern = []
            read_expected = []

        class DUT(Module):
            def __init__(self, width_from, width_to):
                self.master = AXILiteInterface(data_width=width_from)
                self.slave = AXILiteInterface(data_width=width_to)
                self.submodules.converter = AXILiteConverter(self.master, self.slave)

        prng = random.Random(42)

        def write_generator(axi_lite):
            for addr, data, strb in write_pattern:
                resp = (yield from axi_lite.write(addr, data, strb))
                self.assertEqual(resp, RESP_OKAY)
                for _ in range(prng.randrange(3)):
                    yield
            for _ in range(16):
                yield

        def read_generator(axi_lite):
            for addr, refdata in read_pattern:
                data, resp = (yield from axi_lite.read(addr))
                self.assertEqual(resp, RESP_OKAY)
                self.assertEqual(data, refdata)
                for _ in range(prng.randrange(3)):
                    yield
            for _ in range(4):
                yield

        def sequential_generator(axi_lite):
            yield from write_generator(axi_lite)
            yield from read_generator(axi_lite)

        def rdata_generator(adr):
            # First expected entry matching the address wins.
            return next((v for a, v in read_expected if a == adr), 0xbaadc0de)

        _latency = 0

        def latency():
            # Ready latency cycling through 1, 2, 0, 1, 2, 0, ...
            nonlocal _latency
            _latency = (_latency + 1) % 3
            return _latency

        dut = DUT(width_from=width_from, width_to=width_to)
        checker = AXILiteChecker(ready_latency=latency, rdata_generator=rdata_generator)
        if parallel_rw:
            generators = [write_generator(dut.master), read_generator(dut.master)]
        else:
            generators = [sequential_generator(dut.master)]
        generators += checker.parallel_handlers(dut.slave)
        run_simulation(dut, generators)
        self.assertEqual(checker.writes, write_expected)
        self.assertEqual(checker.reads, read_expected)

    def test_axilite_down_converter_32to16(self):
        """32-bit accesses are split into two 16-bit accesses, low half first."""
        write_pattern = [
            (0x00000000, 0x22221111),
            (0x00000004, 0x44443333),
            (0x00000008, 0x66665555),
            (0x00000100, 0x88887777),
        ]
        write_expected = [
            (0x00000000, 0x1111, 0b11),
            (0x00000002, 0x2222, 0b11),
            (0x00000004, 0x3333, 0b11),
            (0x00000006, 0x4444, 0b11),
            (0x00000008, 0x5555, 0b11),
            (0x0000000a, 0x6666, 0b11),
            (0x00000100, 0x7777, 0b11),
            (0x00000102, 0x8888, 0b11),
        ]
        read_pattern = write_pattern
        read_expected = [(adr, data) for (adr, data, _) in write_expected]
        for parallel in [False, True]:
            with self.subTest(parallel=parallel):
                self.converter_test(width_from=32, width_to=16, parallel_rw=parallel,
                    write_pattern=write_pattern, write_expected=write_expected,
                    read_pattern=read_pattern, read_expected=read_expected)

    def test_axilite_down_converter_32to8(self):
        """32-bit accesses are split into four 8-bit accesses, lowest byte first."""
        write_pattern = [
            (0x00000000, 0x44332211),
            (0x00000004, 0x88776655),
        ]
        write_expected = [
            (0x00000000, 0x11, 0b1),
            (0x00000001, 0x22, 0b1),
            (0x00000002, 0x33, 0b1),
            (0x00000003, 0x44, 0b1),
            (0x00000004, 0x55, 0b1),
            (0x00000005, 0x66, 0b1),
            (0x00000006, 0x77, 0b1),
            (0x00000007, 0x88, 0b1),
        ]
        read_pattern = write_pattern
        read_expected = [(adr, data) for (adr, data, _) in write_expected]
        for parallel in [False, True]:
            with self.subTest(parallel=parallel):
                self.converter_test(width_from=32, width_to=8, parallel_rw=parallel,
                    write_pattern=write_pattern, write_expected=write_expected,
                    read_pattern=read_pattern, read_expected=read_expected)

    def test_axilite_down_converter_64to32(self):
        """64-bit accesses are split into two 32-bit accesses, low word first."""
        write_pattern = [
            (0x00000000, 0x2222222211111111),
            (0x00000008, 0x4444444433333333),
        ]
        write_expected = [
            (0x00000000, 0x11111111, 0b1111),
            (0x00000004, 0x22222222, 0b1111),
            (0x00000008, 0x33333333, 0b1111),
            (0x0000000c, 0x44444444, 0b1111),
        ]
        read_pattern = write_pattern
        read_expected = [(adr, data) for (adr, data, _) in write_expected]
        for parallel in [False, True]:
            with self.subTest(parallel=parallel):
                self.converter_test(width_from=64, width_to=32, parallel_rw=parallel,
                    write_pattern=write_pattern, write_expected=write_expected,
                    read_pattern=read_pattern, read_expected=read_expected)

    def test_axilite_down_converter_strb(self):
        """Down-conversion with partial strobes: halves with an all-zero strobe are skipped."""
        write_pattern = [
            (0x00000000, 0x22221111, 0b1100),
            (0x00000004, 0x44443333, 0b1111),
            (0x00000008, 0x66665555, 0b1011),
            (0x00000100, 0x88887777, 0b0011),
        ]
        write_expected = [
            (0x00000002, 0x2222, 0b11),
            (0x00000004, 0x3333, 0b11),
            (0x00000006, 0x4444, 0b11),
            (0x00000008, 0x5555, 0b11),
            (0x0000000a, 0x6666, 0b10),
            (0x00000100, 0x7777, 0b11),
        ]
        self.converter_test(width_from=32, width_to=16,
            write_pattern=write_pattern, write_expected=write_expected)

    def test_axilite_up_converter_16to32(self):
        """16-bit accesses land in the matching half of an aligned 32-bit word."""
        write_pattern = [
            (0x00000000, 0x1111),
            (0x00000002, 0x2222),
            (0x00000006, 0x3333),
            (0x00000004, 0x4444),
            (0x00000102, 0x5555),
        ]
        write_expected = [
            (0x00000000, 0x00001111, 0b0011),
            (0x00000000, 0x22220000, 0b1100),
            (0x00000004, 0x33330000, 0b1100),
            (0x00000004, 0x00004444, 0b0011),
            (0x00000100, 0x55550000, 0b1100),
        ]
        read_pattern = write_pattern
        read_expected = [
            (0x00000000, 0x22221111),
            (0x00000000, 0x22221111),
            (0x00000004, 0x33334444),
            (0x00000004, 0x33334444),
            (0x00000100, 0x55550000),
        ]
        for parallel in [False, True]:
            with self.subTest(parallel=parallel):
                self.converter_test(width_from=16, width_to=32, parallel_rw=parallel,
                    write_pattern=write_pattern, write_expected=write_expected,
                    read_pattern=read_pattern, read_expected=read_expected)

    def test_axilite_up_converter_8to32(self):
        """8-bit accesses land in the matching byte lane of an aligned 32-bit word."""
        write_pattern = [
            (0x00000000, 0x11),
            (0x00000001, 0x22),
            (0x00000003, 0x33),
            (0x00000002, 0x44),
            (0x00000101, 0x55),
        ]
        write_expected = [
            (0x00000000, 0x00000011, 0b0001),
            (0x00000000, 0x00002200, 0b0010),
            (0x00000000, 0x33000000, 0b1000),
            (0x00000000, 0x00440000, 0b0100),
            (0x00000100, 0x00005500, 0b0010),
        ]
        read_pattern = write_pattern
        read_expected = [
            (0x00000000, 0x33442211),
            (0x00000000, 0x33442211),
            (0x00000000, 0x33442211),
            (0x00000000, 0x33442211),
            (0x00000100, 0x00005500),
        ]
        for parallel in [False, True]:
            with self.subTest(parallel=parallel):
                self.converter_test(width_from=8, width_to=32, parallel_rw=parallel,
                    write_pattern=write_pattern, write_expected=write_expected,
                    read_pattern=read_pattern, read_expected=read_expected)

    def test_axilite_up_converter_strb(self):
        """Up-conversion with partial strobes: strobes move to the matching lanes."""
        write_pattern = [
            (0x00000000, 0x1111, 0b10),
            (0x00000002, 0x2222, 0b11),
            (0x00000006, 0x3333, 0b11),
            (0x00000004, 0x4444, 0b01),
            (0x00000102, 0x5555, 0b01),
        ]
        write_expected = [
            (0x00000000, 0x00001111, 0b0010),
            (0x00000000, 0x22220000, 0b1100),
            (0x00000004, 0x33330000, 0b1100),
            (0x00000004, 0x00004444, 0b0001),
            (0x00000100, 0x55550000, 0b0100),
        ]
        self.converter_test(width_from=16, width_to=32,
            write_pattern=write_pattern, write_expected=write_expected)
539
540 # TestAXILiteInterconnect --------------------------------------------------------------------------
541
class TestAXILiteInterconnect(unittest.TestCase):
    """Simulation tests for AXI-Lite point-to-point, timeout, arbiter, decoder,
    shared interconnect and crossbar modules."""

    @staticmethod
    def _addresses(entries):
        """Return the address (first field) of every logged checker entry."""
        return [entry[0] for entry in entries]

    def test_interconnect_p2p(self):
        """Point-to-point interconnect forwards every access unchanged."""
        class DUT(Module):
            def __init__(self):
                self.master = master = AXILiteInterface()
                self.slave = slave = AXILiteInterface()
                self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)

        pattern = [
            ("w", 0x00000004, 0x11111111),
            ("w", 0x0000000c, 0x22222222),
            ("r", 0x00000010, 0x33333333),
            ("r", 0x00000018, 0x44444444),
        ]

        def rdata_generator(adr):
            for rw, a, v in pattern:
                if rw == "r" and a == adr:
                    return v
            return 0xbaadc0de

        dut = DUT()
        checker = AXILiteChecker(rdata_generator=rdata_generator)
        generators = [
            AXILitePatternGenerator(dut.master, pattern).handler(),
            checker.handler(dut.slave),
        ]
        run_simulation(dut, generators)
        self.assertEqual(checker.writes,
                         [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
        self.assertEqual(checker.reads,
                         [(addr, data) for rw, addr, data in pattern if rw == "r"])

    def test_timeout(self):
        """AXILiteTimeout answers with SLVERR once the slave stops responding."""
        class DUT(Module):
            def __init__(self):
                self.master = master = AXILiteInterface()
                self.slave = slave = AXILiteInterface()
                self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
                self.submodules.timeout = AXILiteTimeout(master, 16)

        def generator(axi_lite):
            # The first write is eventually served by `checker` below ...
            resp = (yield from axi_lite.write(0x00001000, 0x11111111))
            self.assertEqual(resp, RESP_OKAY)
            # ... after that nobody drives the slave side any more.
            resp = (yield from axi_lite.write(0x00002000, 0x22222222))
            self.assertEqual(resp, RESP_SLVERR)
            data, resp = (yield from axi_lite.read(0x00003000))
            self.assertEqual(resp, RESP_SLVERR)
            self.assertEqual(data, 0xffffffff)
            yield

        def checker(axi_lite):
            # Serve exactly one write after 16 idle cycles, then finish.
            for _ in range(16):
                yield
            yield axi_lite.aw.ready.eq(1)
            yield axi_lite.w.ready.eq(1)
            yield
            yield axi_lite.aw.ready.eq(0)
            yield axi_lite.w.ready.eq(0)
            yield axi_lite.b.valid.eq(1)
            yield
            while not (yield axi_lite.b.ready):
                yield
            yield axi_lite.b.valid.eq(0)

        dut = DUT()
        generators = [
            generator(dut.master),
            checker(dut.slave),
            timeout_generator(300),
        ]
        run_simulation(dut, generators)

    def _arbiter_order_test(self, **checker_kwargs):
        """Check the order in which an AXILiteArbiter serves three masters.

        Master ``n`` performs 4 writes then 4 reads at addresses ``100*n + i``.
        `checker_kwargs` are forwarded to the slave-side AXILiteChecker.
        """
        class DUT(Module):
            def __init__(self, n_masters):
                self.masters = [AXILiteInterface() for _ in range(n_masters)]
                self.slave = AXILiteInterface()
                self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)

        def generator(n, axi_lite, delay=0):
            def gen(i):
                return 100*n + i

            for i in range(4):
                resp = (yield from axi_lite.write(gen(i), gen(i)))
                self.assertEqual(resp, RESP_OKAY)
                for _ in range(delay):
                    yield
            for i in range(4):
                data, resp = (yield from axi_lite.read(gen(i)))
                self.assertEqual(resp, RESP_OKAY)
                for _ in range(delay):
                    yield
            for _ in range(8):
                yield

        n_masters = 3
        scenarios = [
            # with no delay each master will do all transfers at once
            (0, [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]),
            # with some delay, the round-robin arbiter will iterate over masters
            (1, [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]),
        ]

        for delay, order in scenarios:
            with self.subTest(delay=delay):
                dut = DUT(n_masters)
                checker = AXILiteChecker(**checker_kwargs)
                generators = [generator(i, master, delay=delay)
                              for i, master in enumerate(dut.masters)]
                generators += [timeout_generator(300), checker.handler(dut.slave)]
                run_simulation(dut, generators)
                self.assertEqual([addr for addr, data, strb in checker.writes], order)
                self.assertEqual([addr for addr, data in checker.reads], order)

    def test_arbiter_order(self):
        """Arbitration order with a slave that responds without extra latency."""
        self._arbiter_order_test()

    def test_arbiter_holds_grant_until_response(self):
        """Same order is expected when the slave delays each response by 3 cycles."""
        self._arbiter_order_test(response_latency=lambda: 3)

    def address_decoder(self, i, size=0x100, python=False):
        """Build the address decoder of region `i`, each region being `size` bytes.

        Returns a function over word addresses: for migen signals by default,
        for plain Python integers when `python` is true.
        """
        # bytes to 32-bit words aligned
        _size = (size) >> 2
        _origin = (size * i) >> 2
        if python:  # for python integers
            shift = log2_int(_size)
            return lambda a: ((a >> shift) == (_origin >> shift))
        # for migen signals
        return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size)))

    def decoder_test(self, n_slaves, pattern, generator_delay=0):
        """Run `pattern` through an AXILiteDecoder with `n_slaves` slaves.

        Returns the list of per-slave AXILiteChecker objects for inspection.
        """
        class DUT(Module):
            def __init__(self, decoders):
                self.master = AXILiteInterface()
                self.slaves = [AXILiteInterface() for _ in range(len(decoders))]
                slaves = list(zip(decoders, self.slaves))
                self.submodules.decoder = AXILiteDecoder(self.master, slaves)

        def rdata_generator(adr):
            for rw, a, v in pattern:
                if rw == "r" and a == adr:
                    return v
            return 0xbaadc0de

        dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
        checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves]

        generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
        generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
        generators += [timeout_generator(300)]
        run_simulation(dut, generators)

        return checkers

    def test_decoder_write(self):
        """Writes are routed to the slave owning the address; no reads appear."""
        for delay in [0, 1, 0]:
            with self.subTest(delay=delay):
                slaves = self.decoder_test(n_slaves=3, pattern=[
                    ("w", 0x010, 1),
                    ("w", 0x110, 2),
                    ("w", 0x210, 3),
                    ("w", 0x011, 1),
                    ("w", 0x012, 1),
                    ("w", 0x111, 2),
                    ("w", 0x112, 2),
                    ("w", 0x211, 3),
                    ("w", 0x212, 3),
                ], generator_delay=delay)

                addr = self._addresses

                self.assertEqual(addr(slaves[0].writes), [0x010, 0x011, 0x012])
                self.assertEqual(addr(slaves[1].writes), [0x110, 0x111, 0x112])
                self.assertEqual(addr(slaves[2].writes), [0x210, 0x211, 0x212])
                for slave in slaves:
                    self.assertEqual(slave.reads, [])

    def test_decoder_read(self):
        """Reads are routed to the slave owning the address; no writes appear."""
        for delay in [0, 1]:
            with self.subTest(delay=delay):
                slaves = self.decoder_test(n_slaves=3, pattern=[
                    ("r", 0x010, 1),
                    ("r", 0x110, 2),
                    ("r", 0x210, 3),
                    ("r", 0x011, 1),
                    ("r", 0x012, 1),
                    ("r", 0x111, 2),
                    ("r", 0x112, 2),
                    ("r", 0x211, 3),
                    ("r", 0x212, 3),
                ], generator_delay=delay)

                addr = self._addresses

                self.assertEqual(addr(slaves[0].reads), [0x010, 0x011, 0x012])
                self.assertEqual(addr(slaves[1].reads), [0x110, 0x111, 0x112])
                self.assertEqual(addr(slaves[2].reads), [0x210, 0x211, 0x212])
                for slave in slaves:
                    self.assertEqual(slave.writes, [])

    def test_decoder_read_write(self):
        """Interleaved reads and writes each reach the right slave."""
        for delay in [0, 1]:
            with self.subTest(delay=delay):
                slaves = self.decoder_test(n_slaves=3, pattern=[
                    ("w", 0x010, 1),
                    ("w", 0x110, 2),
                    ("r", 0x111, 2),
                    ("r", 0x011, 1),
                    ("r", 0x211, 3),
                    ("w", 0x210, 3),
                ], generator_delay=delay)

                addr = self._addresses

                self.assertEqual(addr(slaves[0].writes), [0x010])
                self.assertEqual(addr(slaves[0].reads), [0x011])
                self.assertEqual(addr(slaves[1].writes), [0x110])
                self.assertEqual(addr(slaves[1].reads), [0x111])
                self.assertEqual(addr(slaves[2].writes), [0x210])
                self.assertEqual(addr(slaves[2].reads), [0x211])

    def test_decoder_stall(self):
        """An access outside every slave region never completes (watchdog fires)."""
        with self.assertRaises(TimeoutError):
            self.decoder_test(n_slaves=3, pattern=[
                ("w", 0x300, 1),
            ])
        with self.assertRaises(TimeoutError):
            self.decoder_test(n_slaves=3, pattern=[
                ("r", 0x300, 1),
            ])

    def interconnect_test(self, master_patterns, slave_decoders,
                          master_delay=0, slave_ready_latency=0, slave_response_latency=0,
                          disconnected_slaves=None, timeout=300, interconnect=AXILiteInterconnectShared,
                          **kwargs):
        """Simulate an interconnect with scripted masters and checker slaves.

        Returns ``(pattern_generators, checkers)``: one AXILitePatternGenerator
        per master and one AXILiteChecker per slave.
        """
        # number of masters/slaves is defined by the number of patterns/decoders
        # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
        # slave_decoders: list of address decoders per slave
        # delay/latency: control the speed of masters/slaves
        # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
        class DUT(Module):
            def __init__(self, n_masters, decoders, **kwargs):
                self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
                self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
                slaves = list(zip(decoders, self.slaves))
                self.submodules.interconnect = interconnect(self.masters, slaves, **kwargs)

        class ReadDataGenerator:
            # Generates data based on decoded addresses and data defined in master_patterns
            def __init__(self, patterns):
                self.mem = {}
                for pattern in patterns:
                    for rw, addr, val in pattern:
                        if rw == "r":
                            assert addr not in self.mem
                            self.mem[addr] = val

            def getter(self, n):
                # on miss will give default data depending on slave n
                return lambda addr: self.mem.get(addr, 0xbaad0000 + n)

        def new_checker(rdata_generator):
            return AXILiteChecker(ready_latency=slave_ready_latency,
                                  response_latency=slave_response_latency,
                                  rdata_generator=rdata_generator)

        # prepare test
        dut = DUT(len(master_patterns), slave_decoders, **kwargs)
        rdata_generator = ReadDataGenerator(master_patterns)
        # Checkers model slaves, so there must be exactly one per slave decoder
        # (not one per master pattern): otherwise zip() below would silently
        # leave slaves unserved whenever the two counts differ.
        checkers = [new_checker(rdata_generator.getter(i)) for i in range(len(slave_decoders))]
        pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
                              for i, pattern in enumerate(master_patterns)]

        # run simulator
        disconnected = disconnected_slaves or []
        generators = [gen.handler() for gen in pattern_generators]
        generators += [checker.handler(slave)
                       for i, (slave, checker) in enumerate(zip(dut.slaves, checkers))
                       if i not in disconnected]
        generators += [timeout_generator(timeout)]
        run_simulation(dut, generators)

        return pattern_generators, checkers

    def test_interconnect_shared_basic(self):
        """Three masters writing through the shared interconnect to three slaves."""
        master_patterns = [
            [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
            [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
            [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
        ]
        slave_decoders = [self.address_decoder(i) for i in range(3)]

        generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
                                                      master_delay=1)

        for gen in generators:
            self.assertEqual(gen.errors, 0)

        addr = self._addresses

        self.assertEqual(addr(checkers[0].writes), [0x000, 0x010])
        self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112])
        self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
        self.assertEqual(addr(checkers[0].reads), [])
        self.assertEqual(addr(checkers[1].reads), [])
        self.assertEqual(addr(checkers[2].reads), [])

    def interconnect_stress_test(self, timeout=1000, **kwargs):
        """Random read/write traffic from 3 masters to 3 slaves.

        `kwargs` are forwarded to `interconnect_test`. When it contains a
        non-empty ``disconnected_slaves``, every master is expected to see
        errors; otherwise none may occur.
        """
        prng = random.Random(42)

        n_masters = 3
        n_slaves = 3
        pattern_length = 64
        slave_region_size = 0x10000000
        # for testing purpose each master will access only its own region of a slave
        master_region_size = 0x1000
        assert n_masters*master_region_size < slave_region_size

        def gen_pattern(n, length):
            assert length < master_region_size
            for i_access in range(length):
                rw = "w" if prng.randint(0, 1) == 0 else "r"
                i_slave = prng.randrange(n_slaves)
                addr = i_slave*slave_region_size + n*master_region_size + i_access
                data = addr
                yield rw, addr, data

        master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
        slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
        slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
                             for i in range(n_slaves)]

        generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
                                                      timeout=timeout, **kwargs)

        for gen in generators:
            read_errors = ["  0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
            msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
                gen.resp_errors, "\n".join(read_errors))
            if not kwargs.get("disconnected_slaves", None):
                self.assertEqual(gen.errors, 0, msg=msg)
            else:  # when some slaves are disconnected we should have some errors
                self.assertNotEqual(gen.errors, 0, msg=msg)

        # make sure all the accesses at slave side are in correct address region
        for checker, decoder in zip(checkers, slave_decoders_py):
            for addr in (entry[0] for entry in checker.writes + checker.reads):
                # compensate for the fact that decoders work on word-aligned addresses
                self.assertNotEqual(decoder(addr >> 2), 0)

    def test_interconnect_shared_stress_no_delay(self):
        """Shared interconnect stress test with no added delays."""
        self.interconnect_stress_test(timeout=1000,
                                      master_delay=0,
                                      slave_ready_latency=0,
                                      slave_response_latency=0)

    def test_interconnect_shared_stress_rand_short(self):
        """Shared interconnect stress test with random delays of 0..3 cycles."""
        prng = random.Random(42)
        rand = lambda: prng.randrange(4)
        self.interconnect_stress_test(timeout=2000,
                                      master_delay=rand,
                                      slave_ready_latency=rand,
                                      slave_response_latency=rand)

    def test_interconnect_shared_stress_rand_long(self):
        """Shared interconnect stress test with random delays of 0..15 cycles."""
        prng = random.Random(42)
        rand = lambda: prng.randrange(16)
        self.interconnect_stress_test(timeout=4000,
                                      master_delay=rand,
                                      slave_ready_latency=rand,
                                      slave_response_latency=rand)

    def test_interconnect_shared_stress_timeout(self):
        """Shared interconnect with slave 1 disconnected: masters must see errors."""
        self.interconnect_stress_test(timeout=4000,
                                      disconnected_slaves=[1],
                                      timeout_cycles=50)

    def test_crossbar_stress_no_delay(self):
        """Crossbar stress test with no added delays."""
        self.interconnect_stress_test(timeout=1000,
                                      master_delay=0,
                                      slave_ready_latency=0,
                                      slave_response_latency=0,
                                      interconnect=AXILiteCrossbar)

    def test_crossbar_stress_rand(self):
        """Crossbar stress test with random delays of 0..3 cycles."""
        prng = random.Random(42)
        rand = lambda: prng.randrange(4)
        self.interconnect_stress_test(timeout=2000,
                                      master_delay=rand,
                                      slave_ready_latency=rand,
                                      slave_response_latency=rand,
                                      interconnect=AXILiteCrossbar)