1 # This file is Copyright (c) 2019 Florent Kermarrec <florent@enjoy-digital.fr>
9 from litex
.soc
.interconnect
.stream
import *
10 from litex
.soc
.interconnect
.packet
import *
# Header definition shared by every test in this file.
#
# `packet_header_length` is handed to Header() as its `length` argument.
# NOTE(review): presumably a size in bytes -- confirm against
# litex.soc.interconnect.packet.Header.
packet_header_length = 31

# One header field per tested width (8/16/32/64/128), keyed by name.
# NOTE(review): the three positional HeaderField arguments look like
# (byte position, bit offset, width in bits) -- confirm against
# litex.soc.interconnect.packet.HeaderField.
packet_header_fields = {
    "field_8b":   HeaderField(0,  0,   8),
    "field_16b":  HeaderField(1,  0,  16),
    "field_32b":  HeaderField(3,  0,  32),
    "field_64b":  HeaderField(7,  0,  64),
    "field_128b": HeaderField(15, 0, 128),
}

# Header object used both to derive the packet-side parameter layout and to
# configure the Packetizer/Depacketizer under test.
packet_header = Header(
    fields=packet_header_fields,
    length=packet_header_length,
    swap_field_bytes=True,
)
def packet_description(dw):
    """Build the endpoint description used on the packet side of the test.

    The payload layout is the single entry ``("data", dw)`` and the parameter
    layout is whatever ``packet_header.get_layout()`` returns.
    """
    header_params = packet_header.get_layout()
    return EndpointDescription([("data", dw)], header_params)
def raw_description(dw):
    """Build the endpoint description used on the raw side of the test.

    Only a payload layout, the single entry ``("data", dw)``, is passed to
    ``EndpointDescription``; no parameter layout is given.
    """
    return EndpointDescription([("data", dw)])
# NOTE(review): only this signature is visible in the extract; the enclosing
# class header and the method body are missing. Code further down calls
# Packet(header, datas) and reads `.header` / `.datas` on the result, so this
# is presumably Packet.__init__ storing both arguments -- confirm against the
# full file.
35 def __init__(self
, header
, datas
):
# Loopback test-case for a Packetizer feeding a Depacketizer: packets with
# random header fields and payload are driven into `dut.sink`, read back on
# `dut.source`, and compared against what was sent.
# NOTE(review): this extract has gaps (the embedded line numbers skip 43-45,
# 47, 55, 57, 64, 69, 71, 75-76, 78-80, 85, 87, 89, 91, 97, 99-103, 109-110,
# 115), so the `npackets`/`packets`/`header` initialisation, the bodies of the
# wait loops, the error-counter setup and the DUT class header are not visible
# here -- confirm against the full file before editing.
40 class TestPacket(unittest
.TestCase
):
# Runs one loopback simulation; `dw` is used for the payload layout
# (packet_description/raw_description) and bounds the random data words.
41 def loopback_test(self
, dw
):
# PRNG created with the fixed seed 42.
42 prng
= random
.Random(42)
# Build `npackets` packets, each with one random value per header field,
# drawn below the field's width (2**8 ... 2**128).
46 for n
in range(npackets
):
48 header
["field_8b"] = prng
.randrange(2**8)
49 header
["field_16b"] = prng
.randrange(2**16)
50 header
["field_32b"] = prng
.randrange(2**32)
51 header
["field_64b"] = prng
.randrange(2**64)
52 header
["field_128b"] = prng
.randrange(2**128)
# Payload: between 0 and 127 words, each drawn from [0, 2**dw).
53 datas
= [prng
.randrange(2**dw
) for _
in range(prng
.randrange(2**7))]
54 packets
.append(Packet(header
, datas
))
# Stimulus generator: drives every packet into dut.sink. `valid_rand` is the
# threshold (out of 100) compared against a random draw in the loop at the
# end of each word.
56 def generator(dut
, valid_rand
=50):
58 for packet
in packets
:
# Drive the five header fields onto dut.sink.
59 yield dut
.sink
.field_8b
.eq(packet
.header
["field_8b"])
60 yield dut
.sink
.field_16b
.eq(packet
.header
["field_16b"])
61 yield dut
.sink
.field_32b
.eq(packet
.header
["field_32b"])
62 yield dut
.sink
.field_64b
.eq(packet
.header
["field_64b"])
63 yield dut
.sink
.field_128b
.eq(packet
.header
["field_128b"])
# Drive the payload word by word; `last` is set only on the final word.
65 for n
, data
in enumerate(packet
.datas
):
66 yield dut
.sink
.valid
.eq(1)
67 yield dut
.sink
.last
.eq(n
== (len(packet
.datas
) - 1))
68 yield dut
.sink
.data
.eq(data
)
# Loop while dut.sink.ready reads 0 (loop body not visible in this extract).
70 while (yield dut
.sink
.ready
) == 0:
# Deassert valid and last after the word.
72 yield dut
.sink
.valid
.eq(0)
73 yield dut
.sink
.last
.eq(0)
# Loop while a random draw in [0, 100) is below valid_rand (loop body not
# visible in this extract).
74 while prng
.randrange(100) < valid_rand
:
# Checker generator: reads the expected words back from dut.source and
# compares header fields, data and last. `ready_rand` is the threshold (out
# of 100) compared against a random draw before ready is asserted.
77 def checker(dut
, ready_rand
=50):
81 # Receive and check packets
82 for packet
in packets
:
83 for n
, data
in enumerate(packet
.datas
):
# Deassert ready, then loop while dut.source.valid reads 0.
84 yield dut
.source
.ready
.eq(0)
86 while (yield dut
.source
.valid
) == 0:
# Loop while a random draw in [0, 100) is below ready_rand.
88 while prng
.randrange(100) < ready_rand
:
# Assert ready.
90 yield dut
.source
.ready
.eq(1)
# Compare each header field read from dut.source with the value that was
# sent; every mismatch increments dut.header_errors.
92 for field
in ["field_8b", "field_16b", "field_32b", "field_64b", "field_128b"]:
93 if (yield getattr(dut
.source
, field
)) != packet
.header
[field
]:
94 dut
.header_errors
+= 1
95 #print("{:x} vs {:x}".format((yield dut.source.data), data))
# Compare the received data word with the expected one.
# NOTE(review): the statement run on mismatch is not visible here; presumably
# it increments dut.data_errors, which is asserted on below -- confirm.
96 if ((yield dut
.source
.data
) != data
):
# Compare `last` with whether this is the packet's final word.
# NOTE(review): presumably increments dut.last_errors on mismatch -- confirm.
98 if ((yield dut
.source
.last
) != (n
== (len(packet
.datas
) - 1))):
# DUT wiring: a Packetizer (packet description in, raw description out) and a
# Depacketizer (raw description in, packet description out), both given
# packet_header.
104 packetizer
= Packetizer(packet_description(dw
), raw_description(dw
), packet_header
)
105 depacketizer
= Depacketizer(raw_description(dw
), packet_description(dw
), packet_header
)
106 self
.submodules
+= packetizer
, depacketizer
# Connect the packetizer output to the depacketizer input.
107 self
.comb
+= packetizer
.source
.connect(depacketizer
.sink
)
# Expose the packetizer input and the depacketizer output as sink/source.
108 self
.sink
, self
.source
= packetizer
.sink
, depacketizer
.source
# Simulate with both the generator and the checker, then require that no
# header, data or last mismatch was recorded on the dut.
111 run_simulation(dut
, [generator(dut
), checker(dut
)])
112 self
.assertEqual(dut
.header_errors
, 0)
113 self
.assertEqual(dut
.data_errors
, 0)
114 self
.assertEqual(dut
.last_errors
, 0)
# Test entry points: one loopback run per data width (8, 32, 64, 128).
116 def test_8bit_loopback(self
):
117 self
.loopback_test(dw
=8)
119 def test_32bit_loopback(self
):
120 self
.loopback_test(dw
=32)
122 def test_64bit_loopback(self
):
123 self
.loopback_test(dw
=64)
125 def test_128bit_loopback(self
):
126 self
.loopback_test(dw
=128)