add PartitionedSignalTester
authorJacob Lifshay <programmerjake@gmail.com>
Sat, 2 Oct 2021 02:10:55 +0000 (19:10 -0700)
committerJacob Lifshay <programmerjake@gmail.com>
Sat, 2 Oct 2021 02:10:55 +0000 (19:10 -0700)
.gitignore
src/ieee754/partitioned_signal_tester.py [new file with mode: 0644]
src/ieee754/test_partitioned_signal_tester.py [new file with mode: 0644]

index 77d4f5d07fac9f86a1828aab5f9cccb47a3dd928..f288a6dfb814d843c0f26837c809f18771797965 100644 (file)
@@ -9,3 +9,5 @@ __pycache__
 .eggs
 *.egg-info
 *.gtkw
+/formal_test_temp
+/sim_test_out
\ No newline at end of file
diff --git a/src/ieee754/partitioned_signal_tester.py b/src/ieee754/partitioned_signal_tester.py
new file mode 100644 (file)
index 0000000..9d09495
--- /dev/null
@@ -0,0 +1,443 @@
+# SPDX-License-Identifier: LGPL-3-or-later
+# See Notices.txt for copyright information
+from enum import Enum
+from typing import (Any, Callable, Dict, Generator, Iterable, List, Mapping,
+                    Optional, Sequence, Tuple, Union, final, overload)
+import shutil
+from nmigen.hdl.ast import (AnyConst, Assert, Signal, Value, ValueCastable)
+from nmigen.hdl.dsl import Module
+from nmigen.hdl.ir import Elaboratable, Fragment
+from nmigen.sim import Simulator, Delay
+from ieee754.part.partsig import PartitionedSignal, PartitionPoints
+import unittest
+import textwrap
+import subprocess
+from hashlib import sha256
+from nmigen.back import rtlil
+from nmutil.get_test_path import get_test_path, _StrPath
+
+
+_PartitionedSignalTestable = Callable[[Tuple[PartitionedSignal, ...]],
+                                      PartitionedSignal]
+
+_WidthCastable = Union["Layout", int]
+_LayoutCastable = Union["Layout", Mapping[int, Any], Iterable[int]]
+_ValueCastableType = Union[Value, int, Enum, ValueCastable]
+_FragmentLike = Union[Elaboratable, Fragment]
+
+
+def formal(test_case: unittest.TestCase, hdl: _FragmentLike, *,
+           base_path: _StrPath = "formal_test_temp"):
+    hdl = Fragment.get(hdl, platform="formal")
+    path = get_test_path(test_case, base_path)
+    shutil.rmtree(path, ignore_errors=True)
+    path.mkdir(parents=True)
+    sby_name = "config.sby"
+    sby_file = path / sby_name
+
+    sby_file.write_text(textwrap.dedent(f"""\
+    [options]
+    mode prove
+    depth 1
+    wait on
+
+    [engines]
+    smtbmc
+
+    [script]
+    read_rtlil top.il
+    prep
+
+    [file top.il]
+    {rtlil.convert(hdl)}
+    """), encoding="utf-8")
+    sby = shutil.which('sby')
+    assert sby is not None
+    with subprocess.Popen(
+        [sby, sby_name],
+        cwd=path, text=True, encoding="utf-8",
+        stdin=subprocess.DEVNULL, stdout=subprocess.PIPE
+    ) as p:
+        stdout, stderr = p.communicate()
+        if p.returncode != 0:
+            test_case.fail(f"Formal failed:\n{stdout}")
+
+
+@final
+class Layout:
+    __lane_starts_for_sizes: Dict[int, Dict[int, None]]
+    """keys are in sorted order"""
+
+    part_indexes: Tuple[int, ...]
+    """bit indexes of partition points in sorted order, always includes
+    `0` and `self.width`"""
+
+    @staticmethod
+    def cast(layout: _LayoutCastable,
+             width: Optional[_WidthCastable] = None) -> "Layout":
+        if isinstance(layout, Layout):
+            return layout
+        return Layout(layout, width)
+
+    def __init__(self,
+                 part_indexes: Union[Mapping[int, Any], Iterable[int]],
+                 width: Optional[_WidthCastable] = None):
+        part_indexes = set(part_indexes)
+        for p in part_indexes:
+            assert isinstance(p, int)
+            assert 0 <= p
+        if width is not None:
+            width = Layout.get_width(width)
+            for p in part_indexes:
+                assert p <= width
+            part_indexes.add(width)
+        part_indexes.add(0)
+        part_indexes = list(part_indexes)
+        part_indexes.sort()
+        self.part_indexes = tuple(part_indexes)
+        sizes: List[int] = []
+        for start_index in range(len(self.part_indexes)):
+            start = self.part_indexes[start_index]
+            for end in self.part_indexes[start_index + 1:]:
+                sizes.append(end - start)
+        sizes.sort()
+        # build in sorted order
+        self.__lane_starts_for_sizes = {size: {} for size in sizes}
+        for start_index in range(len(self.part_indexes)):
+            start = self.part_indexes[start_index]
+            for end in self.part_indexes[start_index + 1:]:
+                self.__lane_starts_for_sizes[end - start][start] = None
+
+    @property
+    def width(self) -> int:
+        return self.part_indexes[-1]
+
+    @property
+    def part_signal_count(self) -> int:
+        return max(len(self.part_indexes) - 2, 0)
+
+    @staticmethod
+    def get_width(width: _WidthCastable) -> int:
+        if isinstance(width, Layout):
+            width = width.width
+        assert isinstance(width, int)
+        assert width >= 0
+        return width
+
+    def partition_points_signals(self, name: Optional[str] = None,
+                                 src_loc_at: int = 0) -> PartitionPoints:
+        if name is None:
+            name = Signal(src_loc_at=1 + src_loc_at).name
+        return PartitionPoints({
+            i: Signal(name=f"{name}_{i}", src_loc_at=1 + src_loc_at)
+            for i in self.part_indexes[1:-1]
+        })
+
+    def __repr__(self) -> str:
+        return f"Layout({self.part_indexes}, width={self.width})"
+
+    def __eq__(self, o: object) -> bool:
+        if isinstance(o, Layout):
+            return self.part_indexes == o.part_indexes
+        return NotImplemented
+
+    def __hash__(self) -> int:
+        return hash(self.part_indexes)
+
+    def is_lane_valid(self, start: int, size: int) -> bool:
+        return start in self.__lane_starts_for_sizes.get(size, ())
+
+    def lane_sizes(self) -> Iterable[int]:
+        return self.__lane_starts_for_sizes.keys()
+
+    def lane_starts_for_size(self, size: int) -> Iterable[int]:
+        return self.__lane_starts_for_sizes[size].keys()
+
+    def lanes_for_size(self, size: int) -> Iterable["Lane"]:
+        for start in self.lane_starts_for_size(size):
+            yield Lane(start, size, self)
+
+    def lanes(self) -> Iterable["Lane"]:
+        for size in self.lane_sizes():
+            yield from self.lanes_for_size(size)
+
+    def is_compatible(self, other: _LayoutCastable) -> bool:
+        other = Layout.cast(other)
+        return len(self.part_indexes) == len(other.part_indexes)
+
+    def translate_lane_to(self, lane: "Lane",
+                          target_layout: _LayoutCastable) -> "Lane":
+        assert lane.layout == self
+        target_layout = Layout.cast(target_layout)
+        assert self.is_compatible(target_layout)
+        start_index = self.part_indexes.index(lane.start)
+        end_index = self.part_indexes.index(lane.end)
+        target_start = target_layout.part_indexes[start_index]
+        target_end = target_layout.part_indexes[end_index]
+        return Lane(target_start, target_end - target_start, target_layout)
+
+
+@final
+class Lane:
+    def __init__(self, start: int, size: int, layout: _LayoutCastable):
+        self.layout = Layout.cast(layout)
+        assert self.layout.is_lane_valid(start, size)
+        self.start = start
+        self.size = size
+
+    def __repr__(self) -> str:
+        return (f"Lane(start={self.start}, size={self.size}, "
+                f"layout={self.layout})")
+
+    def __eq__(self, o: object) -> bool:
+        if isinstance(o, Lane):
+            return self.start == o.start and self.size == o.size \
+                and self.layout == o.layout
+        return NotImplemented
+
+    def __hash__(self) -> int:
+        return hash((self.start, self.size, self.layout))
+
+    def as_slice(self) -> slice:
+        return slice(self.start, self.end)
+
+    @property
+    def end(self) -> int:
+        return self.start + self.size
+
+    def translate_to(self, target_layout: _LayoutCastable) -> "Lane":
+        return self.layout.translate_lane_to(self, target_layout)
+
+    @overload
+    def is_active(self, partition_points: Sequence[bool]) -> bool: ...
+
+    @overload
+    def is_active(self, partition_points: Sequence[_ValueCastableType]
+                  ) -> Union[Value, bool]: ...
+
+    @overload
+    def is_active(self, partition_points: Mapping[int, bool]) -> bool: ...
+
+    @overload
+    def is_active(self, partition_points: Mapping[int, _ValueCastableType]
+                  ) -> Union[Value, bool]: ...
+
+    def is_active(self, partition_points):
+        def get_partition_point(index: int, invert: bool):
+            if index == 0 or index == len(self.layout.part_indexes) - 1:
+                return True
+            if isinstance(partition_points, Sequence):
+                retval = partition_points[index]
+            else:
+                retval = partition_points[self.layout.part_indexes[index]]
+            if isinstance(retval, bool):
+                if invert:
+                    return not retval
+                return retval
+            retval = Value.cast(retval)
+            if invert:
+                return ~retval
+            return retval
+        start_index = self.layout.part_indexes.index(self.start)
+        end_index = self.layout.part_indexes.index(self.end)
+        retval = get_partition_point(start_index, False) \
+            & get_partition_point(end_index, False)
+        for i in range(start_index + 1, end_index):
+            retval &= get_partition_point(i, True)
+        return retval
+
+
+_PartitionedSignalTestReference = Callable[[Lane, Tuple[Value, ...]],
+                                           _ValueCastableType]
+
+_PartitionedSignalTestCasePartMode = Tuple[bool, ...]
+_PartitionedSignalTestCaseInputs = Tuple[int, ...]
+_PartitionedSignalTestCase = Tuple[_PartitionedSignalTestCasePartMode,
+                                   _PartitionedSignalTestCaseInputs]
+
+
+class PartitionedSignalTester:
+    layouts: List[Layout]
+    inputs: List[PartitionedSignal]
+
+    def __init__(self,
+                 m: Module,
+                 operation: _PartitionedSignalTestable,
+                 reference: _PartitionedSignalTestReference,
+                 *layouts: _LayoutCastable,
+                 src_loc_at: int = 0,
+                 additional_case_count: int = 30,
+                 special_cases: Iterable[_PartitionedSignalTestCase] = (),
+                 seed: str = ""):
+        self.m = m
+        self.operation = operation
+        self.reference = reference
+        self.layouts = []
+        self.inputs = []
+        for layout in layouts:
+            layout = Layout.cast(layout)
+            if len(self.layouts) > 0:
+                assert self.layouts[0].is_compatible(layout)
+            self.layouts.append(layout)
+            name = f"input_{len(self.inputs)}"
+            ps = PartitionedSignal(
+                layout.partition_points_signals(name=name,
+                                                src_loc_at=1 + src_loc_at),
+                layout.width,
+                name=name)
+            ps.set_module(m)
+            self.inputs.append(ps)
+        assert len(self.layouts) != 0, "must have at least one input layout"
+        for i in range(1, len(self.inputs)):
+            for j in range(1, len(self.layouts[0].part_indexes) - 1):
+                lhs_part_point = self.layouts[i].part_indexes[j]
+                rhs_part_point = self.layouts[0].part_indexes[j]
+                lhs = self.inputs[i].partpoints[lhs_part_point]
+                rhs = self.inputs[0].partpoints[rhs_part_point]
+                m.d.comb += lhs.eq(rhs)
+        self.special_cases = list(special_cases)
+        self.case_count = additional_case_count + len(self.special_cases)
+        self.seed = seed
+        self.case_number = Signal(64)
+        self.test_output = operation(tuple(self.inputs))
+        assert isinstance(self.test_output, PartitionedSignal)
+        self.test_output_layout = Layout(
+            self.test_output.partpoints, self.test_output.sig.width)
+        assert self.test_output_layout.is_compatible(self.layouts[0])
+        self.reference_output_values = {
+            lane: Value.cast(reference(lane, tuple(
+                inp.sig[lane.translate_to(layout).as_slice()]
+                for inp, layout in zip(self.inputs, self.layouts))))
+            for lane in self.layouts[0].lanes()
+        }
+        self.reference_outputs = {
+            lane: Signal(value.shape(),
+                         name=f"reference_output_{lane.start}_{lane.size}")
+            for lane, value in self.reference_output_values.items()
+        }
+        for lane, value in self.reference_output_values.items():
+            m.d.comb += self.reference_outputs[lane].eq(value)
+
+    def __hash_256(self, v: str) -> int:
+        return int.from_bytes(
+            sha256(bytes(self.seed + v, encoding='utf-8')).digest(),
+            byteorder='little'
+        )
+
+    def __hash(self, v: str, bits: int) -> int:
+        retval = 0
+        for i in range(0, bits, 256):
+            retval <<= 256
+            retval |= self.__hash_256(f" {v} {i}")
+        return retval & ((1 << bits) - 1)
+
+    def __get_case(self, case_number: int) -> _PartitionedSignalTestCase:
+        if case_number < len(self.special_cases):
+            return self.special_cases[case_number]
+        trial = 0
+        bits = self.__hash(f"{case_number} trial {trial}",
+                           self.layouts[0].part_signal_count)
+        bits |= 1 | (1 << len(self.layouts[0].part_indexes)) | (bits << 1)
+        part_starts = tuple(
+            (bits & (1 << i)) != 0
+            for i in range(len(self.layouts[0].part_indexes)))
+        inputs = tuple(self.__hash(f"{case_number} input {i}",
+                                   self.layouts[i].width)
+                       for i in range(len(self.layouts)))
+        return part_starts, inputs
+
+    def __format_case(self, case: _PartitionedSignalTestCase) -> str:
+        part_starts, inputs = case
+        str_inputs = [hex(i) for i in inputs]
+        return f"part_starts={part_starts}, inputs={str_inputs}"
+
+    def __setup_case(self, case_number: int,
+                     case: Optional[_PartitionedSignalTestCase] = None
+                     ) -> Generator[Any, int, None]:
+        if case is None:
+            case = self.__get_case(case_number)
+        yield self.case_number.eq(case_number)
+        part_starts, inputs = case
+        part_indexes = self.layouts[0].part_indexes
+        assert len(part_starts) == len(part_indexes)
+        for i in range(1, len(part_starts) - 1):
+            yield self.inputs[0].partpoints[part_indexes[i]].eq(part_starts[i])
+        for i in range(len(self.inputs)):
+            yield self.inputs[i].sig.eq(inputs[i])
+
+    def run_sim(self, test_case: unittest.TestCase, *,
+                engine: Optional[str] = None,
+                base_path: _StrPath = "sim_test_out"):
+        if engine is None:
+            sim = Simulator(self.m)
+        else:
+            sim = Simulator(self.m, engine=engine)
+
+        def check_active_lane(lane: Lane):
+            reference = yield self.reference_outputs[lane]
+            output = yield self.test_output.sig[
+                lane.translate_to(self.test_output_layout).as_slice()]
+            test_case.assertEqual(hex(reference), hex(output))
+
+        def check_case(case: _PartitionedSignalTestCase):
+            part_starts, inputs = case
+            for i in range(1, len(self.layouts[0].part_indexes) - 1):
+                part_point = yield self.test_output.partpoints[
+                    self.test_output_layout.part_indexes[i]]
+                test_case.assertEqual(part_point, part_starts[i])
+            for lane in self.layouts[0].lanes():
+                with test_case.subTest(lane=lane):
+                    active = lane.is_active(part_starts)
+                    if active:
+                        yield from check_active_lane(lane)
+
+        def process():
+            for case_number in range(self.case_count):
+                with test_case.subTest(case_number=str(case_number)):
+                    case = self.__get_case(case_number)
+                    with test_case.subTest(case=self.__format_case(case)):
+                        yield from self.__setup_case(case_number, case)
+                        yield Delay(1e-6)
+                        yield from check_case(case)
+        sim.add_process(process)
+        path = get_test_path(test_case, base_path)
+        path.parent.mkdir(parents=True, exist_ok=True)
+        vcd_path = path.with_suffix(".vcd")
+        gtkw_path = path.with_suffix(".gtkw")
+        traces = [self.case_number]
+        for i in self.layouts[0].part_indexes[1:-1]:
+            traces.append(self.inputs[0].partpoints[i])
+        for inp in self.inputs:
+            traces.append(inp.sig)
+        traces.extend(self.reference_outputs.values())
+        traces.append(self.test_output.sig)
+        with sim.write_vcd(vcd_path.open("wt", encoding="utf-8"),
+                           gtkw_path.open("wt", encoding="utf-8"),
+                           traces=traces):
+            sim.run()
+
+    def run_formal(self, test_case: unittest.TestCase, **kwargs):
+        for part_point in self.inputs[0].partpoints.values():
+            self.m.d.comb += part_point.eq(AnyConst(1))
+        for i in range(len(self.inputs)):
+            s = self.inputs[i].sig
+            self.m.d.comb += s.eq(AnyConst(s.shape()))
+        for i in range(1, len(self.layouts[0].part_indexes) - 1):
+            in_part_point = self.inputs[0].partpoints[
+                self.layouts[0].part_indexes[i]]
+            out_part_point = self.test_output.partpoints[
+                self.test_output_layout.part_indexes[i]]
+            self.m.d.comb += Assert(in_part_point == out_part_point)
+
+        def check_active_lane(lane: Lane) -> Assert:
+            reference = self.reference_outputs[lane]
+            output = self.test_output.sig[
+                lane.translate_to(self.test_output_layout).as_slice()]
+            yield Assert(reference == output)
+
+        for lane in self.layouts[0].lanes():
+            with test_case.subTest(lane=lane):
+                a = check_active_lane(lane)
+                with self.m.If(lane.is_active(self.inputs[0].partpoints)):
+                    self.m.d.comb += a
+        formal(test_case, self.m, **kwargs)
diff --git a/src/ieee754/test_partitioned_signal_tester.py b/src/ieee754/test_partitioned_signal_tester.py
new file mode 100644 (file)
index 0000000..040f45a
--- /dev/null
@@ -0,0 +1,223 @@
+# SPDX-License-Identifier: LGPL-3-or-later
+# See Notices.txt for copyright information
+
+from typing import Set, Tuple
+from nmigen.hdl.ast import AnyConst, Assert, Assume, Signal
+from nmigen.hdl.dsl import Module
+from ieee754.partitioned_signal_tester import (
+    PartitionedSignalTester, Layout, Lane, formal)
+import unittest
+
+
+class TestFormal(unittest.TestCase):
+    def test_formal(self):
+        m = Module()
+        a = Signal(10)
+        b = Signal(10)
+        m.d.comb += a.eq(AnyConst(10))
+        m.d.comb += b.eq(AnyConst(10))
+        m.d.comb += Assume(a * b == 1021 * 1019)
+        m.d.comb += Assert((a == 1021) | (a == 1019))
+        formal(self, m)
+
+    @unittest.expectedFailure
+    def test_formal_fail(self):
+        m = Module()
+        a = Signal(10)
+        b = Signal(10)
+        m.d.comb += a.eq(AnyConst(10))
+        m.d.comb += b.eq(AnyConst(10))
+        m.d.comb += Assume(a * b == 1021 * 1019)
+        m.d.comb += Assert(a == 1021)
+        formal(self, m)
+
+
+class TestLayout(unittest.TestCase):
+    maxDiff = None
+
+    def test_repr(self):
+        self.assertEqual(repr(Layout({3: False, 7: True}, 16)),
+                         "Layout((0, 3, 7, 16), width=16)")
+        self.assertEqual(repr(Layout({7: False, 3: True}, 16)),
+                         "Layout((0, 3, 7, 16), width=16)")
+        self.assertEqual(repr(Layout(range(0, 10, 2), 10)),
+                         "Layout((0, 2, 4, 6, 8, 10), width=10)")
+        self.assertEqual(repr(Layout(range(0, 10, 2))),
+                         "Layout((0, 2, 4, 6, 8), width=8)")
+        self.assertEqual(repr(Layout(())),
+                         "Layout((0,), width=0)")
+        self.assertEqual(repr(Layout((1,))),
+                         "Layout((0, 1), width=1)")
+
+    def test_cast(self):
+        a = Layout.cast((1, 2, 3))
+        self.assertEqual(repr(a),
+                         "Layout((0, 1, 2, 3), width=3)")
+        b = Layout.cast(a)
+        self.assertIs(a, b)
+
+    def test_part_signal_count(self):
+        a = Layout.cast(())
+        b = Layout.cast((1,))
+        c = Layout.cast((1, 3))
+        d = Layout.cast((0, 1, 3))
+        e = Layout.cast((1, 2, 3))
+        self.assertEqual(a.part_signal_count, 0)
+        self.assertEqual(b.part_signal_count, 0)
+        self.assertEqual(c.part_signal_count, 1)
+        self.assertEqual(d.part_signal_count, 1)
+        self.assertEqual(e.part_signal_count, 2)
+
+    def test_lanes(self):
+        a = Layout.cast(())
+        self.assertEqual(list(a.lanes()), [])
+        b = Layout.cast((1,))
+        self.assertEqual(list(b.lanes()), [Lane(0, 1, b)])
+        c = Layout.cast((1, 3))
+        self.assertEqual(list(c.lanes()),
+                         [Lane(0, 1, c),
+                          Lane(1, 2, c),
+                          Lane(0, 3, c)])
+        d = Layout.cast((1, 4, 5))
+        self.assertEqual(list(d.lanes()),
+                         [Lane(0, 1, d),
+                          Lane(4, 1, d),
+                          Lane(1, 3, d),
+                          Lane(0, 4, d),
+                          Lane(1, 4, d),
+                          Lane(0, 5, d)])
+        e = Layout(range(0, 32, 8), 32)
+        self.assertEqual(list(e.lanes()),
+                         [Lane(0, 8, e),
+                          Lane(8, 8, e),
+                          Lane(16, 8, e),
+                          Lane(24, 8, e),
+                          Lane(0, 16, e),
+                          Lane(8, 16, e),
+                          Lane(16, 16, e),
+                          Lane(0, 24, e),
+                          Lane(8, 24, e),
+                          Lane(0, 32, e)])
+
+    def test_is_compatible(self):
+        a = Layout.cast(())
+        b = Layout.cast((1,))
+        c = Layout.cast((1, 2))
+        d = Layout.cast((4, 5))
+        e = Layout.cast((1, 2, 3, 4))
+        f = Layout.cast((8, 16, 24, 32))
+        compatibility_classes = {
+            a: 0,
+            b: 1,
+            c: 2,
+            d: 2,
+            e: 4,
+            f: 4,
+        }
+        for lhs in compatibility_classes:
+            for rhs in compatibility_classes:
+                with self.subTest(lhs=lhs, rhs=rhs):
+                    self.assertEqual(compatibility_classes[lhs]
+                                     == compatibility_classes[rhs],
+                                     lhs.is_compatible(rhs))
+
+    def test_translate_lanes_to(self):
+        src = Layout((0, 3, 7, 12, 13))
+        dest = Layout((0, 8, 16, 24, 32))
+        src_lanes = list(src.lanes())
+        src_lanes.sort(key=lambda lane: lane.start)
+        dest_lanes = list(dest.lanes())
+        dest_lanes.sort(key=lambda lane: lane.start)
+        self.assertEqual(len(src_lanes), len(dest_lanes))
+        for src_lane, dest_lane in zip(src_lanes, dest_lanes):
+            with self.subTest(src_lane=src_lane, dest_lane=dest_lane):
+                self.assertEqual(src_lane.translate_to(dest), dest_lane)
+                self.assertEqual(dest_lane.translate_to(src), src_lane)
+
+
+class TestLane(unittest.TestCase):
+    def test_is_active(self):
+        layout = Layout((0, 8, 16, 24, 32))
+        def do_test(part_starts: Tuple[bool, ...],
+                    expected_lanes: Set[Tuple[int, int]]):
+            with self.subTest(part_starts=part_starts,
+                              expected_lanes=expected_lanes):
+                for lane in layout.lanes():
+                    expected = (lane.start, lane.size) in expected_lanes
+                    with self.subTest(lane=lane):
+                        self.assertEqual(lane.is_active(part_starts),
+                                         expected)
+
+        _0 = False
+        _1 = True
+        do_test((_1, _0, _0, _0, _1),
+                {(0, 32)})
+        do_test((_1, _0, _0, _1, _1),
+                {(0, 24), (24, 8)})
+        do_test((_1, _0, _1, _0, _1),
+                {(0, 16), (16, 16)})
+        do_test((_1, _0, _1, _1, _1),
+                {(0, 16), (16, 8), (24, 8)})
+        do_test((_1, _1, _0, _0, _1),
+                {(0, 8), (8, 24)})
+        do_test((_1, _1, _0, _1, _1),
+                {(0, 8), (8, 16), (24, 8)})
+        do_test((_1, _1, _1, _0, _1),
+                {(0, 8), (8, 8), (16, 16)})
+        do_test((_1, _1, _1, _1, _1),
+                {(0, 8), (8, 8), (16, 8), (24, 8)})
+
+    def test_as_slice(self):
+        slice_target = tuple(range(8))
+        layout = Layout((0, 2, 4, 6, 8))
+        slices = list(slice_target[lane.as_slice()]
+                      for lane in layout.lanes())
+        self.assertEqual(slices,
+                         [(0, 1),
+                          (2, 3),
+                          (4, 5),
+                          (6, 7),
+                          (0, 1, 2, 3),
+                          (2, 3, 4, 5),
+                          (4, 5, 6, 7),
+                          (0, 1, 2, 3, 4, 5),
+                          (2, 3, 4, 5, 6, 7),
+                          (0, 1, 2, 3, 4, 5, 6, 7)])
+
+
+class TestPartitionedSignalTester(unittest.TestCase):
+    def test_sim_identity(self):
+        m = Module()
+        PartitionedSignalTester(m,
+                                lambda inputs: inputs[0],
+                                lambda lane, inputs: inputs[0],
+                                (0, 8, 16, 24, 32)).run_sim(self)
+
+    def test_formal_identity(self):
+        m = Module()
+        PartitionedSignalTester(m,
+                                lambda inputs: inputs[0],
+                                lambda lane, inputs: inputs[0],
+                                (0, 8, 16, 24, 32)).run_formal(self)
+
+    def test_sim_pass_through_input(self):
+        for which_input in range(0, 2):
+            m = Module()
+            PartitionedSignalTester(m,
+                                    lambda inputs: inputs[which_input],
+                                    lambda lane, inputs: inputs[which_input],
+                                    (0, 8, 16, 24, 32),
+                                    (0, 1, 2, 3, 4)).run_sim(self)
+
+    def test_formal_pass_through_input(self):
+        for which_input in range(0, 2):
+            m = Module()
+            PartitionedSignalTester(m,
+                                    lambda inputs: inputs[which_input],
+                                    lambda lane, inputs: inputs[which_input],
+                                    (0, 8, 16, 24, 32),
+                                    (0, 1, 2, 3, 4)).run_formal(self)
+
+
+if __name__ == '__main__':
+    unittest.main()