From 4f59a0b73742d9f750af2ace3244176fb096c224 Mon Sep 17 00:00:00 2001 From: whitequark Date: Fri, 25 Oct 2019 23:47:52 +0000 Subject: [PATCH] csr.wishbone: add WishboneCSRBridge. --- nmigen_soc/csr/bus.py | 6 +- nmigen_soc/csr/wishbone.py | 88 +++++++++++ nmigen_soc/test/test_csr_wishbone.py | 216 +++++++++++++++++++++++++++ 3 files changed, 308 insertions(+), 2 deletions(-) create mode 100644 nmigen_soc/csr/wishbone.py create mode 100644 nmigen_soc/test/test_csr_wishbone.py diff --git a/nmigen_soc/csr/bus.py b/nmigen_soc/csr/bus.py index ffb8c3b..31a237d 100644 --- a/nmigen_soc/csr/bus.py +++ b/nmigen_soc/csr/bus.py @@ -201,7 +201,8 @@ class Multiplexer(Elaboratable): CSR bus providing access to registers. """ def __init__(self, *, addr_width, data_width, alignment=0): - self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment) + self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment, + name="csr") self._map = self.bus.memory_map def align_to(self, alignment): @@ -305,7 +306,8 @@ class Decoder(Elaboratable): CSR bus providing access to subordinate buses. """ def __init__(self, *, addr_width, data_width, alignment=0): - self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment) + self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment, + name="csr") self._map = self.bus.memory_map self._subs = dict() diff --git a/nmigen_soc/csr/wishbone.py b/nmigen_soc/csr/wishbone.py new file mode 100644 index 0000000..f570ece --- /dev/null +++ b/nmigen_soc/csr/wishbone.py @@ -0,0 +1,88 @@ +from nmigen import * +from nmigen.utils import log2_int + +from . import Interface as CSRInterface +from ..wishbone import Interface as WishboneInterface + + +__all__ = ["WishboneCSRBridge"] + + +class WishboneCSRBridge(Elaboratable): + """Wishbone to CSR bridge. + + A bus bridge for accessing CSR registers from Wishbone. This bridge supports any Wishbone + data width greater or equal to CSR data width and performs appropriate address translation. + + Latency + ------- + + Reads and writes always take ``self.data_width // csr_bus.data_width + 1`` cycles to complete, + regardless of the select inputs. Write side effects occur simultaneously with acknowledgement. + + Parameters + ---------- + csr_bus : :class:`..csr.Interface` + CSR bus driven by the bridge. + data_width : int or None + Wishbone bus data width. If not specified, defaults to ``csr_bus.data_width``. + + Attributes + ---------- + wb_bus : :class:`..wishbone.Interface` + Wishbone bus provided by the bridge. + """ + def __init__(self, csr_bus, *, data_width=None): + if not isinstance(csr_bus, CSRInterface): + raise ValueError("CSR bus must be an instance of CSRInterface, not {!r}" + .format(csr_bus)) + if csr_bus.data_width not in (8, 16, 32, 64): + raise ValueError("CSR bus data width must be one of 8, 16, 32, 64, not {!r}" + .format(csr_bus.data_width)) + if data_width is None: + data_width = csr_bus.data_width + + self.csr_bus = csr_bus + self.wb_bus = WishboneInterface( + addr_width=max(0, csr_bus.addr_width - log2_int(data_width // csr_bus.data_width)), + data_width=data_width, + granularity=csr_bus.data_width, + name="wb") + + # Since granularity of the Wishbone interface matches the data width of the CSR bus, + # no width conversion is performed, even if the Wishbone data width is greater. + self.wb_bus.memory_map.add_window(self.csr_bus.memory_map) + + def elaborate(self, platform): + csr_bus = self.csr_bus + wb_bus = self.wb_bus + + m = Module() + + cycle = Signal(range(len(wb_bus.sel) + 1)) + m.d.comb += csr_bus.addr.eq(Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr)) + + with m.If(wb_bus.cyc & wb_bus.stb): + with m.Switch(cycle): + def segment(index): + return slice(index * wb_bus.granularity, (index + 1) * wb_bus.granularity) + + for index, sel_index in enumerate(wb_bus.sel): + with m.Case(index): + if index > 0: + # CSR reads are registered, and we need to re-register them. + m.d.sync += wb_bus.dat_r[segment(index - 1)].eq(csr_bus.r_data) + m.d.comb += csr_bus.r_stb.eq(sel_index & ~wb_bus.we) + m.d.comb += csr_bus.w_data.eq(wb_bus.dat_w[segment(index)]) + m.d.comb += csr_bus.w_stb.eq(sel_index & wb_bus.we) + m.d.sync += cycle.eq(index + 1) + + with m.Default(): + m.d.sync += wb_bus.dat_r[segment(index)].eq(csr_bus.r_data) + m.d.sync += wb_bus.ack.eq(1) + + with m.Else(): + m.d.sync += cycle.eq(0) + m.d.sync += wb_bus.ack.eq(0) + + return m diff --git a/nmigen_soc/test/test_csr_wishbone.py b/nmigen_soc/test/test_csr_wishbone.py new file mode 100644 index 0000000..e7474c5 --- /dev/null +++ b/nmigen_soc/test/test_csr_wishbone.py @@ -0,0 +1,216 @@ +import unittest +from nmigen import * +from nmigen.back.pysim import * + +from .. import csr +from ..csr.wishbone import * + + +class MockRegister(Elaboratable): + def __init__(self, width): + self.element = csr.Element(width, "rw") + self.r_count = Signal(8) + self.w_count = Signal(8) + self.data = Signal(width) + + def elaborate(self, platform): + m = Module() + + with m.If(self.element.r_stb): + m.d.sync += self.r_count.eq(self.r_count + 1) + m.d.comb += self.element.r_data.eq(self.data) + + with m.If(self.element.w_stb): + m.d.sync += self.w_count.eq(self.w_count + 1) + m.d.sync += self.data.eq(self.element.w_data) + + return m + + +class WishboneCSRBridgeTestCase(unittest.TestCase): + def test_wrong_csr_bus(self): + with self.assertRaisesRegex(ValueError, + r"CSR bus must be an instance of CSRInterface, not 'foo'"): + WishboneCSRBridge(csr_bus="foo") + + def test_wrong_csr_bus_data_width(self): + with self.assertRaisesRegex(ValueError, + r"CSR bus data width must be one of 8, 16, 32, 64, not 7"): + WishboneCSRBridge(csr_bus=csr.Interface(addr_width=10, data_width=7)) + + def test_narrow(self): + mux = csr.Multiplexer(addr_width=10, data_width=8) + reg_1 = MockRegister(8) + mux.add(reg_1.element) + reg_2 = MockRegister(16) + mux.add(reg_2.element) + dut = WishboneCSRBridge(mux.bus) + + def sim_test(): + yield dut.wb_bus.cyc.eq(1) + yield dut.wb_bus.sel.eq(0b1) + + yield dut.wb_bus.we.eq(1) + + yield dut.wb_bus.adr.eq(0) + yield dut.wb_bus.stb.eq(1) + yield dut.wb_bus.dat_w.eq(0x55) + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield reg_1.r_count), 0) + self.assertEqual((yield reg_1.w_count), 1) + self.assertEqual((yield reg_1.data), 0x55) + + yield dut.wb_bus.adr.eq(1) + yield dut.wb_bus.stb.eq(1) + yield dut.wb_bus.dat_w.eq(0xaa) + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield reg_2.r_count), 0) + self.assertEqual((yield reg_2.w_count), 0) + self.assertEqual((yield reg_2.data), 0) + + yield dut.wb_bus.adr.eq(2) + yield dut.wb_bus.stb.eq(1) + yield dut.wb_bus.dat_w.eq(0xbb) + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield reg_2.r_count), 0) + self.assertEqual((yield reg_2.w_count), 1) + self.assertEqual((yield reg_2.data), 0xbbaa) + + yield dut.wb_bus.we.eq(0) + + yield dut.wb_bus.adr.eq(0) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield dut.wb_bus.dat_r), 0x55) + self.assertEqual((yield reg_1.r_count), 1) + self.assertEqual((yield reg_1.w_count), 1) + + yield dut.wb_bus.adr.eq(1) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield dut.wb_bus.dat_r), 0xaa) + self.assertEqual((yield reg_2.r_count), 1) + self.assertEqual((yield reg_2.w_count), 1) + + yield reg_2.data.eq(0x33333) + + yield dut.wb_bus.adr.eq(2) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield dut.wb_bus.dat_r), 0xbb) + self.assertEqual((yield reg_2.r_count), 1) + self.assertEqual((yield reg_2.w_count), 1) + + m = Module() + m.submodules += mux, reg_1, reg_2, dut + with Simulator(m, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(sim_test()) + sim.run() + + def test_wide(self): + mux = csr.Multiplexer(addr_width=10, data_width=8) + reg = MockRegister(32) + mux.add(reg.element) + dut = WishboneCSRBridge(mux.bus, data_width=32) + + def sim_test(): + yield dut.wb_bus.cyc.eq(1) + yield dut.wb_bus.adr.eq(0) + + yield dut.wb_bus.we.eq(1) + + yield dut.wb_bus.dat_w.eq(0x44332211) + yield dut.wb_bus.sel.eq(0b1111) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield reg.r_count), 0) + self.assertEqual((yield reg.w_count), 1) + self.assertEqual((yield reg.data), 0x44332211) + + # partial write + yield dut.wb_bus.dat_w.eq(0xaabbccdd) + yield dut.wb_bus.sel.eq(0b0110) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield reg.r_count), 0) + self.assertEqual((yield reg.w_count), 1) + self.assertEqual((yield reg.data), 0x44332211) + + yield dut.wb_bus.we.eq(0) + + yield dut.wb_bus.sel.eq(0b1111) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield dut.wb_bus.dat_r), 0x44332211) + self.assertEqual((yield reg.r_count), 1) + self.assertEqual((yield reg.w_count), 1) + + yield reg.data.eq(0xaaaaaaaa) + + # partial read + yield dut.wb_bus.sel.eq(0b0110) + yield dut.wb_bus.stb.eq(1) + yield + yield + yield + yield + yield + yield dut.wb_bus.stb.eq(0) + yield + self.assertEqual((yield dut.wb_bus.ack), 1) + self.assertEqual((yield dut.wb_bus.dat_r), 0x00332200) + self.assertEqual((yield reg.r_count), 1) + self.assertEqual((yield reg.w_count), 1) + + m = Module() + m.submodules += mux, reg, dut + with Simulator(m, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(sim_test()) + sim.run() -- 2.30.2