periph: add Peripheral base class
authorJean-François Nguyen <jf@lambdaconcept.com>
Wed, 25 Mar 2020 11:54:45 +0000 (12:54 +0100)
committerJean-François Nguyen <jf@lambdaconcept.com>
Wed, 25 Mar 2020 12:09:50 +0000 (13:09 +0100)
lambdasoc/__init__.py [new file with mode: 0644]
lambdasoc/periph/__init__.py [new file with mode: 0644]
lambdasoc/periph/_event.py [new file with mode: 0644]
lambdasoc/periph/base.py [new file with mode: 0644]
lambdasoc/test/_wishbone.py [new file with mode: 0644]
lambdasoc/test/test_periph_base.py [new file with mode: 0644]
lambdasoc/test/test_periph_event.py [new file with mode: 0644]

diff --git a/lambdasoc/__init__.py b/lambdasoc/__init__.py
new file mode 100644 (file)
index 0000000..3d136b5
--- /dev/null
@@ -0,0 +1,5 @@
+import pkg_resources
+try:
+    __version__ = pkg_resources.get_distribution(__name__).version
+except pkg_resources.DistributionNotFound:
+    pass
diff --git a/lambdasoc/periph/__init__.py b/lambdasoc/periph/__init__.py
new file mode 100644 (file)
index 0000000..9b5ed21
--- /dev/null
@@ -0,0 +1 @@
+from .base import *
diff --git a/lambdasoc/periph/_event.py b/lambdasoc/periph/_event.py
new file mode 100644 (file)
index 0000000..53df4aa
--- /dev/null
@@ -0,0 +1,135 @@
+from nmigen import *
+from nmigen import tracer
+
+from nmigen_soc import csr
+
+
+__all__ = ["EventSource", "IRQLine", "InterruptSource"]
+
+
+class EventSource:
+    """Event source.
+
+    Parameters
+    ----------
+    mode : ``"level"``, ``"rise"``, ``"fall"``
+        Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high.
+        If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge
+        of ``stb``.
+    name : str
+        Name of the event. If ``None`` (default) the name is inferred from the variable
+        name this event source is assigned to.
+
+    Attributes
+    ----------
+    name : str
+        Name of the event
+    mode : ``"level"``, ``"rise"``, ``"fall"``
+        Trigger mode.
+    stb : Signal, in
+        Event strobe.
+    """
+    def __init__(self, *, mode="level", name=None, src_loc_at=0):
+        if name is not None and not isinstance(name, str):
+            raise TypeError("Name must be a string, not {!r}".format(name))
+
+        choices = ("level", "rise", "fall")
+        if mode not in choices:
+            raise ValueError("Invalid trigger mode {!r}; must be one of {}"
+                             .format(mode, ", ".join(choices)))
+
+        self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
+        self.mode = mode
+        self.stb  = Signal(name="{}_stb".format(self.name))
+
+
+class IRQLine(Signal):
+    """Interrupt request line."""
+    def __init__(self, *, name=None, src_loc_at=0):
+        super().__init__(name=name, src_loc_at=1 + src_loc_at)
+
+    __hash__ = object.__hash__
+
+
+class InterruptSource(Elaboratable):
+    """Interrupt source.
+
+    A mean of gathering multiple event sources into a single interrupt request line.
+
+    Parameters
+    ----------
+    events : iter(:class:`EventSource`)
+        Event sources.
+    name : str
+        Name of the interrupt source. If ``None`` (default) the name is inferred from the
+        variable name this interrupt source is assigned to.
+
+    Attributes
+    ----------
+    name : str
+        Name of the interrupt source.
+    status : :class:`csr.Element`, read-only
+        Event status register. Each bit displays the level of the strobe of an event source.
+        Events are ordered by position in the `events` parameter.
+    pending : :class:`csr.Element`, read/write
+        Event pending register. If a bit is 1, the associated event source has a pending
+        notification. Writing 1 to a bit clears it.
+        Events are ordered by position in the `events` parameter.
+    enable : :class:`csr.Element`, read/write
+        Event enable register. Writing 1 to a bit enables its associated event source.
+        Writing 0 disables it.
+        Events are ordered by position in the `events` parameter.
+    irq : :class:`IRQLine`, out
+        Interrupt request. It is raised if any event source is enabled and has a pending
+        notification.
+    """
+    def __init__(self, events, *, name=None, src_loc_at=0):
+        if name is not None and not isinstance(name, str):
+            raise TypeError("Name must be a string, not {!r}".format(name))
+        self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
+
+        for event in events:
+            if not isinstance(event, EventSource):
+                raise TypeError("Event source must be an instance of EventSource, not {!r}"
+                                .format(event))
+        self._events = list(events)
+
+        width = len(events)
+        self.status  = csr.Element(width, "r",  name="{}_status".format(self.name))
+        self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name))
+        self.enable  = csr.Element(width, "rw", name="{}_enable".format(self.name))
+
+        self.irq = IRQLine(name="{}_irq".format(self.name))
+
+    def elaborate(self, platform):
+        m = Module()
+
+        with m.If(self.pending.w_stb):
+            m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data)
+
+        with m.If(self.enable.w_stb):
+            m.d.sync += self.enable.r_data.eq(self.enable.w_data)
+
+        for i, event in enumerate(self._events):
+            m.d.sync += self.status.r_data[i].eq(event.stb)
+
+            if event.mode in ("rise", "fall"):
+                event_stb_r = Signal.like(event.stb, name_suffix="_r")
+                m.d.sync += event_stb_r.eq(event.stb)
+
+            event_trigger = Signal(name="{}_trigger".format(event.name))
+            if event.mode == "level":
+                m.d.comb += event_trigger.eq(event.stb)
+            elif event.mode == "rise":
+                m.d.comb += event_trigger.eq(~event_stb_r & event.stb)
+            elif event.mode == "fall":
+                m.d.comb += event_trigger.eq(event_stb_r & ~event.stb)
+            else:
+                assert False # :nocov:
+
+            with m.If(event_trigger):
+                m.d.sync += self.pending.r_data[i].eq(1)
+
+        m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any())
+
+        return m
diff --git a/lambdasoc/periph/base.py b/lambdasoc/periph/base.py
new file mode 100644 (file)
index 0000000..6101946
--- /dev/null
@@ -0,0 +1,358 @@
+from nmigen import *
+from nmigen import tracer
+from nmigen.utils import log2_int
+
+from nmigen_soc import csr, wishbone
+from nmigen_soc.memory import MemoryMap
+from nmigen_soc.csr.wishbone import WishboneCSRBridge
+
+
+from ._event import *
+
+
+__all__ = ["Peripheral", "CSRBank", "PeripheralBridge"]
+
+
+class Peripheral:
+    """Wishbone peripheral.
+
+    A helper class to reduce the boilerplate needed to control a peripheral with a Wishbone interface.
+    It provides facilities for instantiating CSR registers, requesting windows to subordinate busses
+    and sending interrupt requests to the CPU.
+
+    The ``Peripheral`` class is not meant to be instantiated as-is, but rather as a base class for
+    actual peripherals.
+
+    Usage example
+    -------------
+
+    ```
+    class ExamplePeripheral(Peripheral, Elaboratable):
+        def __init__(self):
+            super().__init__()
+            bank         = self.csr_bank()
+            self._foo    = bank.csr(8, "r")
+            self._bar    = bank.csr(8, "w")
+
+            self._rdy    = self.event(mode="rise")
+
+            self._bridge = self.bridge(data_width=32, granularity=8, alignment=2)
+            self.bus     = self._bridge.bus
+            self.irq     = self._bridge.irq
+
+        def elaborate(self, platform):
+            m = Module()
+            m.submodules.bridge = self._bridge
+            # ...
+            return m
+    ```
+
+    Arguments
+    ---------
+    name : str
+        Name of this peripheral. If ``None`` (default) the name is inferred from the variable
+        name this peripheral is assigned to.
+
+    Properties
+    ----------
+    name : str
+        Name of the peripheral.
+    """
+    def __init__(self, name=None, src_loc_at=1):
+        if name is not None and not isinstance(name, str):
+            raise TypeError("Name must be a string, not {!r}".format(name))
+        self.name      = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")
+
+        self._csr_banks = []
+        self._windows   = []
+        self._events    = []
+
+        self._bus       = None
+        self._irq       = None
+
+    @property
+    def bus(self):
+        """Wishbone bus interface.
+
+        Return value
+        ------------
+        An instance of :class:`Interface`.
+
+        Exceptions
+        ----------
+        Raises :exn:`NotImplementedError` if the peripheral does not have a Wishbone bus.
+        """
+        if self._bus is None:
+            raise NotImplementedError("Peripheral {!r} does not have a bus interface"
+                                      .format(self))
+        return self._bus
+
+    @bus.setter
+    def bus(self, bus):
+        if not isinstance(bus, wishbone.Interface):
+            raise TypeError("Bus interface must be an instance of wishbone.Interface, not {!r}"
+                            .format(bus))
+        self._bus = bus
+
+    @property
+    def irq(self):
+        """Interrupt request line.
+
+        Return value
+        ------------
+        An instance of :class:`IRQLine`.
+
+        Exceptions
+        ----------
+        Raises :exn:`NotImplementedError` if the peripheral does not have an IRQ line.
+        """
+        if self._irq is None:
+            raise NotImplementedError("Peripheral {!r} does not have an IRQ line"
+                                      .format(self))
+        return self._irq
+
+    @irq.setter
+    def irq(self, irq):
+        if not isinstance(irq, IRQLine):
+            raise TypeError("IRQ line must be an instance of IRQLine, not {!r}"
+                            .format(irq))
+        self._irq = irq
+
+    def csr_bank(self, *, addr=None, alignment=None):
+        """Request a CSR bank.
+
+        Arguments
+        ---------
+        addr : int or None
+            Address of the bank. If ``None``, the implicit next address will be used.
+            Otherwise, the exact specified address (which must be a multiple of
+            ``2 ** max(alignment, bridge_alignment)``) will be used.
+        alignment : int or None
+            Alignment of the bank. If not specified, the bridge alignment is used.
+            See :class:`nmigen_soc.csr.Multiplexer` for details.
+
+        Return value
+        ------------
+        An instance of :class:`CSRBank`.
+        """
+        bank = CSRBank(name_prefix=self.name)
+        self._csr_banks.append((bank, addr, alignment))
+        return bank
+
+    def window(self, *, addr_width, data_width, granularity=None, features=frozenset(),
+               alignment=0, addr=None, sparse=None):
+        """Request a window to a subordinate bus.
+
+        See :meth:`nmigen_soc.wishbone.Decoder.add` for details.
+
+        Return value
+        ------------
+        An instance of :class:`nmigen_soc.wishbone.Interface`.
+        """
+        window = wishbone.Interface(addr_width=addr_width, data_width=data_width,
+                                    granularity=granularity, features=features)
+        granularity_bits = log2_int(data_width // window.granularity)
+        window.memory_map = MemoryMap(addr_width=addr_width + granularity_bits,
+                                      data_width=window.granularity, alignment=alignment)
+        self._windows.append((window, addr, sparse))
+        return window
+
+    def event(self, *, mode="level", name=None, src_loc_at=0):
+        """Request an event source.
+
+        See :class:`EventSource` for details.
+
+        Return value
+        ------------
+        An instance of :class:`EventSource`.
+        """
+        event = EventSource(mode=mode, name=name, src_loc_at=1 + src_loc_at)
+        self._events.append(event)
+        return event
+
+    def bridge(self, *, data_width=8, granularity=None, features=frozenset(), alignment=0):
+        """Request a bridge to the resources of the peripheral.
+
+        See :class:`PeripheralBridge` for details.
+
+        Return value
+        ------------
+        A :class:`PeripheralBridge` providing access to local resources.
+        """
+        return PeripheralBridge(self, data_width=data_width, granularity=granularity,
+                                features=features, alignment=alignment)
+
+    def iter_csr_banks(self):
+        """Iterate requested CSR banks and their parameters.
+
+        Yield values
+        ------------
+        A tuple ``bank, addr, alignment`` describing the bank and its parameters.
+        """
+        for bank, addr, alignment in self._csr_banks:
+            yield bank, addr, alignment
+
+    def iter_windows(self):
+        """Iterate requested windows and their parameters.
+
+        Yield values
+        ------------
+        A tuple ``window, addr, sparse`` descr
+        given to :meth:`Peripheral.window`.
+        """
+        for window, addr, sparse in self._windows:
+            yield window, addr, sparse
+
+    def iter_events(self):
+        """Iterate requested event sources.
+
+        Yield values
+        ------------
+        An instance of :class:`EventSource`.
+        """
+        for event in self._events:
+            yield event
+
+
+class CSRBank:
+    """CSR register bank.
+
+    Parameters
+    ----------
+    name_prefix : str
+        Name prefix of the bank registers.
+    """
+    def __init__(self, *, name_prefix=""):
+        self._name_prefix = name_prefix
+        self._csr_regs    = []
+
+    def csr(self, width, access, *, addr=None, alignment=None, name=None,
+            src_loc_at=0):
+        """Request a CSR register.
+
+        Parameters
+        ----------
+        width : int
+            Width of the register. See :class:`nmigen_soc.csr.Element`.
+        access : :class:`Access`
+            Register access mode. See :class:`nmigen_soc.csr.Element`.
+        addr : int
+            Address of the register. See :meth:`nmigen_soc.csr.Multiplexer.add`.
+        alignment : int
+            Register alignment. See :class:`nmigen_soc.csr.Multiplexer`.
+        name : str
+            Name of the register. If ``None`` (default) the name is inferred from the variable
+            name this register is assigned to.
+
+        Return value
+        ------------
+        An instance of :class:`nmigen_soc.csr.Element`.
+        """
+        if name is not None and not isinstance(name, str):
+            raise TypeError("Name must be a string, not {!r}".format(name))
+        name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")
+
+        elem_name = "{}_{}".format(self._name_prefix, name)
+        elem = csr.Element(width, access, name=elem_name)
+        self._csr_regs.append((elem, addr, alignment))
+        return elem
+
+    def iter_csr_regs(self):
+        """Iterate requested CSR registers and their parameters.
+
+        Yield values
+        ------------
+        A tuple ``elem, addr, alignment`` describing the register and its parameters.
+        """
+        for elem, addr, alignment in self._csr_regs:
+            yield elem, addr, alignment
+
+
+class PeripheralBridge(Elaboratable):
+    """Peripheral bridge.
+
+    A bridge providing access to the registers and windows of a peripheral, and support for
+    interrupt requests from its event sources.
+
+    Event managment is performed by an :class:`InterruptSource` submodule.
+
+    Parameters
+    ---------
+    periph : :class:`Peripheral`
+        The peripheral whose resources are exposed by this bridge.
+    data_width : int
+        Data width. See :class:`nmigen_soc.wishbone.Interface`.
+    granularity : int or None
+        Granularity. See :class:`nmigen_soc.wishbone.Interface`.
+    features : iter(str)
+        Optional signal set. See :class:`nmigen_soc.wishbone.Interface`.
+    alignment : int
+        Resource alignment. See :class:`nmigen_soc.memory.MemoryMap`.
+
+    Attributes
+    ----------
+    bus : :class:`nmigen_soc.wishbone.Interface`
+        Wishbone bus providing access to the resources of the peripheral.
+    irq : :class:`IRQLine`, out
+        Interrupt request. It is raised if any event source is enabled and has a pending
+        notification.
+    """
+    def __init__(self, periph, *, data_width, granularity, features, alignment):
+        if not isinstance(periph, Peripheral):
+            raise TypeError("Peripheral must be an instance of Peripheral, not {!r}"
+                            .format(periph))
+
+        self._wb_decoder = wishbone.Decoder(addr_width=1, data_width=data_width,
+                                            granularity=granularity,
+                                            features=features, alignment=alignment)
+
+        self._csr_subs = []
+
+        for bank, bank_addr, bank_alignment in periph.iter_csr_banks():
+            if bank_alignment is None:
+                bank_alignment = alignment
+            csr_mux = csr.Multiplexer(addr_width=1, data_width=8, alignment=bank_alignment)
+            for elem, elem_addr, elem_alignment in bank.iter_csr_regs():
+                if elem_alignment is None:
+                    elem_alignment = alignment
+                csr_mux.add(elem, addr=elem_addr, alignment=elem_alignment, extend=True)
+
+            csr_bridge = WishboneCSRBridge(csr_mux.bus, data_width=data_width)
+            self._wb_decoder.add(csr_bridge.wb_bus, addr=bank_addr, extend=True)
+            self._csr_subs.append((csr_mux, csr_bridge))
+
+        for window, window_addr, window_sparse in periph.iter_windows():
+            self._wb_decoder.add(window, addr=window_addr, sparse=window_sparse, extend=True)
+
+        events = list(periph.iter_events())
+        if len(events) > 0:
+            self._int_src = InterruptSource(events, name="{}_ev".format(periph.name))
+            self.irq      = self._int_src.irq
+
+            csr_mux = csr.Multiplexer(addr_width=1, data_width=8, alignment=alignment)
+            csr_mux.add(self._int_src.status,  extend=True)
+            csr_mux.add(self._int_src.pending, extend=True)
+            csr_mux.add(self._int_src.enable,  extend=True)
+
+            csr_bridge = WishboneCSRBridge(csr_mux.bus, data_width=data_width)
+            self._wb_decoder.add(csr_bridge.wb_bus, extend=True)
+            self._csr_subs.append((csr_mux, csr_bridge))
+        else:
+            self._int_src = None
+            self.irq      = None
+
+        self.bus = self._wb_decoder.bus
+
+    def elaborate(self, platform):
+        m = Module()
+
+        for i, (csr_mux, csr_bridge) in enumerate(self._csr_subs):
+            m.submodules[   "csr_mux_{}".format(i)] = csr_mux
+            m.submodules["csr_bridge_{}".format(i)] = csr_bridge
+
+        if self._int_src is not None:
+            m.submodules._int_src = self._int_src
+
+        m.submodules.wb_decoder = self._wb_decoder
+
+        return m
diff --git a/lambdasoc/test/_wishbone.py b/lambdasoc/test/_wishbone.py
new file mode 100644 (file)
index 0000000..c7ab28c
--- /dev/null
@@ -0,0 +1,34 @@
+def wb_read(bus, addr, sel, timeout=32):
+    yield bus.cyc.eq(1)
+    yield bus.stb.eq(1)
+    yield bus.adr.eq(addr)
+    yield bus.sel.eq(sel)
+    yield
+    cycles = 0
+    while not (yield bus.ack):
+        yield
+        if cycles >= timeout:
+            raise RuntimeError("Wishbone transaction timed out")
+        cycles += 1
+    data = (yield bus.dat_r)
+    yield bus.cyc.eq(0)
+    yield bus.stb.eq(0)
+    return data
+
+def wb_write(bus, addr, data, sel, timeout=32):
+    yield bus.cyc.eq(1)
+    yield bus.stb.eq(1)
+    yield bus.adr.eq(addr)
+    yield bus.we.eq(1)
+    yield bus.sel.eq(sel)
+    yield bus.dat_w.eq(data)
+    yield
+    cycles = 0
+    while not (yield bus.ack):
+        yield
+        if cycles >= timeout:
+            raise RuntimeError("Wishbone transaction timed out")
+        cycles += 1
+    yield bus.cyc.eq(0)
+    yield bus.stb.eq(0)
+    yield bus.we.eq(0)
diff --git a/lambdasoc/test/test_periph_base.py b/lambdasoc/test/test_periph_base.py
new file mode 100644 (file)
index 0000000..925f49d
--- /dev/null
@@ -0,0 +1,229 @@
+# nmigen: UnusedElaboratable=no
+
+import unittest
+from nmigen import *
+from nmigen.back.pysim import *
+
+from ._wishbone import *
+from ..periph.base import Peripheral, CSRBank, PeripheralBridge
+
+
+def simulation_test(dut, process):
+    with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+        sim.add_clock(1e-6)
+        sim.add_sync_process(process)
+        sim.run()
+
+
+class PeripheralTestCase(unittest.TestCase):
+    def test_name(self):
+        class Wrapper(Peripheral):
+            def __init__(self):
+                super().__init__()
+        periph_0 = Wrapper()
+        periph_1 = Peripheral(name="periph_1")
+        self.assertEqual(periph_0.name, "periph_0")
+        self.assertEqual(periph_1.name, "periph_1")
+
+    def test_periph_name_wrong(self):
+        with self.assertRaisesRegex(TypeError,
+                r"Name must be a string, not 2"):
+            periph = Peripheral(name=2)
+
+    def test_set_bus_wrong(self):
+        periph = Peripheral(src_loc_at=0)
+        with self.assertRaisesRegex(TypeError,
+                r"Bus interface must be an instance of wishbone.Interface, not 'foo'"):
+            periph.bus = "foo"
+
+    def test_get_bus_wrong(self):
+        periph = Peripheral(src_loc_at=0)
+        with self.assertRaisesRegex(NotImplementedError,
+                r"Peripheral <.*> does not have a bus interface"):
+            periph.bus
+
+    def test_set_irq_wrong(self):
+        periph = Peripheral(src_loc_at=0)
+        with self.assertRaisesRegex(TypeError,
+                r"IRQ line must be an instance of IRQLine, not 'foo'"):
+            periph.irq = "foo"
+
+    def test_get_irq_wrong(self):
+        periph = Peripheral(src_loc_at=0)
+        with self.assertRaisesRegex(NotImplementedError,
+                r"Peripheral <.*> does not have an IRQ line"):
+            periph.irq
+
+    def test_iter_csr_banks(self):
+        periph = Peripheral(src_loc_at=0)
+        bank_0 = periph.csr_bank()
+        bank_1 = periph.csr_bank(addr=0x4, alignment=2)
+        self.assertEqual(list(periph.iter_csr_banks()), [
+            (bank_0, None, None),
+            (bank_1,  0x4, 2),
+        ])
+
+    def test_iter_windows(self):
+        periph = Peripheral(src_loc_at=0)
+        win_0 = periph.window(addr_width=2, data_width=8)
+        win_1 = periph.window(addr_width=4, data_width=8, addr=0x4, sparse=True)
+        self.assertEqual(list(periph.iter_windows()), [
+            (win_0, None, None),
+            (win_1, 0x4,  True),
+        ])
+
+    def test_iter_events(self):
+        periph = Peripheral(src_loc_at=0)
+        ev_0 = periph.event()
+        ev_1 = periph.event(mode="rise")
+        self.assertEqual((ev_0.name, ev_0.mode), ("ev_0", "level"))
+        self.assertEqual((ev_1.name, ev_1.mode), ("ev_1", "rise"))
+        self.assertEqual(list(periph.iter_events()), [
+            ev_0,
+            ev_1,
+        ])
+
+
+class CSRBankTestCase(unittest.TestCase):
+    def test_csr_name(self):
+        bank = CSRBank(name_prefix="foo")
+        bar = bank.csr(1, "r")
+        self.assertEqual(bar.name, "foo_bar")
+
+    def test_csr_name_wrong(self):
+        bank = CSRBank()
+        with self.assertRaisesRegex(TypeError,
+                r"Name must be a string, not 2"):
+            bank.csr(1, "r", name=2)
+
+    def test_iter_csr_regs(self):
+        bank = CSRBank()
+        csr_0 = bank.csr(1, "r")
+        csr_1 = bank.csr(8, "rw", addr=0x4, alignment=2)
+        self.assertEqual(list(bank.iter_csr_regs()), [
+            (csr_0, None, None),
+            (csr_1,  0x4,    2),
+        ])
+
+
+class PeripheralBridgeTestCase(unittest.TestCase):
+    def test_periph_wrong(self):
+        with self.assertRaisesRegex(TypeError,
+                r"Peripheral must be an instance of Peripheral, not 'foo'"):
+            PeripheralBridge('foo', data_width=8, granularity=8, features=(), alignment=0)
+
+
+class PeripheralSimulationTestCase(unittest.TestCase):
+    def test_csrs(self):
+        class DummyPeripheral(Peripheral, Elaboratable):
+            def __init__(self):
+                super().__init__()
+                bank         = self.csr_bank(addr=0x100)
+                self._csr_0  = bank.csr(8, "r")
+                self._csr_1  = bank.csr(8, "r", addr=0x8, alignment=4)
+                self._csr_2  = bank.csr(8, "rw")
+
+                self.win_0   = self.window(addr_width=1, data_width=8, sparse=True, addr=0x000)
+                self.win_1   = self.window(addr_width=1, data_width=32, granularity=8, addr=0x200)
+
+                self._bridge = self.bridge(data_width=32, granularity=8, alignment=2)
+                self.bus     = self._bridge.bus
+
+            def elaborate(self, platform):
+                m = Module()
+                m.submodules.bridge = self._bridge
+                m.d.comb += [
+                    self._csr_0.r_data.eq(0xa),
+                    self._csr_1.r_data.eq(0xb),
+                ]
+                with m.If(self._csr_2.w_stb):
+                    m.d.sync += self._csr_2.r_data.eq(self._csr_2.w_data)
+                return m
+
+        dut = DummyPeripheral()
+
+        def process():
+            self.assertEqual((yield from wb_read(dut.bus, addr=0x100 >> 2, sel=0xf)), 0xa)
+            yield
+            self.assertEqual((yield from wb_read(dut.bus, addr=0x108 >> 2, sel=0xf)), 0xb)
+            yield
+            yield from wb_write(dut.bus, addr=0x118 >> 2, data=0xc, sel=0xf)
+            yield
+            self.assertEqual((yield from wb_read(dut.bus, addr=0x118 >> 2, sel=0xf)), 0xc)
+            yield
+
+            yield dut.bus.cyc.eq(1)
+            yield dut.bus.adr.eq(0x000 >> 2)
+            yield Delay(1e-7)
+            self.assertEqual((yield dut.win_0.cyc), 1)
+
+            yield dut.bus.adr.eq(0x200 >> 2)
+            yield Delay(1e-7)
+            self.assertEqual((yield dut.win_1.cyc), 1)
+
+        simulation_test(dut, process)
+
+    def test_events(self):
+        class DummyPeripheral(Peripheral, Elaboratable):
+            def __init__(self):
+                super().__init__()
+                self.ev_0    = self.event()
+                self.ev_1    = self.event(mode="rise")
+                self.ev_2    = self.event(mode="fall")
+                self._bridge = self.bridge(data_width=8)
+                self.bus     = self._bridge.bus
+                self.irq     = self._bridge.irq
+
+            def elaborate(self, platform):
+                m = Module()
+                m.submodules.bridge = self._bridge
+                return m
+
+        dut = DummyPeripheral()
+
+        ev_status_addr  = 0x0
+        ev_pending_addr = 0x1
+        ev_enable_addr  = 0x2
+
+        def process():
+            yield dut.ev_0.stb.eq(1)
+            yield dut.ev_1.stb.eq(0)
+            yield dut.ev_2.stb.eq(1)
+            yield
+            self.assertEqual((yield dut.irq), 0)
+            self.assertEqual((yield from wb_read(dut.bus, ev_status_addr, sel=0xf)), 0b101)
+            yield
+
+            yield from wb_write(dut.bus, ev_enable_addr, data=0b111, sel=0xf)
+            yield
+            self.assertEqual((yield dut.irq), 1)
+
+            yield from wb_write(dut.bus, ev_pending_addr, data=0b001, sel=0xf)
+            yield
+            self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b001)
+            yield
+            self.assertEqual((yield dut.irq), 1)
+            yield dut.ev_0.stb.eq(0)
+            yield from wb_write(dut.bus, ev_pending_addr, data=0b001, sel=0xf)
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+            yield dut.ev_1.stb.eq(1)
+            yield
+            self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b010)
+            yield
+            self.assertEqual((yield dut.irq), 1)
+            yield from wb_write(dut.bus, ev_pending_addr, data=0b010, sel=0xf)
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+            yield dut.ev_2.stb.eq(0)
+            yield
+            self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b100)
+            yield
+            self.assertEqual((yield dut.irq), 1)
+            yield from wb_write(dut.bus, ev_pending_addr, data=0b100, sel=0xf)
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+        simulation_test(dut, process)
diff --git a/lambdasoc/test/test_periph_event.py b/lambdasoc/test/test_periph_event.py
new file mode 100644 (file)
index 0000000..4934af7
--- /dev/null
@@ -0,0 +1,138 @@
+# nmigen: UnusedElaboratable=no
+
+import unittest
+from nmigen import *
+from nmigen.back.pysim import *
+
+from ..periph._event import *
+
+
+def simulation_test(dut, process):
+    with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+        sim.add_clock(1e-6)
+        sim.add_sync_process(process)
+        sim.run()
+
+
+class EventSourceTestCase(unittest.TestCase):
+    def test_simple(self):
+        ev = EventSource()
+        self.assertEqual(ev.name, "ev")
+        self.assertEqual(ev.mode, "level")
+
+    def test_name_wrong(self):
+        with self.assertRaisesRegex(TypeError,
+                r"Name must be a string, not 2"):
+            EventSource(name=2)
+
+    def test_mode_wrong(self):
+        with self.assertRaisesRegex(ValueError,
+                r"Invalid trigger mode 'foo'; must be one of level, rise, fall"):
+            ev = EventSource(mode="foo")
+
+
+class InterruptSourceTestCase(unittest.TestCase):
+    def test_simple(self):
+        ev_0 = EventSource()
+        ev_1 = EventSource()
+        dut = InterruptSource((ev_0, ev_1))
+        self.assertEqual(dut.name, "dut")
+        self.assertEqual(dut.status.width, 2)
+        self.assertEqual(dut.pending.width, 2)
+        self.assertEqual(dut.enable.width, 2)
+
+    def test_name_wrong(self):
+        with self.assertRaisesRegex(TypeError,
+                r"Name must be a string, not 2"):
+            InterruptSource((), name=2)
+
+    def test_event_wrong(self):
+        with self.assertRaisesRegex(TypeError,
+                r"Event source must be an instance of EventSource, not 'foo'"):
+            dut = InterruptSource(("foo",))
+
+    def test_events(self):
+        ev_0 = EventSource(mode="level")
+        ev_1 = EventSource(mode="rise")
+        ev_2 = EventSource(mode="fall")
+        dut  = InterruptSource((ev_0, ev_1, ev_2))
+
+        def process():
+            yield ev_0.stb.eq(1)
+            yield ev_1.stb.eq(0)
+            yield ev_2.stb.eq(1)
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+            yield dut.status.r_stb.eq(1)
+            yield
+            yield dut.status.r_stb.eq(0)
+            yield
+            self.assertEqual((yield dut.status.r_data), 0b101)
+            yield
+
+            yield dut.enable.w_stb.eq(1)
+            yield dut.enable.w_data.eq(0b111)
+            yield
+            yield dut.enable.w_stb.eq(0)
+            yield
+            yield
+            self.assertEqual((yield dut.irq), 1)
+
+            yield dut.pending.w_stb.eq(1)
+            yield dut.pending.w_data.eq(0b001)
+            yield
+            yield dut.pending.w_stb.eq(0)
+            yield
+
+            yield dut.pending.r_stb.eq(1)
+            yield
+            yield dut.pending.r_stb.eq(0)
+            yield
+            self.assertEqual((yield dut.pending.r_data), 0b001)
+            self.assertEqual((yield dut.irq), 1)
+            yield
+
+            yield ev_0.stb.eq(0)
+            yield dut.pending.w_stb.eq(1)
+            yield dut.pending.w_data.eq(0b001)
+            yield
+            yield dut.pending.w_stb.eq(0)
+            yield
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+            yield ev_1.stb.eq(1)
+            yield dut.pending.r_stb.eq(1)
+            yield
+            yield dut.pending.r_stb.eq(0)
+            yield
+            self.assertEqual((yield dut.pending.r_data), 0b010)
+            self.assertEqual((yield dut.irq), 1)
+
+            yield dut.pending.w_stb.eq(1)
+            yield dut.pending.w_data.eq(0b010)
+            yield
+            yield dut.pending.w_stb.eq(0)
+            yield
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+            yield ev_2.stb.eq(0)
+            yield
+            yield dut.pending.r_stb.eq(1)
+            yield
+            yield dut.pending.r_stb.eq(0)
+            yield
+            self.assertEqual((yield dut.pending.r_data), 0b100)
+            self.assertEqual((yield dut.irq), 1)
+
+            yield dut.pending.w_stb.eq(1)
+            yield dut.pending.w_data.eq(0b100)
+            yield
+            yield dut.pending.w_stb.eq(0)
+            yield
+            yield
+            self.assertEqual((yield dut.irq), 0)
+
+        simulation_test(dut, process)