From: Jean-François Nguyen Date: Wed, 25 Mar 2020 11:54:45 +0000 (+0100) Subject: periph: add Peripheral base class X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=b1fb5bfcaa96540936828428a108d70e2372239d;p=lambdasoc.git periph: add Peripheral base class --- diff --git a/lambdasoc/__init__.py b/lambdasoc/__init__.py new file mode 100644 index 0000000..3d136b5 --- /dev/null +++ b/lambdasoc/__init__.py @@ -0,0 +1,5 @@ +import pkg_resources +try: + __version__ = pkg_resources.get_distribution(__name__).version +except pkg_resources.DistributionNotFound: + pass diff --git a/lambdasoc/periph/__init__.py b/lambdasoc/periph/__init__.py new file mode 100644 index 0000000..9b5ed21 --- /dev/null +++ b/lambdasoc/periph/__init__.py @@ -0,0 +1 @@ +from .base import * diff --git a/lambdasoc/periph/_event.py b/lambdasoc/periph/_event.py new file mode 100644 index 0000000..53df4aa --- /dev/null +++ b/lambdasoc/periph/_event.py @@ -0,0 +1,135 @@ +from nmigen import * +from nmigen import tracer + +from nmigen_soc import csr + + +__all__ = ["EventSource", "IRQLine", "InterruptSource"] + + +class EventSource: + """Event source. + + Parameters + ---------- + mode : ``"level"``, ``"rise"``, ``"fall"`` + Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high. + If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge + of ``stb``. + name : str + Name of the event. If ``None`` (default) the name is inferred from the variable + name this event source is assigned to. + + Attributes + ---------- + name : str + Name of the event + mode : ``"level"``, ``"rise"``, ``"fall"`` + Trigger mode. + stb : Signal, in + Event strobe. + """ + def __init__(self, *, mode="level", name=None, src_loc_at=0): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + + choices = ("level", "rise", "fall") + if mode not in choices: + raise ValueError("Invalid trigger mode {!r}; must be one of {}" + .format(mode, ", ".join(choices))) + + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + self.mode = mode + self.stb = Signal(name="{}_stb".format(self.name)) + + +class IRQLine(Signal): + """Interrupt request line.""" + def __init__(self, *, name=None, src_loc_at=0): + super().__init__(name=name, src_loc_at=1 + src_loc_at) + + __hash__ = object.__hash__ + + +class InterruptSource(Elaboratable): + """Interrupt source. + + A mean of gathering multiple event sources into a single interrupt request line. + + Parameters + ---------- + events : iter(:class:`EventSource`) + Event sources. + name : str + Name of the interrupt source. If ``None`` (default) the name is inferred from the + variable name this interrupt source is assigned to. + + Attributes + ---------- + name : str + Name of the interrupt source. + status : :class:`csr.Element`, read-only + Event status register. Each bit displays the level of the strobe of an event source. + Events are ordered by position in the `events` parameter. + pending : :class:`csr.Element`, read/write + Event pending register. If a bit is 1, the associated event source has a pending + notification. Writing 1 to a bit clears it. + Events are ordered by position in the `events` parameter. + enable : :class:`csr.Element`, read/write + Event enable register. Writing 1 to a bit enables its associated event source. + Writing 0 disables it. + Events are ordered by position in the `events` parameter. + irq : :class:`IRQLine`, out + Interrupt request. It is raised if any event source is enabled and has a pending + notification. + """ + def __init__(self, events, *, name=None, src_loc_at=0): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + + for event in events: + if not isinstance(event, EventSource): + raise TypeError("Event source must be an instance of EventSource, not {!r}" + .format(event)) + self._events = list(events) + + width = len(events) + self.status = csr.Element(width, "r", name="{}_status".format(self.name)) + self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name)) + self.enable = csr.Element(width, "rw", name="{}_enable".format(self.name)) + + self.irq = IRQLine(name="{}_irq".format(self.name)) + + def elaborate(self, platform): + m = Module() + + with m.If(self.pending.w_stb): + m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data) + + with m.If(self.enable.w_stb): + m.d.sync += self.enable.r_data.eq(self.enable.w_data) + + for i, event in enumerate(self._events): + m.d.sync += self.status.r_data[i].eq(event.stb) + + if event.mode in ("rise", "fall"): + event_stb_r = Signal.like(event.stb, name_suffix="_r") + m.d.sync += event_stb_r.eq(event.stb) + + event_trigger = Signal(name="{}_trigger".format(event.name)) + if event.mode == "level": + m.d.comb += event_trigger.eq(event.stb) + elif event.mode == "rise": + m.d.comb += event_trigger.eq(~event_stb_r & event.stb) + elif event.mode == "fall": + m.d.comb += event_trigger.eq(event_stb_r & ~event.stb) + else: + assert False # :nocov: + + with m.If(event_trigger): + m.d.sync += self.pending.r_data[i].eq(1) + + m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any()) + + return m diff --git a/lambdasoc/periph/base.py b/lambdasoc/periph/base.py new file mode 100644 index 0000000..6101946 --- /dev/null +++ b/lambdasoc/periph/base.py @@ -0,0 +1,358 @@ +from nmigen import * +from nmigen import tracer +from nmigen.utils import log2_int + +from nmigen_soc import csr, wishbone +from nmigen_soc.memory import MemoryMap +from nmigen_soc.csr.wishbone import WishboneCSRBridge + + +from ._event import * + + +__all__ = ["Peripheral", "CSRBank", "PeripheralBridge"] + + +class Peripheral: + """Wishbone peripheral. + + A helper class to reduce the boilerplate needed to control a peripheral with a Wishbone interface. + It provides facilities for instantiating CSR registers, requesting windows to subordinate busses + and sending interrupt requests to the CPU. + + The ``Peripheral`` class is not meant to be instantiated as-is, but rather as a base class for + actual peripherals. + + Usage example + ------------- + + ``` + class ExamplePeripheral(Peripheral, Elaboratable): + def __init__(self): + super().__init__() + bank = self.csr_bank() + self._foo = bank.csr(8, "r") + self._bar = bank.csr(8, "w") + + self._rdy = self.event(mode="rise") + + self._bridge = self.bridge(data_width=32, granularity=8, alignment=2) + self.bus = self._bridge.bus + self.irq = self._bridge.irq + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + # ... + return m + ``` + + Arguments + --------- + name : str + Name of this peripheral. If ``None`` (default) the name is inferred from the variable + name this peripheral is assigned to. + + Properties + ---------- + name : str + Name of the peripheral. + """ + def __init__(self, name=None, src_loc_at=1): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + self.name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_") + + self._csr_banks = [] + self._windows = [] + self._events = [] + + self._bus = None + self._irq = None + + @property + def bus(self): + """Wishbone bus interface. + + Return value + ------------ + An instance of :class:`Interface`. + + Exceptions + ---------- + Raises :exn:`NotImplementedError` if the peripheral does not have a Wishbone bus. + """ + if self._bus is None: + raise NotImplementedError("Peripheral {!r} does not have a bus interface" + .format(self)) + return self._bus + + @bus.setter + def bus(self, bus): + if not isinstance(bus, wishbone.Interface): + raise TypeError("Bus interface must be an instance of wishbone.Interface, not {!r}" + .format(bus)) + self._bus = bus + + @property + def irq(self): + """Interrupt request line. + + Return value + ------------ + An instance of :class:`IRQLine`. + + Exceptions + ---------- + Raises :exn:`NotImplementedError` if the peripheral does not have an IRQ line. + """ + if self._irq is None: + raise NotImplementedError("Peripheral {!r} does not have an IRQ line" + .format(self)) + return self._irq + + @irq.setter + def irq(self, irq): + if not isinstance(irq, IRQLine): + raise TypeError("IRQ line must be an instance of IRQLine, not {!r}" + .format(irq)) + self._irq = irq + + def csr_bank(self, *, addr=None, alignment=None): + """Request a CSR bank. + + Arguments + --------- + addr : int or None + Address of the bank. If ``None``, the implicit next address will be used. + Otherwise, the exact specified address (which must be a multiple of + ``2 ** max(alignment, bridge_alignment)``) will be used. + alignment : int or None + Alignment of the bank. If not specified, the bridge alignment is used. + See :class:`nmigen_soc.csr.Multiplexer` for details. + + Return value + ------------ + An instance of :class:`CSRBank`. + """ + bank = CSRBank(name_prefix=self.name) + self._csr_banks.append((bank, addr, alignment)) + return bank + + def window(self, *, addr_width, data_width, granularity=None, features=frozenset(), + alignment=0, addr=None, sparse=None): + """Request a window to a subordinate bus. + + See :meth:`nmigen_soc.wishbone.Decoder.add` for details. + + Return value + ------------ + An instance of :class:`nmigen_soc.wishbone.Interface`. + """ + window = wishbone.Interface(addr_width=addr_width, data_width=data_width, + granularity=granularity, features=features) + granularity_bits = log2_int(data_width // window.granularity) + window.memory_map = MemoryMap(addr_width=addr_width + granularity_bits, + data_width=window.granularity, alignment=alignment) + self._windows.append((window, addr, sparse)) + return window + + def event(self, *, mode="level", name=None, src_loc_at=0): + """Request an event source. + + See :class:`EventSource` for details. + + Return value + ------------ + An instance of :class:`EventSource`. + """ + event = EventSource(mode=mode, name=name, src_loc_at=1 + src_loc_at) + self._events.append(event) + return event + + def bridge(self, *, data_width=8, granularity=None, features=frozenset(), alignment=0): + """Request a bridge to the resources of the peripheral. + + See :class:`PeripheralBridge` for details. + + Return value + ------------ + A :class:`PeripheralBridge` providing access to local resources. + """ + return PeripheralBridge(self, data_width=data_width, granularity=granularity, + features=features, alignment=alignment) + + def iter_csr_banks(self): + """Iterate requested CSR banks and their parameters. + + Yield values + ------------ + A tuple ``bank, addr, alignment`` describing the bank and its parameters. + """ + for bank, addr, alignment in self._csr_banks: + yield bank, addr, alignment + + def iter_windows(self): + """Iterate requested windows and their parameters. + + Yield values + ------------ + A tuple ``window, addr, sparse`` descr + given to :meth:`Peripheral.window`. + """ + for window, addr, sparse in self._windows: + yield window, addr, sparse + + def iter_events(self): + """Iterate requested event sources. + + Yield values + ------------ + An instance of :class:`EventSource`. + """ + for event in self._events: + yield event + + +class CSRBank: + """CSR register bank. + + Parameters + ---------- + name_prefix : str + Name prefix of the bank registers. + """ + def __init__(self, *, name_prefix=""): + self._name_prefix = name_prefix + self._csr_regs = [] + + def csr(self, width, access, *, addr=None, alignment=None, name=None, + src_loc_at=0): + """Request a CSR register. + + Parameters + ---------- + width : int + Width of the register. See :class:`nmigen_soc.csr.Element`. + access : :class:`Access` + Register access mode. See :class:`nmigen_soc.csr.Element`. + addr : int + Address of the register. See :meth:`nmigen_soc.csr.Multiplexer.add`. + alignment : int + Register alignment. See :class:`nmigen_soc.csr.Multiplexer`. + name : str + Name of the register. If ``None`` (default) the name is inferred from the variable + name this register is assigned to. + + Return value + ------------ + An instance of :class:`nmigen_soc.csr.Element`. + """ + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_") + + elem_name = "{}_{}".format(self._name_prefix, name) + elem = csr.Element(width, access, name=elem_name) + self._csr_regs.append((elem, addr, alignment)) + return elem + + def iter_csr_regs(self): + """Iterate requested CSR registers and their parameters. + + Yield values + ------------ + A tuple ``elem, addr, alignment`` describing the register and its parameters. + """ + for elem, addr, alignment in self._csr_regs: + yield elem, addr, alignment + + +class PeripheralBridge(Elaboratable): + """Peripheral bridge. + + A bridge providing access to the registers and windows of a peripheral, and support for + interrupt requests from its event sources. + + Event managment is performed by an :class:`InterruptSource` submodule. + + Parameters + --------- + periph : :class:`Peripheral` + The peripheral whose resources are exposed by this bridge. + data_width : int + Data width. See :class:`nmigen_soc.wishbone.Interface`. + granularity : int or None + Granularity. See :class:`nmigen_soc.wishbone.Interface`. + features : iter(str) + Optional signal set. See :class:`nmigen_soc.wishbone.Interface`. + alignment : int + Resource alignment. See :class:`nmigen_soc.memory.MemoryMap`. + + Attributes + ---------- + bus : :class:`nmigen_soc.wishbone.Interface` + Wishbone bus providing access to the resources of the peripheral. + irq : :class:`IRQLine`, out + Interrupt request. It is raised if any event source is enabled and has a pending + notification. + """ + def __init__(self, periph, *, data_width, granularity, features, alignment): + if not isinstance(periph, Peripheral): + raise TypeError("Peripheral must be an instance of Peripheral, not {!r}" + .format(periph)) + + self._wb_decoder = wishbone.Decoder(addr_width=1, data_width=data_width, + granularity=granularity, + features=features, alignment=alignment) + + self._csr_subs = [] + + for bank, bank_addr, bank_alignment in periph.iter_csr_banks(): + if bank_alignment is None: + bank_alignment = alignment + csr_mux = csr.Multiplexer(addr_width=1, data_width=8, alignment=bank_alignment) + for elem, elem_addr, elem_alignment in bank.iter_csr_regs(): + if elem_alignment is None: + elem_alignment = alignment + csr_mux.add(elem, addr=elem_addr, alignment=elem_alignment, extend=True) + + csr_bridge = WishboneCSRBridge(csr_mux.bus, data_width=data_width) + self._wb_decoder.add(csr_bridge.wb_bus, addr=bank_addr, extend=True) + self._csr_subs.append((csr_mux, csr_bridge)) + + for window, window_addr, window_sparse in periph.iter_windows(): + self._wb_decoder.add(window, addr=window_addr, sparse=window_sparse, extend=True) + + events = list(periph.iter_events()) + if len(events) > 0: + self._int_src = InterruptSource(events, name="{}_ev".format(periph.name)) + self.irq = self._int_src.irq + + csr_mux = csr.Multiplexer(addr_width=1, data_width=8, alignment=alignment) + csr_mux.add(self._int_src.status, extend=True) + csr_mux.add(self._int_src.pending, extend=True) + csr_mux.add(self._int_src.enable, extend=True) + + csr_bridge = WishboneCSRBridge(csr_mux.bus, data_width=data_width) + self._wb_decoder.add(csr_bridge.wb_bus, extend=True) + self._csr_subs.append((csr_mux, csr_bridge)) + else: + self._int_src = None + self.irq = None + + self.bus = self._wb_decoder.bus + + def elaborate(self, platform): + m = Module() + + for i, (csr_mux, csr_bridge) in enumerate(self._csr_subs): + m.submodules[ "csr_mux_{}".format(i)] = csr_mux + m.submodules["csr_bridge_{}".format(i)] = csr_bridge + + if self._int_src is not None: + m.submodules._int_src = self._int_src + + m.submodules.wb_decoder = self._wb_decoder + + return m diff --git a/lambdasoc/test/_wishbone.py b/lambdasoc/test/_wishbone.py new file mode 100644 index 0000000..c7ab28c --- /dev/null +++ b/lambdasoc/test/_wishbone.py @@ -0,0 +1,34 @@ +def wb_read(bus, addr, sel, timeout=32): + yield bus.cyc.eq(1) + yield bus.stb.eq(1) + yield bus.adr.eq(addr) + yield bus.sel.eq(sel) + yield + cycles = 0 + while not (yield bus.ack): + yield + if cycles >= timeout: + raise RuntimeError("Wishbone transaction timed out") + cycles += 1 + data = (yield bus.dat_r) + yield bus.cyc.eq(0) + yield bus.stb.eq(0) + return data + +def wb_write(bus, addr, data, sel, timeout=32): + yield bus.cyc.eq(1) + yield bus.stb.eq(1) + yield bus.adr.eq(addr) + yield bus.we.eq(1) + yield bus.sel.eq(sel) + yield bus.dat_w.eq(data) + yield + cycles = 0 + while not (yield bus.ack): + yield + if cycles >= timeout: + raise RuntimeError("Wishbone transaction timed out") + cycles += 1 + yield bus.cyc.eq(0) + yield bus.stb.eq(0) + yield bus.we.eq(0) diff --git a/lambdasoc/test/test_periph_base.py b/lambdasoc/test/test_periph_base.py new file mode 100644 index 0000000..925f49d --- /dev/null +++ b/lambdasoc/test/test_periph_base.py @@ -0,0 +1,229 @@ +# nmigen: UnusedElaboratable=no + +import unittest +from nmigen import * +from nmigen.back.pysim import * + +from ._wishbone import * +from ..periph.base import Peripheral, CSRBank, PeripheralBridge + + +def simulation_test(dut, process): + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(process) + sim.run() + + +class PeripheralTestCase(unittest.TestCase): + def test_name(self): + class Wrapper(Peripheral): + def __init__(self): + super().__init__() + periph_0 = Wrapper() + periph_1 = Peripheral(name="periph_1") + self.assertEqual(periph_0.name, "periph_0") + self.assertEqual(periph_1.name, "periph_1") + + def test_periph_name_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + periph = Peripheral(name=2) + + def test_set_bus_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(TypeError, + r"Bus interface must be an instance of wishbone.Interface, not 'foo'"): + periph.bus = "foo" + + def test_get_bus_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(NotImplementedError, + r"Peripheral <.*> does not have a bus interface"): + periph.bus + + def test_set_irq_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(TypeError, + r"IRQ line must be an instance of IRQLine, not 'foo'"): + periph.irq = "foo" + + def test_get_irq_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(NotImplementedError, + r"Peripheral <.*> does not have an IRQ line"): + periph.irq + + def test_iter_csr_banks(self): + periph = Peripheral(src_loc_at=0) + bank_0 = periph.csr_bank() + bank_1 = periph.csr_bank(addr=0x4, alignment=2) + self.assertEqual(list(periph.iter_csr_banks()), [ + (bank_0, None, None), + (bank_1, 0x4, 2), + ]) + + def test_iter_windows(self): + periph = Peripheral(src_loc_at=0) + win_0 = periph.window(addr_width=2, data_width=8) + win_1 = periph.window(addr_width=4, data_width=8, addr=0x4, sparse=True) + self.assertEqual(list(periph.iter_windows()), [ + (win_0, None, None), + (win_1, 0x4, True), + ]) + + def test_iter_events(self): + periph = Peripheral(src_loc_at=0) + ev_0 = periph.event() + ev_1 = periph.event(mode="rise") + self.assertEqual((ev_0.name, ev_0.mode), ("ev_0", "level")) + self.assertEqual((ev_1.name, ev_1.mode), ("ev_1", "rise")) + self.assertEqual(list(periph.iter_events()), [ + ev_0, + ev_1, + ]) + + +class CSRBankTestCase(unittest.TestCase): + def test_csr_name(self): + bank = CSRBank(name_prefix="foo") + bar = bank.csr(1, "r") + self.assertEqual(bar.name, "foo_bar") + + def test_csr_name_wrong(self): + bank = CSRBank() + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + bank.csr(1, "r", name=2) + + def test_iter_csr_regs(self): + bank = CSRBank() + csr_0 = bank.csr(1, "r") + csr_1 = bank.csr(8, "rw", addr=0x4, alignment=2) + self.assertEqual(list(bank.iter_csr_regs()), [ + (csr_0, None, None), + (csr_1, 0x4, 2), + ]) + + +class PeripheralBridgeTestCase(unittest.TestCase): + def test_periph_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Peripheral must be an instance of Peripheral, not 'foo'"): + PeripheralBridge('foo', data_width=8, granularity=8, features=(), alignment=0) + + +class PeripheralSimulationTestCase(unittest.TestCase): + def test_csrs(self): + class DummyPeripheral(Peripheral, Elaboratable): + def __init__(self): + super().__init__() + bank = self.csr_bank(addr=0x100) + self._csr_0 = bank.csr(8, "r") + self._csr_1 = bank.csr(8, "r", addr=0x8, alignment=4) + self._csr_2 = bank.csr(8, "rw") + + self.win_0 = self.window(addr_width=1, data_width=8, sparse=True, addr=0x000) + self.win_1 = self.window(addr_width=1, data_width=32, granularity=8, addr=0x200) + + self._bridge = self.bridge(data_width=32, granularity=8, alignment=2) + self.bus = self._bridge.bus + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + m.d.comb += [ + self._csr_0.r_data.eq(0xa), + self._csr_1.r_data.eq(0xb), + ] + with m.If(self._csr_2.w_stb): + m.d.sync += self._csr_2.r_data.eq(self._csr_2.w_data) + return m + + dut = DummyPeripheral() + + def process(): + self.assertEqual((yield from wb_read(dut.bus, addr=0x100 >> 2, sel=0xf)), 0xa) + yield + self.assertEqual((yield from wb_read(dut.bus, addr=0x108 >> 2, sel=0xf)), 0xb) + yield + yield from wb_write(dut.bus, addr=0x118 >> 2, data=0xc, sel=0xf) + yield + self.assertEqual((yield from wb_read(dut.bus, addr=0x118 >> 2, sel=0xf)), 0xc) + yield + + yield dut.bus.cyc.eq(1) + yield dut.bus.adr.eq(0x000 >> 2) + yield Delay(1e-7) + self.assertEqual((yield dut.win_0.cyc), 1) + + yield dut.bus.adr.eq(0x200 >> 2) + yield Delay(1e-7) + self.assertEqual((yield dut.win_1.cyc), 1) + + simulation_test(dut, process) + + def test_events(self): + class DummyPeripheral(Peripheral, Elaboratable): + def __init__(self): + super().__init__() + self.ev_0 = self.event() + self.ev_1 = self.event(mode="rise") + self.ev_2 = self.event(mode="fall") + self._bridge = self.bridge(data_width=8) + self.bus = self._bridge.bus + self.irq = self._bridge.irq + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + return m + + dut = DummyPeripheral() + + ev_status_addr = 0x0 + ev_pending_addr = 0x1 + ev_enable_addr = 0x2 + + def process(): + yield dut.ev_0.stb.eq(1) + yield dut.ev_1.stb.eq(0) + yield dut.ev_2.stb.eq(1) + yield + self.assertEqual((yield dut.irq), 0) + self.assertEqual((yield from wb_read(dut.bus, ev_status_addr, sel=0xf)), 0b101) + yield + + yield from wb_write(dut.bus, ev_enable_addr, data=0b111, sel=0xf) + yield + self.assertEqual((yield dut.irq), 1) + + yield from wb_write(dut.bus, ev_pending_addr, data=0b001, sel=0xf) + yield + self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b001) + yield + self.assertEqual((yield dut.irq), 1) + yield dut.ev_0.stb.eq(0) + yield from wb_write(dut.bus, ev_pending_addr, data=0b001, sel=0xf) + yield + self.assertEqual((yield dut.irq), 0) + + yield dut.ev_1.stb.eq(1) + yield + self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b010) + yield + self.assertEqual((yield dut.irq), 1) + yield from wb_write(dut.bus, ev_pending_addr, data=0b010, sel=0xf) + yield + self.assertEqual((yield dut.irq), 0) + + yield dut.ev_2.stb.eq(0) + yield + self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b100) + yield + self.assertEqual((yield dut.irq), 1) + yield from wb_write(dut.bus, ev_pending_addr, data=0b100, sel=0xf) + yield + self.assertEqual((yield dut.irq), 0) + + simulation_test(dut, process) diff --git a/lambdasoc/test/test_periph_event.py b/lambdasoc/test/test_periph_event.py new file mode 100644 index 0000000..4934af7 --- /dev/null +++ b/lambdasoc/test/test_periph_event.py @@ -0,0 +1,138 @@ +# nmigen: UnusedElaboratable=no + +import unittest +from nmigen import * +from nmigen.back.pysim import * + +from ..periph._event import * + + +def simulation_test(dut, process): + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(process) + sim.run() + + +class EventSourceTestCase(unittest.TestCase): + def test_simple(self): + ev = EventSource() + self.assertEqual(ev.name, "ev") + self.assertEqual(ev.mode, "level") + + def test_name_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + EventSource(name=2) + + def test_mode_wrong(self): + with self.assertRaisesRegex(ValueError, + r"Invalid trigger mode 'foo'; must be one of level, rise, fall"): + ev = EventSource(mode="foo") + + +class InterruptSourceTestCase(unittest.TestCase): + def test_simple(self): + ev_0 = EventSource() + ev_1 = EventSource() + dut = InterruptSource((ev_0, ev_1)) + self.assertEqual(dut.name, "dut") + self.assertEqual(dut.status.width, 2) + self.assertEqual(dut.pending.width, 2) + self.assertEqual(dut.enable.width, 2) + + def test_name_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + InterruptSource((), name=2) + + def test_event_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Event source must be an instance of EventSource, not 'foo'"): + dut = InterruptSource(("foo",)) + + def test_events(self): + ev_0 = EventSource(mode="level") + ev_1 = EventSource(mode="rise") + ev_2 = EventSource(mode="fall") + dut = InterruptSource((ev_0, ev_1, ev_2)) + + def process(): + yield ev_0.stb.eq(1) + yield ev_1.stb.eq(0) + yield ev_2.stb.eq(1) + yield + self.assertEqual((yield dut.irq), 0) + + yield dut.status.r_stb.eq(1) + yield + yield dut.status.r_stb.eq(0) + yield + self.assertEqual((yield dut.status.r_data), 0b101) + yield + + yield dut.enable.w_stb.eq(1) + yield dut.enable.w_data.eq(0b111) + yield + yield dut.enable.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 1) + + yield dut.pending.w_stb.eq(1) + yield dut.pending.w_data.eq(0b001) + yield + yield dut.pending.w_stb.eq(0) + yield + + yield dut.pending.r_stb.eq(1) + yield + yield dut.pending.r_stb.eq(0) + yield + self.assertEqual((yield dut.pending.r_data), 0b001) + self.assertEqual((yield dut.irq), 1) + yield + + yield ev_0.stb.eq(0) + yield dut.pending.w_stb.eq(1) + yield dut.pending.w_data.eq(0b001) + yield + yield dut.pending.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 0) + + yield ev_1.stb.eq(1) + yield dut.pending.r_stb.eq(1) + yield + yield dut.pending.r_stb.eq(0) + yield + self.assertEqual((yield dut.pending.r_data), 0b010) + self.assertEqual((yield dut.irq), 1) + + yield dut.pending.w_stb.eq(1) + yield dut.pending.w_data.eq(0b010) + yield + yield dut.pending.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 0) + + yield ev_2.stb.eq(0) + yield + yield dut.pending.r_stb.eq(1) + yield + yield dut.pending.r_stb.eq(0) + yield + self.assertEqual((yield dut.pending.r_data), 0b100) + self.assertEqual((yield dut.irq), 1) + + yield dut.pending.w_stb.eq(1) + yield dut.pending.w_data.eq(0b100) + yield + yield dut.pending.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 0) + + simulation_test(dut, process)