1 """ Unit tests for Buffered and Unbuffered pipelines
3 contains useful worked examples of how to use the Pipeline API,
6 * Combinatorial Stage "Chaining"
7 * class-based data stages
8 * nmigen module-based data stages
9 * special nmigen module-based data stage, where the stage *is* the module
10 * Record-based data stages
11 * static-class data stages
12 * multi-stage pipelines (and how to connect them)
13 * how to *use* the pipelines (see Test5) - how to get data in and out
17 from nmigen
import Module
, Signal
, Mux
, Const
, Elaboratable
18 from nmigen
.hdl
.rec
import Record
19 from nmigen
.compat
.sim
import run_simulation
20 from nmigen
.cli
import verilog
, rtlil
22 from nmutil
.singlepipe
import ControlBase
23 from nmutil
.singlepipe
import MaskCancellable
25 from random
import randint
, seed
30 from ieee754
.part_mul_add
.test
.test_multiply
import SIMDMulLane
, simd_mul
31 from ieee754
.part_mul_add
.mul_pipe
import MulPipe_8_16_32_64
, InputData
32 from ieee754
.part_mul_add
.multiply
import OP_MUL_LOW
, PartitionPoints
def resultfn_3(data_o, expected, i, o):
    """Check that a received value equals the sent value plus one.

    data_o:   value read from the pipeline output
    expected: the input value that was sent; the output must be
              ``expected + 1``
    i, o:     counters supplied by the caller, used only to label the
              failure message

    Raises AssertionError when the output does not match.
    """
    # report the value that is actually compared against (expected + 1),
    # not the raw input, so a failure message is never self-contradictory
    want = expected + 1
    assert data_o == want, (
        "%d-%d data %x not match %x\n" % (i, o, data_o, want))
40 def data_placeholder():
42 for i
in range(num_tests
):
44 d
.src1
= randint(0, 1<<16-1)
45 d
.src2
= randint(0, 1<<16-1)
51 for i
in range(num_tests
):
52 data
.append({'src1': randint(0, 1<<16-1),
53 'src2': randint(0, 1<<16-1)})
58 def __init__(self
, dut
, resultfn
, data
=None, stage_ctl
=False):
60 self
.resultfn
= resultfn
61 self
.stage_ctl
= stage_ctl
66 for i
in range(num_tests
):
67 self
.data
.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
72 while self
.o
!= len(self
.data
):
73 send_range
= randint(0, 3)
74 for j
in range(randint(1,10)):
78 send
= randint(0, send_range
) != 0
80 o_p_ready
= yield self
.dut
.p
.ready_o
84 if send
and self
.i
!= len(self
.data
):
85 yield self
.dut
.p
.valid_i
.eq(1)
86 pd
= self
.dut
.p
.data_i
87 di
= self
.data
[self
.i
]
88 print ("send", j
, hex(di
.a
), hex(di
.b
))
91 for k
in pd
.part_pts
.keys():
92 yield pd
.part_pts
[k
].eq(di
.part_pts
[k
])
93 for j
in range(len(pd
.part_ops
)):
94 yield pd
.part_ops
[j
].eq(di
.part_ops
[j
])
97 yield self
.dut
.p
.valid_i
.eq(0)
101 while self
.o
!= len(self
.data
):
102 stall_range
= randint(0, 3)
103 for j
in range(randint(1,10)):
104 ready
= randint(0, stall_range
) != 0
106 yield self
.dut
.n
.ready_i
.eq(ready
)
108 o_n_valid
= yield self
.dut
.n
.valid_o
109 i_n_ready
= yield self
.dut
.n
.ready_i_test
110 if not o_n_valid
or not i_n_ready
:
112 data_o
= yield self
.dut
.n
.data_o
.output
113 print ("rcv", j
, hex(data_o
))
114 self
.resultfn(data_o
, self
.data
[self
.o
], self
.i
, self
.o
)
116 if self
.o
== len(self
.data
):
def resultfn_5(data_o, expected, i, o):
    """Check that a received value equals the sum of the two sent operands.

    data_o:   value read from the pipeline output
    expected: indexable pair; ``expected[0] + expected[1]`` is the value
              the output must equal
    i, o:     counters supplied by the caller, used only to label the
              failure message

    Raises AssertionError when the output does not match.
    """
    first, second = expected[0], expected[1]
    assert data_o == first + second, (
        "%d-%d data %x not match %s\n" % (i, o, data_o, repr(expected)))
127 class ExampleAddRecordPlaceHolderStage
:
128 """ example use of a Record, with a placeholder as the processing result
131 record_spec
= [('src1', 16), ('src2', 16)]
133 """ returns a Record using the specification
135 return Record(self
.record_spec
)
138 return Record(self
.record_spec
)
140 def process(self
, i
):
141 """ process the input data, returning a PlaceHolder class instance
142 with attributes that exactly match those of the Record.
# Deliberately empty class: instances exist only so that arbitrary
# attributes can be assigned to them after they have been created.
class PlaceHolder:
    pass
154 ######################################################################
156 ######################################################################
160 """ the eq function, called by set_input, needs an incoming object
161 that conforms to the Example2OpClass.eq function requirements
162 easiest way to do that is to create a class that has the exact
163 same member layout (self.op1, self.op2) as Example2OpClass
165 def __init__(self
, a
, b
):
170 self
.part_pts
= PartitionPoints()
171 for i
in range(8, 64, 8):
172 self
.part_pts
[i
] = False
174 # set to 16-bit partitions
175 for i
in range(16, 64, 16):
176 self
.part_pts
[i
] = True
178 self
.part_ops
= [Signal() for i
in range(8)]
180 def simd_calc_result(a
, b
):
181 lanes
= [SIMDMulLane(False,
185 so
, si
= simd_mul(a
, b
, lanes
)
def resultfn_8(data_o, expected, i, o):
    """Check a received value against the reference SIMD multiply result.

    data_o:   value read from the pipeline output
    expected: the input object that was sent; its ``a`` and ``b``
              attributes are passed to simd_calc_result to obtain the
              reference value
    i, o:     counters supplied by the caller, used only to label the
              failure message

    Raises AssertionError when the output differs from the reference.
    """
    reference = simd_calc_result(expected.a, expected.b)
    assert data_o == reference, (
        "%d-%d data %x res %x not match %s\n"
        % (i, o, data_o, reference, repr(expected)))
196 for i
in range(num_tests
):
197 a
= randint(0, 1<<64-1)
198 b
= randint(0, 1<<64-1)
199 data
.append(TestInputMul(a
, b
))
205 ######################################################################
207 ######################################################################
214 dut
= MaskCancellablePipe(maskwid
)
215 data
= data_chain0(maskwid
)
216 test
= TestMask(dut
, resultfn_0
, maskwid
, data
=data
)
217 run_simulation(dut
, [test
.send
, test
.rcv
],
218 vcd_name
="test_maskchain0.vcd")
223 dut
= MulPipe_8_16_32_64()
224 ports
= [dut
.p
.valid_i
, dut
.n
.ready_i
,
225 dut
.n
.valid_o
, dut
.p
.ready_o
,
226 dut
.a
, dut
.b
, dut
.output
]
227 #vl = rtlil.convert(dut, ports=ports)
228 #with open("test_mul_pipe_8_16_32_64.il", "w") as f:
231 test
= Test5(dut
, resultfn_8
, data
=data
)
232 run_simulation(dut
, [test
.send
, test
.rcv
],
233 vcd_name
="test_mul_pipe_8_16_32_64.vcd")
237 lanes
= [SIMDMulLane(True,
253 a
= 0x0123456789ABCDEF
254 b
= 0xFEDCBA9876543210
255 output
= 0x0121FA00FE1C28FE
256 intermediate_output
= 0x0121FA0023E20B28C94DFE1C280AFEF0
257 so
, si
= simd_mul(a
, b
, lanes
)
258 print (hex(so
), hex(si
))
259 print (hex(output
), hex(intermediate_output
))
261 def test_simd_mul1():
262 lanes
= [SIMDMulLane(True,
273 #output = 0x0121FA00FE1C28FE
274 #intermediate_output = 0x0121FA0023E20B28C94DFE1C280AFEF0
275 so
, si
= simd_mul(a
, b
, lanes
)
276 print (hex(so
), hex(si
))
277 #print (hex(output), hex(intermediate_output))
280 if __name__
== '__main__':