160f7e239043e788b8006446f7c85592ee5cebc4
[litex.git] / test / test_axi_lite.py
1 # This file is Copyright (c) 2020 Antmicro <www.antmicro.com>
2 # License: BSD
3
4 import unittest
5 import random
6
7 from migen import *
8
9 from litex.soc.interconnect.axi import *
10 from litex.soc.interconnect import wishbone, csr_bus
11
12 # Helpers ------------------------------------------------------------------------------------------
13
14 def _int_or_call(int_or_func):
15 if callable(int_or_func):
16 return int_or_func()
17 return int_or_func
18
@passive
def timeout_generator(ticks):
    """Simulation watchdog: idle for ``ticks`` cycles, then raise ``TimeoutError``.

    Each loop iteration yields once (one simulation cycle). When the
    environment variable ``TIMEOUT_DEBUG`` is exactly ``"1"`` the current tick
    number is printed before every yield; the variable is re-read on every tick.

    NOTE(review): decorated with migen's ``@passive`` -- presumably so that the
    simulation may finish without waiting for this generator; confirm against
    the migen simulator documentation.
    """
    import os

    for tick in range(ticks):
        debug_enabled = os.environ.get("TIMEOUT_DEBUG", "") == "1"
        if debug_enabled:
            print("tick {}".format(tick))
        yield
    raise TimeoutError("Timeout after %d ticks" % ticks)
27
class AXILiteChecker:
    """Simulation model of an AXI-Lite slave that records every access it serves.

    The handler generators below drive the ready/valid/resp/data signals of an
    AXI-Lite interface and log completed transactions in ``writes`` and ``reads``
    so that tests can compare them against expected lists.

    Parameters:
        ready_latency:    cycles to wait before accepting an address or data
                          beat; an int or a zero-argument callable (resolved
                          through ``_int_or_call`` on every use).
        response_latency: cycles to wait before presenting the response; an int
                          or a zero-argument callable.
        rdata_generator:  callable ``addr -> data`` supplying read data; when
                          falsy, every read returns ``0xbaadc0de``.
    """

    def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
        self.ready_latency = ready_latency
        self.response_latency = response_latency
        self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
        self.writes = [] # (addr, data, strb)
        self.reads = [] # (addr, data)

    def delay(self, latency):
        """Yield (idle) for ``latency`` cycles; ``latency`` may be an int or a callable."""
        for _ in range(_int_or_call(latency)):
            yield

    def handle_write(self, axi_lite):
        """Serve exactly one write transaction and append it to ``self.writes``.

        The address, data and response phases are handled strictly one after
        another; the placement of each bare ``yield`` defines the cycle timing.
        """
        # aw: wait for a valid address, then accept it with a one-cycle ready pulse
        while not (yield axi_lite.aw.valid):
            yield
        yield from self.delay(self.ready_latency)
        addr = (yield axi_lite.aw.addr)
        yield axi_lite.aw.ready.eq(1)
        yield
        yield axi_lite.aw.ready.eq(0)
        # the data beat is only looked at after the address has been accepted
        while not (yield axi_lite.w.valid):
            yield
        yield from self.delay(self.ready_latency)
        # w: sample data and strobes, then accept them with a one-cycle ready pulse
        data = (yield axi_lite.w.data)
        strb = (yield axi_lite.w.strb)
        yield axi_lite.w.ready.eq(1)
        yield
        yield axi_lite.w.ready.eq(0)
        yield from self.delay(self.response_latency)
        # b: present an OKAY response and hold it until the master is seen ready
        yield axi_lite.b.valid.eq(1)
        yield axi_lite.b.resp.eq(RESP_OKAY)
        yield
        while not (yield axi_lite.b.ready):
            yield
        yield axi_lite.b.valid.eq(0)
        # the write is logged only once its response has been taken
        self.writes.append((addr, data, strb))

    def handle_read(self, axi_lite):
        """Serve exactly one read transaction and append it to ``self.reads``."""
        # ar: wait for a valid address, then accept it with a one-cycle ready pulse
        while not (yield axi_lite.ar.valid):
            yield
        yield from self.delay(self.ready_latency)
        addr = (yield axi_lite.ar.addr)
        yield axi_lite.ar.ready.eq(1)
        yield
        yield axi_lite.ar.ready.eq(0)
        yield from self.delay(self.response_latency)
        # r: present data from rdata_generator with an OKAY response until taken
        data = self.rdata_generator(addr)
        yield axi_lite.r.valid.eq(1)
        yield axi_lite.r.resp.eq(RESP_OKAY)
        yield axi_lite.r.data.eq(data)
        yield
        while not (yield axi_lite.r.ready):
            yield
        yield axi_lite.r.valid.eq(0)
        # clear the data bus so that stale data is not visible after the transfer
        yield axi_lite.r.data.eq(0)
        self.reads.append((addr, data))

    @passive
    def handler(self, axi_lite):
        """Serve writes and reads on ``axi_lite`` forever, one transaction at a time.

        Every cycle ``aw.valid`` is checked first and ``ar.valid`` second, so a
        pending write is fully served before a read is considered.
        """
        while True:
            if (yield axi_lite.aw.valid):
                yield from self.handle_write(axi_lite)
            if (yield axi_lite.ar.valid):
                yield from self.handle_read(axi_lite)
            yield

    @passive
    def _write_handler(self, axi_lite):
        """Serve write transactions forever (write half of ``parallel_handlers``)."""
        while True:
            yield from self.handle_write(axi_lite)
            yield

    @passive
    def _read_handler(self, axi_lite):
        """Serve read transactions forever (read half of ``parallel_handlers``)."""
        while True:
            yield from self.handle_read(axi_lite)
            yield

    def parallel_handlers(self, axi_lite):
        """Return a ``(write_handler, read_handler)`` pair of generators.

        Unlike ``handler``, the two generators run independently, so writes and
        reads on ``axi_lite`` can be served at the same time.
        """
        return self._write_handler(axi_lite), self._read_handler(axi_lite)
113
class AXILitePatternGenerator:
    """Replay a fixed list of accesses on an AXI-Lite master interface.

    Parameters:
        axi_lite: interface object providing the ``write(addr, data, strb)`` and
                  ``read(addr)`` generators and a ``w.strb`` signal.
        pattern:  iterable of ``(rw, addr, data)`` with ``rw`` being ``"w"`` or
                  ``"r"``; for reads ``data`` is the value expected back.
        delay:    idle cycles after each access, an int or a zero-argument
                  callable (resolved through ``_int_or_call`` per access).

    Results are collected in:
        errors:      total number of mismatches (data and response).
        read_errors: list of ``(read_data, expected_data)`` for wrong reads.
        resp_errors: count of non-OKAY responses, keyed by ``"w"`` / ``"r"``.
    """

    def __init__(self, axi_lite, pattern, delay=0):
        # pattern: (rw, addr, data)
        self.axi_lite = axi_lite
        self.pattern = pattern
        self.delay = delay
        self.errors = 0
        self.read_errors = []
        self.resp_errors = {"w": 0, "r": 0}

    def handler(self):
        """Generator performing every access of the pattern, then idling 16 cycles."""
        for rw, addr, data in self.pattern:
            assert rw in ["w", "r"]
            if rw == "r":
                rdata, resp = (yield from self.axi_lite.read(addr))
                if rdata != data:
                    self.read_errors.append((rdata, data))
                    self.errors += 1
            else:
                # writes always enable every byte lane
                all_lanes = 2**len(self.axi_lite.w.strb) - 1
                resp = (yield from self.axi_lite.write(addr, data, all_lanes))
            if resp != RESP_OKAY:
                self.resp_errors[rw] += 1
                self.errors += 1
            idle_cycles = _int_or_call(self.delay)
            for _ in range(idle_cycles):
                yield
        # trailing idle cycles after the last access
        for _ in range(16):
            yield
142
143 # TestAXILite --------------------------------------------------------------------------------------
144
class TestAXILite(unittest.TestCase):
    """Simulation tests for the AXI-Lite bridges, the AXI-Lite SRAM and the width converter."""

    def test_wishbone2axi2wishbone(self):
        """Access a Wishbone SRAM through a Wishbone -> AXI-Lite -> Wishbone chain."""
        class DUT(Module):
            def __init__(self):
                self.wishbone = wishbone.Interface(data_width=32)

                # # #

                axi = AXILiteInterface(data_width=32, address_width=32)
                wb = wishbone.Interface(data_width=32)

                wishbone2axi = Wishbone2AXILite(self.wishbone, axi)
                axi2wishbone = AXILite2Wishbone(axi, wb)
                self.submodules += wishbone2axi, axi2wishbone

                sram = wishbone.SRAM(1024, init=[0x12345678, 0xa55aa55a])
                self.submodules += sram
                self.comb += wb.connect(sram.bus)

        def generator(dut):
            dut.errors = 0
            # the two initialised words first
            for adr, ref in [(0, 0x12345678), (1, 0xa55aa55a)]:
                if (yield from dut.wishbone.read(adr)) != ref:
                    dut.errors += 1
            # then write a ramp and read it back
            for word in range(32):
                yield from dut.wishbone.write(word, word)
            for word in range(32):
                if (yield from dut.wishbone.read(word)) != word:
                    dut.errors += 1

        dut = DUT()
        run_simulation(dut, [generator(dut)])
        self.assertEqual(dut.errors, 0)

    def test_axilite2axi2mem(self):
        """Access an SRAM through AXI-Lite -> AXI -> (Wishbone | AXI-Lite)."""
        class DUT(Module):
            def __init__(self, mem_bus="wishbone"):
                self.axi_lite = AXILiteInterface()

                axi = AXIInterface()
                self.submodules.axil2axi = AXILite2AXI(self.axi_lite, axi)

                # (interface, AXI -> interface converter, SRAM) per memory bus type
                variants = {
                    "wishbone": (wishbone.Interface, AXI2Wishbone, wishbone.SRAM),
                    "axi_lite": (AXILiteInterface, AXI2AXILite, AXILiteSRAM),
                }
                interface_cls, converter_cls, sram_cls = variants[mem_bus]

                bus = interface_cls()
                converter = converter_cls(axi, bus)
                self.submodules += converter
                sram = sram_cls(1024, init=[0x12345678, 0xa55aa55a])
                self.submodules += sram
                self.comb += bus.connect(sram.bus)

        def generator(axi_lite, datas, resps):
            # results are stored as (actual, expected) pairs
            for adr, ref in [(0x00, 0x12345678), (0x04, 0xa55aa55a)]:
                data, resp = (yield from axi_lite.read(adr))
                resps.append((resp, RESP_OKAY))
                datas.append((data, ref))
            for word in range(32):
                resp = (yield from axi_lite.write(4*word, word))
                resps.append((resp, RESP_OKAY))
            for word in range(32):
                data, resp = (yield from axi_lite.read(4*word))
                resps.append((resp, RESP_OKAY))
                datas.append((data, word))

        def actual_expected(results):  # split into (list(actual), list(expected))
            return list(zip(*results))

        for mem_bus in ["wishbone", "axi_lite"]:
            with self.subTest(mem_bus=mem_bus):
                # to have more verbose error messages store errors in list((actual, expected))
                datas, resps = [], []

                dut = DUT(mem_bus)
                run_simulation(dut, [generator(dut.axi_lite, datas, resps)])
                self.assertEqual(*actual_expected(resps))
                lines = ["0x{:08x} vs 0x{:08x}".format(actual, expected)
                         for actual, expected in datas]
                msg = "\n".join(lines)
                self.assertEqual(*actual_expected(datas), msg="actual vs expected:\n" + msg)

    def test_axilite2csr(self):
        """Read and write a Python-modelled CSR memory through AXILite2CSR."""
        @passive
        def csr_mem_handler(csr, mem):
            # CSR bus model backed by the Python list `mem`
            while True:
                adr = (yield csr.adr)
                yield csr.dat_r.eq(mem[adr])
                if (yield csr.we):
                    mem[adr] = (yield csr.dat_w)
                yield

        class DUT(Module):
            def __init__(self):
                self.axi_lite = AXILiteInterface()
                self.csr = csr_bus.Interface()
                self.submodules.axilite2csr = AXILite2CSR(self.axi_lite, self.csr)
                self.errors = 0

        prng = random.Random(42)
        mem_ref = [prng.randrange(255) for _ in range(100)]

        def generator(dut):
            dut.errors = 0

            # initial contents
            for word, ref in enumerate(mem_ref):
                adr = word << 2
                data, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if data != ref:
                    dut.errors += 1

            write_data = [prng.randrange(255) for _ in mem_ref]

            # write each word and read it straight back
            for word, wdata in enumerate(write_data):
                adr = word << 2
                resp = (yield from dut.axi_lite.write(adr, wdata))
                self.assertEqual(resp, 0b00)
                rdata, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if rdata != wdata:
                    dut.errors += 1

        dut = DUT()
        mem = list(mem_ref)
        run_simulation(dut, [generator(dut), csr_mem_handler(dut.csr, mem)])
        self.assertEqual(dut.errors, 0)

    def test_axilite_sram(self):
        """Check initial contents and write/read-back of AXILiteSRAM."""
        class DUT(Module):
            def __init__(self, size, init):
                self.axi_lite = AXILiteInterface()
                self.submodules.sram = AXILiteSRAM(size, init=init, bus=self.axi_lite)
                self.errors = 0

        prng = random.Random(42)
        init = [prng.randrange(2**32) for _ in range(100)]

        def generator(dut, ref_init):
            # initial contents
            for word, ref in enumerate(ref_init):
                adr = word << 2
                data, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if data != ref:
                    dut.errors += 1

            write_data = [prng.randrange(255) for _ in ref_init]

            # write each word and read it straight back
            for word, wdata in enumerate(write_data):
                adr = word << 2
                resp = (yield from dut.axi_lite.write(adr, wdata))
                self.assertEqual(resp, 0b00)
                rdata, resp = (yield from dut.axi_lite.read(adr))
                self.assertEqual(resp, 0b00)
                if rdata != wdata:
                    dut.errors += 1

        dut = DUT(size=len(init)*4, init=list(init))
        run_simulation(dut, [generator(dut, init)])
        self.assertEqual(dut.errors, 0)

    def converter_test(self, width_from, width_to, parallel_rw=False,
                       write_pattern=None, write_expected=None,
                       read_pattern=None, read_expected=None):
        """Run AXILiteConverter (width_from -> width_to) against an AXILiteChecker.

        write_pattern: list of (addr, data) or (addr, data, strb) issued on the
                       master side; 2-tuples get an all-ones strobe added.
        write_expected: list of (addr, data, strb) expected at the slave side.
        read_pattern:  list of (addr, expected_data) issued on the master side.
        read_expected: list of (addr, data) expected at, and served by, the slave.
        parallel_rw:   run writes and reads as two concurrent generators instead
                       of writes followed by reads.
        At least one of write_pattern / read_pattern must be given.
        """
        assert write_pattern is not None or read_pattern is not None

        if write_pattern is None:
            write_pattern, write_expected = [], []
        elif len(write_pattern[0]) == 2:
            # add w.strb
            full_strb = 2**(width_from//8)-1
            write_pattern = [(adr, data, full_strb) for adr, data in write_pattern]

        if read_pattern is None:
            read_pattern, read_expected = [], []

        class DUT(Module):
            def __init__(self, width_from, width_to):
                self.master = AXILiteInterface(data_width=width_from)
                self.slave = AXILiteInterface(data_width=width_to)
                self.submodules.converter = AXILiteConverter(self.master, self.slave)

        prng = random.Random(42)

        def write_generator(axi_lite):
            for addr, data, strb in write_pattern:
                resp = (yield from axi_lite.write(addr, data, strb))
                self.assertEqual(resp, RESP_OKAY)
                for _ in range(prng.randrange(3)):
                    yield
            for _ in range(16):
                yield

        def read_generator(axi_lite):
            for addr, refdata in read_pattern:
                data, resp = (yield from axi_lite.read(addr))
                self.assertEqual(resp, RESP_OKAY)
                self.assertEqual(data, refdata)
                for _ in range(prng.randrange(3)):
                    yield
            for _ in range(4):
                yield

        def sequential_generator(axi_lite):
            yield from write_generator(axi_lite)
            yield from read_generator(axi_lite)

        def rdata_generator(adr):
            # first expected entry for this address, else a marker value
            return next((v for a, v in read_expected if a == adr), 0xbaadc0de)

        # ready latency cycles through 1, 2, 0, 1, 2, 0, ...
        latency_state = [0]
        def latency():
            latency_state[0] = (latency_state[0] + 1) % 3
            return latency_state[0]

        dut = DUT(width_from=width_from, width_to=width_to)
        checker = AXILiteChecker(ready_latency=latency, rdata_generator=rdata_generator)
        if parallel_rw:
            generators = [write_generator(dut.master), read_generator(dut.master)]
        else:
            generators = [sequential_generator(dut.master)]
        generators += checker.parallel_handlers(dut.slave)
        run_simulation(dut, generators)
        self.assertEqual(checker.writes, write_expected)
        self.assertEqual(checker.reads, read_expected)

    def _converter_subtests(self, **kwargs):
        """Run converter_test once sequentially and once with parallel reads/writes."""
        for parallel in [False, True]:
            with self.subTest(parallel=parallel):
                self.converter_test(parallel_rw=parallel, **kwargs)

    def test_axilite_down_converter_32to16(self):
        """Each 32-bit access is expected as two 16-bit accesses, low half first."""
        write_pattern = [
            (0x00000000, 0x22221111),
            (0x00000004, 0x44443333),
            (0x00000008, 0x66665555),
            (0x00000100, 0x88887777),
        ]
        write_expected = [
            (0x00000000, 0x1111, 0b11),
            (0x00000002, 0x2222, 0b11),
            (0x00000004, 0x3333, 0b11),
            (0x00000006, 0x4444, 0b11),
            (0x00000008, 0x5555, 0b11),
            (0x0000000a, 0x6666, 0b11),
            (0x00000100, 0x7777, 0b11),
            (0x00000102, 0x8888, 0b11),
        ]
        self._converter_subtests(
            width_from=32, width_to=16,
            write_pattern=write_pattern, write_expected=write_expected,
            read_pattern=write_pattern,
            read_expected=[entry[:2] for entry in write_expected])

    def test_axilite_down_converter_32to8(self):
        """Each 32-bit access is expected as four 8-bit accesses, low byte first."""
        write_pattern = [
            (0x00000000, 0x44332211),
            (0x00000004, 0x88776655),
        ]
        write_expected = [
            (0x00000000, 0x11, 0b1),
            (0x00000001, 0x22, 0b1),
            (0x00000002, 0x33, 0b1),
            (0x00000003, 0x44, 0b1),
            (0x00000004, 0x55, 0b1),
            (0x00000005, 0x66, 0b1),
            (0x00000006, 0x77, 0b1),
            (0x00000007, 0x88, 0b1),
        ]
        self._converter_subtests(
            width_from=32, width_to=8,
            write_pattern=write_pattern, write_expected=write_expected,
            read_pattern=write_pattern,
            read_expected=[entry[:2] for entry in write_expected])

    def test_axilite_down_converter_64to32(self):
        """Each 64-bit access is expected as two 32-bit accesses, low word first."""
        write_pattern = [
            (0x00000000, 0x2222222211111111),
            (0x00000008, 0x4444444433333333),
        ]
        write_expected = [
            (0x00000000, 0x11111111, 0b1111),
            (0x00000004, 0x22222222, 0b1111),
            (0x00000008, 0x33333333, 0b1111),
            (0x0000000c, 0x44444444, 0b1111),
        ]
        self._converter_subtests(
            width_from=64, width_to=32,
            write_pattern=write_pattern, write_expected=write_expected,
            read_pattern=write_pattern,
            read_expected=[entry[:2] for entry in write_expected])

    def test_axilite_down_converter_strb(self):
        """32 -> 16 writes with partial strobes: halves with an empty strobe are not expected."""
        write_pattern = [
            (0x00000000, 0x22221111, 0b1100),
            (0x00000004, 0x44443333, 0b1111),
            (0x00000008, 0x66665555, 0b1011),
            (0x00000100, 0x88887777, 0b0011),
        ]
        write_expected = [
            (0x00000002, 0x2222, 0b11),
            (0x00000004, 0x3333, 0b11),
            (0x00000006, 0x4444, 0b11),
            (0x00000008, 0x5555, 0b11),
            (0x0000000a, 0x6666, 0b10),
            (0x00000100, 0x7777, 0b11),
        ]
        self.converter_test(width_from=32, width_to=16,
                            write_pattern=write_pattern, write_expected=write_expected)

    def test_axilite_up_converter_16to32(self):
        """16-bit accesses are expected on the matching half of the aligned 32-bit word."""
        write_pattern = [
            (0x00000000, 0x1111),
            (0x00000002, 0x2222),
            (0x00000006, 0x3333),
            (0x00000004, 0x4444),
            (0x00000102, 0x5555),
        ]
        write_expected = [
            (0x00000000, 0x00001111, 0b0011),
            (0x00000000, 0x22220000, 0b1100),
            (0x00000004, 0x33330000, 0b1100),
            (0x00000004, 0x00004444, 0b0011),
            (0x00000100, 0x55550000, 0b1100),
        ]
        read_expected = [
            (0x00000000, 0x22221111),
            (0x00000000, 0x22221111),
            (0x00000004, 0x33334444),
            (0x00000004, 0x33334444),
            (0x00000100, 0x55550000),
        ]
        self._converter_subtests(
            width_from=16, width_to=32,
            write_pattern=write_pattern, write_expected=write_expected,
            read_pattern=write_pattern, read_expected=read_expected)

    def test_axilite_up_converter_8to32(self):
        """8-bit accesses are expected on the matching byte lane of the aligned 32-bit word."""
        write_pattern = [
            (0x00000000, 0x11),
            (0x00000001, 0x22),
            (0x00000003, 0x33),
            (0x00000002, 0x44),
            (0x00000101, 0x55),
        ]
        write_expected = [
            (0x00000000, 0x00000011, 0b0001),
            (0x00000000, 0x00002200, 0b0010),
            (0x00000000, 0x33000000, 0b1000),
            (0x00000000, 0x00440000, 0b0100),
            (0x00000100, 0x00005500, 0b0010),
        ]
        read_expected = [
            (0x00000000, 0x33442211),
            (0x00000000, 0x33442211),
            (0x00000000, 0x33442211),
            (0x00000000, 0x33442211),
            (0x00000100, 0x00005500),
        ]
        self._converter_subtests(
            width_from=8, width_to=32,
            write_pattern=write_pattern, write_expected=write_expected,
            read_pattern=write_pattern, read_expected=read_expected)

    def test_axilite_up_converter_strb(self):
        """16 -> 32 writes with partial strobes: strobes are expected shifted to the matching lanes."""
        write_pattern = [
            (0x00000000, 0x1111, 0b10),
            (0x00000002, 0x2222, 0b11),
            (0x00000006, 0x3333, 0b11),
            (0x00000004, 0x4444, 0b01),
            (0x00000102, 0x5555, 0b01),
        ]
        write_expected = [
            (0x00000000, 0x00001111, 0b0010),
            (0x00000000, 0x22220000, 0b1100),
            (0x00000004, 0x33330000, 0b1100),
            (0x00000004, 0x00004444, 0b0001),
            (0x00000100, 0x55550000, 0b0100),
        ]
        self.converter_test(width_from=16, width_to=32,
                            write_pattern=write_pattern, write_expected=write_expected)
538
539 # TestAXILiteInterconnect --------------------------------------------------------------------------
540
class TestAXILiteInterconnect(unittest.TestCase):
    """Simulation tests for the AXI-Lite point-to-point link, timeout, arbiter,
    decoder, shared interconnect and crossbar."""

    # Shared helpers -------------------------------------------------------------------------------

    @staticmethod
    def _addrs(entries):
        """Return the address (first field) of every recorded checker entry."""
        return [entry[0] for entry in entries]

    @staticmethod
    def _pattern_rdata(pattern):
        """Build a read-data callback serving the data of the "r" entries of ``pattern``.

        Addresses without a matching read entry return the marker 0xbaadc0de.
        """
        def rdata_generator(adr):
            for rw, a, v in pattern:
                if rw == "r" and a == adr:
                    return v
            return 0xbaadc0de
        return rdata_generator

    # Point to point / timeout ---------------------------------------------------------------------

    def test_interconnect_p2p(self):
        """All accesses of the master must arrive unchanged at the slave."""
        class DUT(Module):
            def __init__(self):
                self.master = master = AXILiteInterface()
                self.slave = slave = AXILiteInterface()
                self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)

        pattern = [
            ("w", 0x00000004, 0x11111111),
            ("w", 0x0000000c, 0x22222222),
            ("r", 0x00000010, 0x33333333),
            ("r", 0x00000018, 0x44444444),
        ]

        dut = DUT()
        checker = AXILiteChecker(rdata_generator=self._pattern_rdata(pattern))
        generators = [
            AXILitePatternGenerator(dut.master, pattern).handler(),
            checker.handler(dut.slave),
        ]
        run_simulation(dut, generators)
        self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
        self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])

    def test_timeout(self):
        """With AXILiteTimeout(master, 16) attached, unanswered accesses must end in SLVERR.

        The hand-written slave below answers exactly one write and then goes
        silent, so the second write and the read are never served by it.
        """
        class DUT(Module):
            def __init__(self):
                self.master = master = AXILiteInterface()
                self.slave = slave = AXILiteInterface()
                self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
                self.submodules.timeout = AXILiteTimeout(master, 16)

        def generator(axi_lite):
            resp = (yield from axi_lite.write(0x00001000, 0x11111111))
            self.assertEqual(resp, RESP_OKAY)
            resp = (yield from axi_lite.write(0x00002000, 0x22222222))
            self.assertEqual(resp, RESP_SLVERR)
            data, resp = (yield from axi_lite.read(0x00003000))
            self.assertEqual(resp, RESP_SLVERR)
            self.assertEqual(data, 0xffffffff)
            yield

        def checker(axi_lite):
            # stay idle for 16 cycles, then accept address and data in the same cycle
            for _ in range(16):
                yield
            yield axi_lite.aw.ready.eq(1)
            yield axi_lite.w.ready.eq(1)
            yield
            yield axi_lite.aw.ready.eq(0)
            yield axi_lite.w.ready.eq(0)
            # respond and hold the response until the master is seen ready
            yield axi_lite.b.valid.eq(1)
            yield
            while not (yield axi_lite.b.ready):
                yield
            yield axi_lite.b.valid.eq(0)

        dut = DUT()
        generators = [
            generator(dut.master),
            checker(dut.slave),
            timeout_generator(300),
        ]
        run_simulation(dut, generators)

    # Arbiter --------------------------------------------------------------------------------------

    def _arbiter_order_test(self, **checker_kwargs):
        """Check the order in which an AXILiteArbiter serves three masters.

        Every master performs 4 writes followed by 4 reads at addresses
        ``100*n + i``. ``checker_kwargs`` are forwarded to the AXILiteChecker
        acting as the slave (e.g. ``response_latency``).
        """
        class DUT(Module):
            def __init__(self, n_masters):
                self.masters = [AXILiteInterface() for _ in range(n_masters)]
                self.slave = AXILiteInterface()
                self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)

        def generator(n, axi_lite, delay=0):
            def gen(i):
                return 100*n + i

            for i in range(4):
                resp = (yield from axi_lite.write(gen(i), gen(i)))
                self.assertEqual(resp, RESP_OKAY)
                for _ in range(delay):
                    yield
            for i in range(4):
                _, resp = (yield from axi_lite.read(gen(i)))
                self.assertEqual(resp, RESP_OKAY)
                for _ in range(delay):
                    yield
            for _ in range(8):
                yield

        n_masters = 3

        cases = [
            # with no delay each master will do all transfers at once
            (0, [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]),
            # with some delay, the round-robin arbiter will iterate over masters
            (1, [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]),
        ]
        for delay, order in cases:
            with self.subTest(delay=delay):
                dut = DUT(n_masters)
                checker = AXILiteChecker(**checker_kwargs)
                generators = [generator(i, master, delay=delay)
                              for i, master in enumerate(dut.masters)]
                generators += [timeout_generator(300), checker.handler(dut.slave)]
                run_simulation(dut, generators)
                self.assertEqual(self._addrs(checker.writes), order)
                self.assertEqual(self._addrs(checker.reads), order)

    def test_arbiter_order(self):
        """Arbitration order with a slave that responds without extra latency."""
        self._arbiter_order_test()

    def test_arbiter_holds_grant_until_response(self):
        """Same order is expected when the slave delays every response by 3 cycles."""
        self._arbiter_order_test(response_latency=lambda: 3)

    # Decoder --------------------------------------------------------------------------------------

    def address_decoder(self, i, size=0x100, python=False):
        """Return a decoder callback selecting region number ``i`` of ``size`` bytes.

        The callback works on 32-bit word addresses. With ``python=True`` it
        takes a Python integer, otherwise a migen signal (it slices the signal).
        """
        # bytes to 32-bit words aligned
        _size = (size) >> 2
        _origin = (size * i) >> 2
        shift = log2_int(_size)
        if python:  # for python integers
            return lambda a: ((a >> shift) == (_origin >> shift))
        # for migen signals
        return lambda a: (a[shift:] == (_origin >> shift))

    def decoder_test(self, n_slaves, pattern, generator_delay=0):
        """Run ``pattern`` through an AXILiteDecoder with ``n_slaves`` regions.

        Returns the list of AXILiteChecker objects, one per slave, so that the
        caller can inspect which accesses reached which slave.
        """
        class DUT(Module):
            def __init__(self, decoders):
                self.master = AXILiteInterface()
                self.slaves = [AXILiteInterface() for _ in range(len(decoders))]
                slaves = list(zip(decoders, self.slaves))
                self.submodules.decoder = AXILiteDecoder(self.master, slaves)

        dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
        checkers = [AXILiteChecker(rdata_generator=self._pattern_rdata(pattern))
                    for _ in dut.slaves]

        generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
        generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
        generators += [timeout_generator(300)]
        run_simulation(dut, generators)

        return checkers

    def test_decoder_write(self):
        """Writes must reach only the slave whose region contains the address."""
        for delay in [0, 1]:
            with self.subTest(delay=delay):
                slaves = self.decoder_test(n_slaves=3, pattern=[
                    ("w", 0x010, 1),
                    ("w", 0x110, 2),
                    ("w", 0x210, 3),
                    ("w", 0x011, 1),
                    ("w", 0x012, 1),
                    ("w", 0x111, 2),
                    ("w", 0x112, 2),
                    ("w", 0x211, 3),
                    ("w", 0x212, 3),
                ], generator_delay=delay)

                self.assertEqual(self._addrs(slaves[0].writes), [0x010, 0x011, 0x012])
                self.assertEqual(self._addrs(slaves[1].writes), [0x110, 0x111, 0x112])
                self.assertEqual(self._addrs(slaves[2].writes), [0x210, 0x211, 0x212])
                for slave in slaves:
                    self.assertEqual(slave.reads, [])

    def test_decoder_read(self):
        """Reads must reach only the slave whose region contains the address."""
        for delay in [0, 1]:
            with self.subTest(delay=delay):
                slaves = self.decoder_test(n_slaves=3, pattern=[
                    ("r", 0x010, 1),
                    ("r", 0x110, 2),
                    ("r", 0x210, 3),
                    ("r", 0x011, 1),
                    ("r", 0x012, 1),
                    ("r", 0x111, 2),
                    ("r", 0x112, 2),
                    ("r", 0x211, 3),
                    ("r", 0x212, 3),
                ], generator_delay=delay)

                self.assertEqual(self._addrs(slaves[0].reads), [0x010, 0x011, 0x012])
                self.assertEqual(self._addrs(slaves[1].reads), [0x110, 0x111, 0x112])
                self.assertEqual(self._addrs(slaves[2].reads), [0x210, 0x211, 0x212])
                for slave in slaves:
                    self.assertEqual(slave.writes, [])

    def test_decoder_read_write(self):
        """Mixed reads and writes must each reach the slave owning the address."""
        for delay in [0, 1]:
            with self.subTest(delay=delay):
                slaves = self.decoder_test(n_slaves=3, pattern=[
                    ("w", 0x010, 1),
                    ("w", 0x110, 2),
                    ("r", 0x111, 2),
                    ("r", 0x011, 1),
                    ("r", 0x211, 3),
                    ("w", 0x210, 3),
                ], generator_delay=delay)

                self.assertEqual(self._addrs(slaves[0].writes), [0x010])
                self.assertEqual(self._addrs(slaves[0].reads), [0x011])
                self.assertEqual(self._addrs(slaves[1].writes), [0x110])
                self.assertEqual(self._addrs(slaves[1].reads), [0x111])
                self.assertEqual(self._addrs(slaves[2].writes), [0x210])
                self.assertEqual(self._addrs(slaves[2].reads), [0x211])

    def test_decoder_stall(self):
        """An access outside every slave region is expected to hang until the watchdog fires."""
        with self.assertRaises(TimeoutError):
            self.decoder_test(n_slaves=3, pattern=[
                ("w", 0x300, 1),
            ])
        with self.assertRaises(TimeoutError):
            self.decoder_test(n_slaves=3, pattern=[
                ("r", 0x300, 1),
            ])

    # Interconnect ---------------------------------------------------------------------------------

    def interconnect_test(self, master_patterns, slave_decoders,
                          master_delay=0, slave_ready_latency=0, slave_response_latency=0,
                          disconnected_slaves=None, timeout=300, interconnect=AXILiteInterconnectShared,
                          **kwargs):
        """Run several pattern-driven masters against several checker-modelled slaves.

        Returns ``(pattern_generators, checkers)``: one AXILitePatternGenerator
        per master and one AXILiteChecker per slave.
        """
        # number of masters/slaves is defined by the number of patterns/decoders
        # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
        # slave_decoders: list of address decoders per slave
        # delay/latency: control the speed of masters/slaves
        # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
        # kwargs: forwarded to the interconnect constructor
        class DUT(Module):
            def __init__(self, n_masters, decoders, **kwargs):
                self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
                self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
                slaves = list(zip(decoders, self.slaves))
                self.submodules.interconnect = interconnect(self.masters, slaves, **kwargs)

        class ReadDataGenerator:
            # Generates data based on decoded addresses and data defined in master_patterns
            def __init__(self, patterns):
                self.mem = {}
                for pattern in patterns:
                    for rw, addr, val in pattern:
                        if rw == "r":
                            assert addr not in self.mem
                            self.mem[addr] = val

            def getter(self, n):
                # on miss will give default data depending on slave n
                return lambda addr: self.mem.get(addr, 0xbaad0000 + n)

        def new_checker(rdata_generator):
            return AXILiteChecker(ready_latency=slave_ready_latency,
                                  response_latency=slave_response_latency,
                                  rdata_generator=rdata_generator)

        # prepare test
        dut = DUT(len(master_patterns), slave_decoders, **kwargs)
        rdata_generator = ReadDataGenerator(master_patterns)
        # one checker per *slave*: checkers model the slave side, so their number
        # must follow the decoders and not the master patterns
        checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(dut.slaves)]
        pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
                              for i, pattern in enumerate(master_patterns)]

        # run simulator
        disconnected = set(disconnected_slaves or ())
        generators = [gen.handler() for gen in pattern_generators]
        generators += [checker.handler(slave)
                       for i, (slave, checker) in enumerate(zip(dut.slaves, checkers))
                       if i not in disconnected]
        generators += [timeout_generator(timeout)]
        run_simulation(dut, generators)

        return pattern_generators, checkers

    def test_interconnect_shared_basic(self):
        """Three masters writing through the shared interconnect to three slaves."""
        master_patterns = [
            [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
            [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
            [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
        ]
        slave_decoders = [self.address_decoder(i) for i in range(3)]

        generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
                                                      master_delay=1)

        for gen in generators:
            self.assertEqual(gen.errors, 0)

        self.assertEqual(self._addrs(checkers[0].writes), [0x000, 0x010])
        self.assertEqual(self._addrs(checkers[1].writes), [0x101, 0x111, 0x112])
        self.assertEqual(self._addrs(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
        self.assertEqual(self._addrs(checkers[0].reads), [])
        self.assertEqual(self._addrs(checkers[1].reads), [])
        self.assertEqual(self._addrs(checkers[2].reads), [])

    def interconnect_stress_test(self, timeout=1000, **kwargs):
        """Drive 3 masters x 3 slaves with 64 pseudo-random accesses per master.

        ``kwargs`` are forwarded to ``interconnect_test``. When
        ``disconnected_slaves`` is given (non-empty) every master is expected to
        see errors, otherwise none.
        """
        prng = random.Random(42)

        n_masters = 3
        n_slaves = 3
        pattern_length = 64
        slave_region_size = 0x10000000
        # for testing purpose each master will access only its own region of a slave
        master_region_size = 0x1000
        assert n_masters*master_region_size < slave_region_size

        def gen_pattern(n, length):
            assert length < master_region_size
            for i_access in range(length):
                rw = "w" if prng.randint(0, 1) == 0 else "r"
                i_slave = prng.randrange(n_slaves)
                addr = i_slave*slave_region_size + n*master_region_size + i_access
                data = addr
                yield rw, addr, data

        master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
        slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
        slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
                             for i in range(n_slaves)]

        generators, checkers = self.interconnect_test(master_patterns, slave_decoders,
                                                      timeout=timeout, **kwargs)

        expect_errors = bool(kwargs.get("disconnected_slaves", None))
        for gen in generators:
            read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
            msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
                gen.resp_errors, "\n".join(read_errors))
            if not expect_errors:
                self.assertEqual(gen.errors, 0, msg=msg)
            else:  # when some slaves are disconnected we should have some errors
                self.assertNotEqual(gen.errors, 0, msg=msg)

        # make sure all the accesses at slave side are in correct address region
        for checker, decoder in zip(checkers, slave_decoders_py):
            for addr in self._addrs(checker.writes + checker.reads):
                # compensate for the fact that decoders work on word-aligned addresses
                self.assertTrue(decoder(addr >> 2))

    def test_interconnect_shared_stress_no_delay(self):
        """Shared interconnect stress test with masters and slaves at full speed."""
        self.interconnect_stress_test(timeout=1000,
                                      master_delay=0,
                                      slave_ready_latency=0,
                                      slave_response_latency=0)

    def test_interconnect_shared_stress_rand_short(self):
        """Shared interconnect stress test with pseudo-random delays of 0..3 cycles."""
        prng = random.Random(42)
        rand = lambda: prng.randrange(4)
        self.interconnect_stress_test(timeout=2000,
                                      master_delay=rand,
                                      slave_ready_latency=rand,
                                      slave_response_latency=rand)

    def test_interconnect_shared_stress_rand_long(self):
        """Shared interconnect stress test with pseudo-random delays of 0..15 cycles."""
        prng = random.Random(42)
        rand = lambda: prng.randrange(16)
        self.interconnect_stress_test(timeout=4000,
                                      master_delay=rand,
                                      slave_ready_latency=rand,
                                      slave_response_latency=rand)

    def test_interconnect_shared_stress_timeout(self):
        """Stress test with slave 1 not responding; ``timeout_cycles=50`` is passed to the interconnect."""
        self.interconnect_stress_test(timeout=4000,
                                      disconnected_slaves=[1],
                                      timeout_cycles=50)

    def test_crossbar_stress_no_delay(self):
        """Crossbar stress test with masters and slaves at full speed."""
        self.interconnect_stress_test(timeout=1000,
                                      master_delay=0,
                                      slave_ready_latency=0,
                                      slave_response_latency=0,
                                      interconnect=AXILiteCrossbar)

    def test_crossbar_stress_rand(self):
        """Crossbar stress test with pseudo-random delays of 0..3 cycles."""
        prng = random.Random(42)
        rand = lambda: prng.randrange(4)
        self.interconnect_stress_test(timeout=2000,
                                      master_delay=rand,
                                      slave_ready_latency=rand,
                                      slave_response_latency=rand,
                                      interconnect=AXILiteCrossbar)