periph.serial: add AsyncSerialPeripheral
authorJean-François Nguyen <jf@lambdaconcept.com>
Wed, 25 Mar 2020 14:51:57 +0000 (15:51 +0100)
committerJean-François Nguyen <jf@lambdaconcept.com>
Wed, 25 Mar 2020 14:51:57 +0000 (15:51 +0100)
lambdasoc/periph/serial.py [new file with mode: 0644]
lambdasoc/test/test_periph_serial.py [new file with mode: 0644]

diff --git a/lambdasoc/periph/serial.py b/lambdasoc/periph/serial.py
new file mode 100644 (file)
index 0000000..5176c13
--- /dev/null
@@ -0,0 +1,123 @@
+from nmigen import *
+from nmigen.lib.fifo import SyncFIFO
+
+from nmigen_stdio.serial import AsyncSerial
+
+from . import Peripheral
+
+
+__all__ = ["AsyncSerialPeripheral"]
+
+
+class AsyncSerialPeripheral(Peripheral, Elaboratable):
+    """Asynchronous serial transceiver peripheral.
+
+    See :class:`nmigen_stdio.serial.AsyncSerial` for details.
+
+    CSR registers
+    -------------
+    divisor : read/write
+        Clock divisor.
+    rx_data : read-only
+        Receiver data.
+    rx_rdy : read-only
+        Receiver ready. The receiver FIFO is non-empty.
+    rx_err : read-only
+        Receiver error flags. See :class:`nmigen_stdio.serial.AsyncSerialRX` for layout.
+    tx_data : write-only
+        Transmitter data.
+    tx_rdy : read-only
+        Transmitter ready. The transmitter FIFO is non-full.
+
+    Events
+    ------
+    rx_rdy : level-triggered
+        Receiver ready. The receiver FIFO is non-empty.
+    rx_err : edge-triggered (rising)
+        Receiver error. Error cause is available in the ``rx_err`` register.
+    tx_mty : edge-triggered (rising)
+        Transmitter empty. The transmitter FIFO is empty.
+
+    Parameters
+    ----------
+    rx_depth : int
+        Depth of the receiver FIFO.
+    tx_depth : int
+        Depth of the transmitter FIFO.
+    divisor : int
+        Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``.
+    divisor_bits : int
+        Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead.
+    data_bits : int
+        Data width.
+    parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"``
+        Parity mode.
+    pins : :class:`Record`
+        Optional. UART pins. See :class:`nmigen_boards.resources.UARTResource`.
+
+    Attributes
+    ----------
+    bus : :class:`nmigen_soc.wishbone.Interface`
+        Wishbone bus interface.
+    irq : :class:`IRQLine`
+        Interrupt request line.
+    """
+    def __init__(self, *, rx_depth=16, tx_depth=16, **kwargs):
+        super().__init__()
+
+        self._phy       = AsyncSerial(**kwargs)
+        self._rx_fifo   = SyncFIFO(width=self._phy.rx.data.width, depth=rx_depth)
+        self._tx_fifo   = SyncFIFO(width=self._phy.tx.data.width, depth=tx_depth)
+
+        bank            = self.csr_bank()
+        self._divisor   = bank.csr(self._phy.divisor.width, "rw")
+        self._rx_data   = bank.csr(self._phy.rx.data.width, "r")
+        self._rx_rdy    = bank.csr(1, "r")
+        self._rx_err    = bank.csr(len(self._phy.rx.err),   "r")
+        self._tx_data   = bank.csr(self._phy.tx.data.width, "w")
+        self._tx_rdy    = bank.csr(1, "r")
+
+        self._rx_rdy_ev = self.event(mode="level")
+        self._rx_err_ev = self.event(mode="rise")
+        self._tx_mty_ev = self.event(mode="rise")
+
+        self._bridge    = self.bridge(data_width=32, granularity=8, alignment=2)
+        self.bus        = self._bridge.bus
+        self.irq        = self._bridge.irq
+
+    def elaborate(self, platform):
+        m = Module()
+        m.submodules.bridge  = self._bridge
+
+        m.submodules.phy     = self._phy
+        m.submodules.rx_fifo = self._rx_fifo
+        m.submodules.tx_fifo = self._tx_fifo
+
+        m.d.comb += self._divisor.r_data.eq(self._phy.divisor)
+        with m.If(self._divisor.w_stb):
+            m.d.sync += self._phy.divisor.eq(self._divisor.w_data)
+
+        m.d.comb += [
+            self._rx_data.r_data.eq(self._rx_fifo.r_data),
+            self._rx_fifo.r_en.eq(self._rx_data.r_stb),
+            self._rx_rdy.r_data.eq(self._rx_fifo.r_rdy),
+
+            self._rx_fifo.w_data.eq(self._phy.rx.data),
+            self._rx_fifo.w_en.eq(self._phy.rx.rdy),
+            self._phy.rx.ack.eq(self._rx_fifo.w_rdy),
+            self._rx_err.r_data.eq(self._phy.rx.err),
+
+            self._tx_fifo.w_en.eq(self._tx_data.w_stb),
+            self._tx_fifo.w_data.eq(self._tx_data.w_data),
+            self._tx_rdy.r_data.eq(self._tx_fifo.w_rdy),
+
+            self._phy.tx.data.eq(self._tx_fifo.r_data),
+            self._phy.tx.ack.eq(self._tx_fifo.r_rdy),
+            self._tx_fifo.r_en.eq(self._phy.tx.rdy),
+
+            self._rx_rdy_ev.stb.eq(self._rx_fifo.r_rdy),
+            self._rx_err_ev.stb.eq(self._phy.rx.err.any()),
+            self._tx_mty_ev.stb.eq(~self._tx_fifo.r_rdy),
+        ]
+
+        return m
diff --git a/lambdasoc/test/test_periph_serial.py b/lambdasoc/test/test_periph_serial.py
new file mode 100644 (file)
index 0000000..48bc126
--- /dev/null
@@ -0,0 +1,57 @@
+import unittest
+
+from nmigen import *
+from nmigen.lib.io import pin_layout
+from nmigen.back.pysim import *
+
+from ._wishbone import *
+from ..periph.serial import AsyncSerialPeripheral
+
+
+divisor_addr    = 0x00 >> 2
+rx_data_addr    = 0x04 >> 2
+rx_rdy_addr     = 0x08 >> 2
+rx_err_addr     = 0x0c >> 2
+tx_data_addr    = 0x10 >> 2
+tx_rdy_addr     = 0x14 >> 2
+ev_status_addr  = 0x20 >> 2
+ev_pending_addr = 0x24 >> 2
+ev_enable_addr  = 0x28 >> 2
+
+
+class AsyncSerialPeripheralTestCase(unittest.TestCase):
+    def test_loopback(self):
+        pins = Record([("rx", pin_layout(1, dir="i")),
+                       ("tx", pin_layout(1, dir="o"))])
+        dut = AsyncSerialPeripheral(divisor=5, pins=pins)
+        m = Module()
+        m.submodules.serial = dut
+        m.d.comb += pins.rx.i.eq(pins.tx.o)
+
+        def process():
+            # enable rx_rdy event
+            yield from wb_write(dut.bus, addr=ev_enable_addr, data=0b001, sel=0xf)
+            yield
+
+            tx_rdy = yield from wb_read(dut.bus, addr=tx_rdy_addr, sel=0xf)
+            self.assertEqual(tx_rdy, 1)
+            yield
+
+            yield from wb_write(dut.bus, addr=tx_data_addr, data=0xab, sel=0xf)
+            yield
+
+            for i in range(10):
+                yield
+            self.assertTrue((yield dut.irq))
+
+            rx_rdy = yield from wb_read(dut.bus, addr=rx_rdy_addr, sel=0xf)
+            self.assertEqual(rx_rdy, 1)
+            yield
+            rx_data = yield from wb_read(dut.bus, addr=rx_data_addr, sel=0xf)
+            self.assertEqual(rx_data, 0xab)
+            yield
+
+        with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+            sim.add_clock(1e-6)
+            sim.add_sync_process(process)
+            sim.run()