create SIMD pipe multiply unit test
authorLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Wed, 28 Aug 2019 06:17:21 +0000 (07:17 +0100)
committerLuke Kenneth Casson Leighton <lkcl@lkcl.net>
Wed, 28 Aug 2019 06:17:21 +0000 (07:17 +0100)
src/ieee754/part_mul_add/test/test_mul_pipe.py [new file with mode: 0644]
src/ieee754/part_mul_add/test/test_multiply.py

diff --git a/src/ieee754/part_mul_add/test/test_mul_pipe.py b/src/ieee754/part_mul_add/test/test_mul_pipe.py
new file mode 100644 (file)
index 0000000..38904e1
--- /dev/null
@@ -0,0 +1,284 @@
+""" Unit tests for Buffered and Unbuffered pipelines
+
+    contains useful worked examples of how to use the Pipeline API,
+    including:
+
+    * Combinatorial Stage "Chaining"
+    * class-based data stages
+    * nmigen module-based data stages
+    * special nmigen module-based data stage, where the stage *is* the module
+    * Record-based data stages
+    * static-class data stages
+    * multi-stage pipelines (and how to connect them)
+    * how to *use* the pipelines (see Test5) - how to get data in and out
+
+"""
+
+from nmigen import Module, Signal, Mux, Const, Elaboratable
+from nmigen.hdl.rec import Record
+from nmigen.compat.sim import run_simulation
+from nmigen.cli import verilog, rtlil
+
+from nmutil.singlepipe import ControlBase
+from nmutil.singlepipe import MaskCancellable
+
+from random import randint, seed
+
+#seed(4)
+
+
+from ieee754.part_mul_add.test.test_multiply import SIMDMulLane, simd_mul
+from ieee754.part_mul_add.mul_pipe import MulPipe_8_16_32_64, InputData
+from ieee754.part_mul_add.multiply import OP_MUL_LOW, PartitionPoints
+
+
+def resultfn_3(data_o, expected, i, o):
+    assert data_o == expected + 1, \
+                "%d-%d data %x not match %x\n" \
+                % (i, o, data_o, expected)
+
+def data_placeholder():
+        data = []
+        for i in range(num_tests):
+            d = PlaceHolder()
+            d.src1 = randint(0, 1<<16-1)
+            d.src2 = randint(0, 1<<16-1)
+            data.append(d)
+        return data
+
+def data_dict():
+        data = []
+        for i in range(num_tests):
+            data.append({'src1': randint(0, 1<<16-1),
+                         'src2': randint(0, 1<<16-1)})
+        return data
+
+
+class Test5:
+    def __init__(self, dut, resultfn, data=None, stage_ctl=False):
+        self.dut = dut
+        self.resultfn = resultfn
+        self.stage_ctl = stage_ctl
+        if data:
+            self.data = data
+        else:
+            self.data = []
+            for i in range(num_tests):
+                self.data.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
+        self.i = 0
+        self.o = 0
+
+    def send(self):
+        while self.o != len(self.data):
+            send_range = randint(0, 3)
+            for j in range(randint(1,10)):
+                if send_range == 0:
+                    send = True
+                else:
+                    send = randint(0, send_range) != 0
+                #send = True
+                o_p_ready = yield self.dut.p.ready_o
+                if not o_p_ready:
+                    yield
+                    continue
+                if send and self.i != len(self.data):
+                    yield self.dut.p.valid_i.eq(1)
+                    pd = self.dut.p.data_i
+                    di = self.data[self.i]
+                    print ("send", j, hex(di.a), hex(di.b))
+                    yield pd.a.eq(di.a)
+                    yield pd.b.eq(di.b)
+                    for k in pd.part_pts.keys():
+                        yield pd.part_pts[k].eq(di.part_pts[k])
+                    for j in range(len(pd.part_ops)):
+                        yield pd.part_ops[j].eq(di.part_ops[j])
+                    self.i += 1
+                else:
+                    yield self.dut.p.valid_i.eq(0)
+                yield
+
+    def rcv(self):
+        while self.o != len(self.data):
+            stall_range = randint(0, 3)
+            for j in range(randint(1,10)):
+                ready = randint(0, stall_range) != 0
+                #ready = True
+                yield self.dut.n.ready_i.eq(ready)
+                yield
+                o_n_valid = yield self.dut.n.valid_o
+                i_n_ready = yield self.dut.n.ready_i_test
+                if not o_n_valid or not i_n_ready:
+                    continue
+                data_o = yield self.dut.n.data_o.output
+                print ("rcv", j, hex(data_o))
+                self.resultfn(data_o, self.data[self.o], self.i, self.o)
+                self.o += 1
+                if self.o == len(self.data):
+                    break
+
+
+def resultfn_5(data_o, expected, i, o):
+    res = expected[0] + expected[1]
+    assert data_o == res, \
+                "%d-%d data %x not match %s\n" \
+                % (i, o, data_o, repr(expected))
+
+
+class ExampleAddRecordPlaceHolderStage:
+    """ example use of a Record, with a placeholder as the processing result
+    """
+
+    record_spec = [('src1', 16), ('src2', 16)]
+    def ispec(self):
+        """ returns a Record using the specification
+        """
+        return Record(self.record_spec)
+
+    def ospec(self):
+        return Record(self.record_spec)
+
+    def process(self, i):
+        """ process the input data, returning a PlaceHolder class instance
+            with attributes that exactly match those of the Record.
+        """
+        o = PlaceHolder()
+        o.src1 = i.src1 + 1
+        o.src2 = i.src2 + 1
+        return o
+
+
+# a dummy class that may have stuff assigned to instances once created
+class PlaceHolder: pass
+
+
+######################################################################
+# Test 8
+######################################################################
+
+
+class TestInputMul:
+    """ the eq function, called by set_input, needs an incoming object
+        that conforms to the Example2OpClass.eq function requirements
+        easiest way to do that is to create a class that has the exact
+        same member layout (self.op1, self.op2) as Example2OpClass
+    """
+    def __init__(self, a, b):
+
+        self.a = a
+        self.b = b
+
+        self.part_pts = PartitionPoints()
+        for i in range(8, 64, 8):
+            self.part_pts[i] = False
+
+        # set to 16-bit partitions
+        for i in range(16, 64, 16):
+            self.part_pts[i] = True
+
+        self.part_ops = [Signal() for i in range(8)]
+
+def simd_calc_result(a, b):
+    lanes = [SIMDMulLane(False,
+                         False,
+                         16,
+                         False)]*4
+    so, si = simd_mul(a, b, lanes)
+    return so
+
+def resultfn_8(data_o, expected, i, o):
+    res = simd_calc_result(expected.a, expected.b)
+    assert data_o == res, \
+                "%d-%d data %x res %x not match %s\n" \
+                % (i, o, data_o, res, repr(expected))
+
+def data_2op():
+    data = []
+    for i in range(num_tests):
+        a = randint(0, 1<<64-1)
+        b = randint(0, 1<<64-1)
+        data.append(TestInputMul(a, b))
+    return data
+
+
+
+
+######################################################################
+# Unit Tests
+######################################################################
+
+num_tests = 10
+
+def test0():
+    maskwid = num_tests
+    print ("test 0")
+    dut = MaskCancellablePipe(maskwid)
+    data = data_chain0(maskwid)
+    test = TestMask(dut, resultfn_0, maskwid, data=data)
+    run_simulation(dut, [test.send, test.rcv],
+                        vcd_name="test_maskchain0.vcd")
+
+
+def test8():
+    print ("test 8")
+    dut = MulPipe_8_16_32_64()
+    ports = [dut.p.valid_i, dut.n.ready_i,
+             dut.n.valid_o, dut.p.ready_o,
+             dut.a, dut.b, dut.output]
+    #vl = rtlil.convert(dut, ports=ports)
+    #with open("test_mul_pipe_8_16_32_64.il", "w") as f:
+    #    f.write(vl)
+    data=data_2op()
+    test = Test5(dut, resultfn_8, data=data)
+    run_simulation(dut, [test.send, test.rcv],
+                   vcd_name="test_mul_pipe_8_16_32_64.vcd")
+
+
+def test_simd_mul():
+    lanes = [SIMDMulLane(True,
+                         True,
+                         8,
+                         True),
+             SIMDMulLane(False,
+                         False,
+                         8,
+                         True),
+                 SIMDMulLane(True,
+                             True,
+                             16,
+                             False),
+                 SIMDMulLane(True,
+                             False,
+                             32,
+                             True)]
+    a = 0x0123456789ABCDEF
+    b = 0xFEDCBA9876543210
+    output = 0x0121FA00FE1C28FE
+    intermediate_output = 0x0121FA0023E20B28C94DFE1C280AFEF0
+    so, si = simd_mul(a, b, lanes)
+    print (hex(so), hex(si))
+    print (hex(output), hex(intermediate_output))
+
+def test_simd_mul1():
+    lanes = [SIMDMulLane(True,
+                         True,
+                         8,
+                         False),
+             SIMDMulLane(False,
+                         False,
+                         8,
+                         True),
+             ]
+    a = 0x1217
+    b = 0x5925
+    #output = 0x0121FA00FE1C28FE
+    #intermediate_output = 0x0121FA0023E20B28C94DFE1C280AFEF0
+    so, si = simd_mul(a, b, lanes)
+    print (hex(so), hex(si))
+    #print (hex(output), hex(intermediate_output))
+
+
+if __name__ == '__main__':
+    test8()
+    #test0_1()
+    #test_simd_mul()
+    #test_simd_mul1()
index 61043ab552779d94861353407751117fcc5e5bf6..e1120fd708126be391789c2779890c5f902fc32a 100644 (file)
@@ -334,11 +334,7 @@ class TestAddReduce(unittest.TestCase):
 
 
 class SIMDMulLane:
-    def __init__(self,
-                 a_signed: bool,
-                 b_signed: bool,
-                 bit_width: int,
-                 high_half: bool):
+    def __init__(self, a_signed, b_signed, bit_width, high_half):
         self.a_signed = a_signed
         self.b_signed = b_signed
         self.bit_width = bit_width
@@ -349,32 +345,32 @@ class SIMDMulLane:
             f"{self.bit_width}, {self.high_half})"
 
 
+def simd_mul(a, b, lanes):
+    output = 0
+    intermediate_output = 0
+    shift = 0
+    for lane in lanes:
+        a_signed = lane.a_signed or not lane.high_half
+        b_signed = lane.b_signed or not lane.high_half
+        mask = (1 << lane.bit_width) - 1
+        sign_bit = 1 << (lane.bit_width - 1)
+        a_part = (a >> shift) & mask
+        if a_signed and (a_part & sign_bit) != 0:
+            a_part -= 1 << lane.bit_width
+        b_part = (b >> shift) & mask
+        if b_signed and (b_part & sign_bit) != 0:
+            b_part -= 1 << lane.bit_width
+        value = a_part * b_part
+        value &= (1 << (lane.bit_width * 2)) - 1
+        intermediate_output |= value << (shift * 2)
+        if lane.high_half:
+            value >>= lane.bit_width
+        value &= mask
+        output |= value << shift
+        shift += lane.bit_width
+    return output, intermediate_output
+
 class TestMul8_16_32_64(unittest.TestCase):
-    @staticmethod
-    def simd_mul(a: int, b: int, lanes: List[SIMDMulLane]) -> Tuple[int, int]:
-        output = 0
-        intermediate_output = 0
-        shift = 0
-        for lane in lanes:
-            a_signed = lane.a_signed or not lane.high_half
-            b_signed = lane.b_signed or not lane.high_half
-            mask = (1 << lane.bit_width) - 1
-            sign_bit = 1 << (lane.bit_width - 1)
-            a_part = (a >> shift) & mask
-            if a_signed and (a_part & sign_bit) != 0:
-                a_part -= 1 << lane.bit_width
-            b_part = (b >> shift) & mask
-            if b_signed and (b_part & sign_bit) != 0:
-                b_part -= 1 << lane.bit_width
-            value = a_part * b_part
-            value &= (1 << (lane.bit_width * 2)) - 1
-            intermediate_output |= value << (shift * 2)
-            if lane.high_half:
-                value >>= lane.bit_width
-            value &= mask
-            output |= value << shift
-            shift += lane.bit_width
-        return output, intermediate_output
 
     @staticmethod
     def get_tst_cases(lanes: List[SIMDMulLane],
@@ -419,13 +415,13 @@ class TestMul8_16_32_64(unittest.TestCase):
         b = 0xFEDCBA9876543210
         output = 0x0121FA00FE1C28FE
         intermediate_output = 0x0121FA0023E20B28C94DFE1C280AFEF0
-        self.assertEqual(self.simd_mul(a, b, lanes),
+        self.assertEqual(simd_mul(a, b, lanes),
                          (output, intermediate_output))
         a = 0x8123456789ABCDEF
         b = 0xFEDCBA9876543210
         output = 0x81B39CB4FE1C28FE
         intermediate_output = 0x81B39CB423E20B28C94DFE1C280AFEF0
-        self.assertEqual(self.simd_mul(a, b, lanes),
+        self.assertEqual(simd_mul(a, b, lanes),
                          (output, intermediate_output))
 
     def test_signed_mul_from_unsigned(self):
@@ -458,7 +454,7 @@ class TestMul8_16_32_64(unittest.TestCase):
         if gen_or_check == GenOrCheck.Generate:
             yield module.a.eq(a)
             yield module.b.eq(b)
-        output2, intermediate_output2 = self.simd_mul(a, b, lanes)
+        output2, intermediate_output2 = simd_mul(a, b, lanes)
         yield Delay(0.1e-6)
         if gen_or_check == GenOrCheck.Check:
             intermediate_output = (yield module.intermediate_output)