From: Luke Kenneth Casson Leighton Date: Fri, 19 Jun 2020 23:21:44 +0000 (+0100) Subject: autopep8 cleanup X-Git-Tag: 24jan2021_ls180~17 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=9b791169dd148c71ca91164077c690c5f4e3b048;p=nmigen-soc.git autopep8 cleanup --- diff --git a/nmigen_soc/csr/bus.py b/nmigen_soc/csr/bus.py index e760197..7bf5694 100644 --- a/nmigen_soc/csr/bus.py +++ b/nmigen_soc/csr/bus.py @@ -16,8 +16,8 @@ class Element(Record): can have more restrictive access mode, e.g. R/O fields can be a part of an R/W register. """ - R = "r" - W = "w" + R = "r" + W = "w" RW = "rw" def readable(self): @@ -56,26 +56,29 @@ class Element(Record): Write strobe. Registers should update their value or perform the write side effect when this strobe is asserted. """ + def __init__(self, width, access, *, name=None, src_loc_at=0): if not isinstance(width, int) or width < 0: raise ValueError("Width must be a non-negative integer, not {!r}" .format(width)) - if not isinstance(access, Element.Access) and access not in ("r", "w", "rw"): - raise ValueError("Access mode must be one of \"r\", \"w\", or \"rw\", not {!r}" + if not isinstance(access, Element.Access) and access not in ( + "r", "w", "rw"): + raise ValueError("Access mode must be one of \"r\", " + "\"w\", or \"rw\", not {!r}" .format(access)) - self.width = width + self.width = width self.access = Element.Access(access) layout = [] if self.access.readable(): layout += [ ("r_data", width), - ("r_stb", 1), + ("r_stb", 1), ] if self.access.writable(): layout += [ ("w_data", width), - ("w_stb", 1), + ("w_stb", 1), ] super().__init__(layout, name=name, src_loc_at=1) @@ -151,15 +154,16 @@ class Interface(Record): .format(data_width)) self.addr_width = addr_width self.data_width = data_width - self.memory_map = MemoryMap(addr_width=addr_width, data_width=data_width, + self.memory_map = MemoryMap(addr_width=addr_width, + data_width=data_width, alignment=alignment) super().__init__([ - ("addr", addr_width), - ("r_data", data_width), - ("r_stb", 1), - ("w_data", data_width), - ("w_stb", 1), + ("addr", addr_width), + ("r_data", data_width), + ("r_stb", 1), + ("w_data", data_width), + ("w_stb", 1), ], name=name, src_loc_at=1) @@ -218,9 +222,12 @@ class Multiplexer(Elaboratable): bus : :class:`Interface` CSR bus providing access to registers. """ + def __init__(self, *, addr_width, data_width, alignment=0): - self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment, - name="csr") + self.bus = Interface(addr_width=addr_width, + data_width=data_width, + alignment=alignment, + name="csr") self._map = self.bus.memory_map def align_to(self, alignment): @@ -236,11 +243,12 @@ class Multiplexer(Elaboratable): See :meth:`MemoryMap.add_resource` for details. """ if not isinstance(element, Element): - raise TypeError("Element must be an instance of csr.Element, not {!r}" - .format(element)) + raise TypeError("Element must be an instance of csr.Element, " + "not {!r}" .format(element)) size = (element.width + self.bus.data_width - 1) // self.bus.data_width - return self._map.add_resource(element, size=size, addr=addr, alignment=alignment) + return self._map.add_resource( + element, size=size, addr=addr, alignment=alignment) def elaborate(self, platform): m = Module() @@ -255,7 +263,10 @@ class Multiplexer(Elaboratable): for elem, (elem_start, elem_end) in self._map.resources(): shadow = Signal(elem.width, name="{}__shadow".format(elem.name)) if elem.access.readable(): - shadow_en = Signal(elem_end - elem_start, name="{}__shadow_en".format(elem.name)) + shadow_en = Signal( + elem_end - elem_start, + name="{}__shadow_en".format( + elem.name)) m.d.sync += shadow_en.eq(0) if elem.access.writable(): m.d.comb += elem.w_data.eq(shadow) @@ -266,18 +277,22 @@ class Multiplexer(Elaboratable): # carry chains for comparisons, even with a constant. (Register sizes don't have # to be powers of 2.) with m.Switch(self.bus.addr): - for chunk_offset, chunk_addr in enumerate(range(elem_start, elem_end)): - shadow_slice = shadow.word_select(chunk_offset, self.bus.data_width) + for chunk_offset, chunk_addr in enumerate( + range(elem_start, elem_end)): + shadow_slice = shadow.word_select( + chunk_offset, self.bus.data_width) with m.Case(chunk_addr): if elem.access.readable(): - r_data_fanin |= Mux(shadow_en[chunk_offset], shadow_slice, 0) + r_data_fanin |= Mux( + shadow_en[chunk_offset], shadow_slice, 0) if chunk_addr == elem_start: m.d.comb += elem.r_stb.eq(self.bus.r_stb) with m.If(self.bus.r_stb): m.d.sync += shadow.eq(elem.r_data) # Delay by 1 cycle, allowing reads to be pipelined. - m.d.sync += shadow_en.eq(self.bus.r_stb << chunk_offset) + m.d.sync += shadow_en.eq(self.bus.r_stb << + chunk_offset) if elem.access.writable(): if chunk_addr == elem_end - 1: @@ -325,10 +340,13 @@ class Decoder(Elaboratable): bus : :class:`Interface` CSR bus providing access to subordinate buses. """ + def __init__(self, *, addr_width, data_width, alignment=0): - self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment, - name="csr") - self._map = self.bus.memory_map + self.bus = Interface(addr_width=addr_width, + data_width=data_width, + alignment=alignment, + name="csr") + self._map = self.bus.memory_map self._subs = dict() def align_to(self, alignment): @@ -344,10 +362,11 @@ class Decoder(Elaboratable): See :meth:`MemoryMap.add_resource` for details. """ if not isinstance(sub_bus, Interface): - raise TypeError("Subordinate bus must be an instance of csr.Interface, not {!r}" - .format(sub_bus)) + raise TypeError("Subordinate bus must be an instance of " + "csr.Interface, not {!r}" .format(sub_bus)) if sub_bus.data_width != self.bus.data_width: - raise ValueError("Subordinate bus has data width {}, which is not the same as " + raise ValueError("Subordinate bus has data width {}, " + "which is not the same as " "decoder data width {}" .format(sub_bus.data_width, self.bus.data_width)) self._subs[sub_bus.memory_map] = sub_bus diff --git a/nmigen_soc/csr/wishbone.py b/nmigen_soc/csr/wishbone.py index e20c85d..abc8af6 100644 --- a/nmigen_soc/csr/wishbone.py +++ b/nmigen_soc/csr/wishbone.py @@ -35,6 +35,7 @@ class WishboneCSRBridge(Elaboratable): wb_bus : :class:`..wishbone.Interface` Wishbone bus provided by the bridge. """ + def __init__(self, csr_bus, *, data_width=None): if not isinstance(csr_bus, CSRInterface): raise ValueError("CSR bus must be an instance of CSRInterface, not {!r}" @@ -46,37 +47,48 @@ class WishboneCSRBridge(Elaboratable): data_width = csr_bus.data_width self.csr_bus = csr_bus - self.wb_bus = WishboneInterface( - addr_width=max(0, csr_bus.addr_width - log2_int(data_width // csr_bus.data_width)), + self.wb_bus = WishboneInterface( + addr_width=max( + 0, + csr_bus.addr_width - + log2_int( + data_width // + csr_bus.data_width)), data_width=data_width, granularity=csr_bus.data_width, name="wb") # Since granularity of the Wishbone interface matches the data width of the CSR bus, - # no width conversion is performed, even if the Wishbone data width is greater. + # no width conversion is performed, even if the Wishbone data width is + # greater. self.wb_bus.memory_map.add_window(self.csr_bus.memory_map) def elaborate(self, platform): csr_bus = self.csr_bus - wb_bus = self.wb_bus + wb_bus = self.wb_bus m = Module() cycle = Signal(range(len(wb_bus.sel) + 1)) - m.d.comb += csr_bus.addr.eq(Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr)) + m.d.comb += csr_bus.addr.eq( + Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr)) with m.If(wb_bus.cyc & wb_bus.stb): with m.Switch(cycle): def segment(index): - return slice(index * wb_bus.granularity, (index + 1) * wb_bus.granularity) + return slice(index * wb_bus.granularity, + (index + 1) * wb_bus.granularity) for index, sel_index in enumerate(wb_bus.sel): with m.Case(index): if index > 0: - # CSR reads are registered, and we need to re-register them. - m.d.sync += wb_bus.dat_r[segment(index - 1)].eq(csr_bus.r_data) + # CSR reads are registered, and we need to + # re-register them. + m.d.sync += wb_bus.dat_r[segment( + index - 1)].eq(csr_bus.r_data) m.d.comb += csr_bus.r_stb.eq(sel_index & ~wb_bus.we) - m.d.comb += csr_bus.w_data.eq(wb_bus.dat_w[segment(index)]) + m.d.comb += csr_bus.w_data.eq( + wb_bus.dat_w[segment(index)]) m.d.comb += csr_bus.w_stb.eq(sel_index & wb_bus.we) m.d.sync += cycle.eq(index + 1) diff --git a/nmigen_soc/memory.py b/nmigen_soc/memory.py index 4f130c5..2597fd8 100644 --- a/nmigen_soc/memory.py +++ b/nmigen_soc/memory.py @@ -9,18 +9,19 @@ class _RangeMap: A range map is a mapping from non-overlapping ranges to arbitrary values. """ + def __init__(self): - self._keys = [] + self._keys = [] self._values = dict() self._starts = [] - self._stops = [] + self._stops = [] def insert(self, key, value): assert isinstance(key, range) assert not self.overlaps(key) start_idx = bisect.bisect_right(self._starts, key.start) - stop_idx = bisect.bisect_left(self._stops, key.stop) + stop_idx = bisect.bisect_left(self._stops, key.stop) assert start_idx == stop_idx self._starts.insert(start_idx, key.start) @@ -37,7 +38,7 @@ class _RangeMap: def overlaps(self, key): start_idx = bisect.bisect_right(self._stops, key.start) - stop_idx = bisect.bisect_left(self._starts, key.stop) + stop_idx = bisect.bisect_left(self._starts, key.stop) return [self._values[key] for key in self._keys[start_idx:stop_idx]] def items(self): @@ -75,24 +76,25 @@ class MemoryMap: at an address that is a multiple of ``2 ** alignment``, and its size will be rounded up to be a multiple of ``2 ** alignment``. """ + def __init__(self, *, addr_width, data_width, alignment=0): if not isinstance(addr_width, int) or addr_width <= 0: - raise ValueError("Address width must be a positive integer, not {!r}" - .format(addr_width)) + raise ValueError("Address width must be a positive integer, " + "not {!r}" .format(addr_width)) if not isinstance(data_width, int) or data_width <= 0: - raise ValueError("Data width must be a positive integer, not {!r}" - .format(data_width)) + raise ValueError("Data width must be a positive integer, " + "not {!r}" .format(data_width)) if not isinstance(alignment, int) or alignment < 0: - raise ValueError("Alignment must be a non-negative integer, not {!r}" - .format(alignment)) + raise ValueError("Alignment must be a non-negative integer, " + "not {!r}" .format(alignment)) self.addr_width = addr_width self.data_width = data_width - self.alignment = alignment + self.alignment = alignment - self._ranges = _RangeMap() + self._ranges = _RangeMap() self._resources = dict() - self._windows = dict() + self._windows = dict() self._next_addr = 0 @@ -116,20 +118,22 @@ class MemoryMap: Implicit next address. """ if not isinstance(alignment, int) or alignment < 0: - raise ValueError("Alignment must be a non-negative integer, not {!r}" - .format(alignment)) - self._next_addr = self._align_up(self._next_addr, max(alignment, self.alignment)) + raise ValueError("Alignment must be a non-negative integer, " + "not {!r}" .format(alignment)) + self._next_addr = self._align_up( + self._next_addr, max( + alignment, self.alignment)) return self._next_addr def _compute_addr_range(self, addr, size, step=1, *, alignment): if addr is not None: if not isinstance(addr, int) or addr < 0: - raise ValueError("Address must be a non-negative integer, not {!r}" - .format(addr)) + raise ValueError("Address must be a non-negative integer, " + "not {!r}" .format(addr)) if addr % (1 << self.alignment) != 0: - raise ValueError("Explicitly specified address {:#x} must be a multiple of " - "{:#x} bytes" - .format(addr, 1 << alignment)) + raise ValueError("Explicitly specified address {:#x} " + "must be a multiple of " + "{:#x} bytes".format(addr, 1 << alignment)) else: addr = self._align_up(self._next_addr, alignment) @@ -138,10 +142,12 @@ class MemoryMap: .format(size)) size = self._align_up(size, alignment) - if addr > (1 << self.addr_width) or addr + size > (1 << self.addr_width): + if addr > (1 << self.addr_width) or addr + \ + size > (1 << self.addr_width): raise ValueError("Address range {:#x}..{:#x} out of bounds for memory map spanning " "range {:#x}..{:#x} ({} address bits)" - .format(addr, addr + size, 0, 1 << self.addr_width, self.addr_width)) + .format(addr, addr + size, 0, + 1 << self.addr_width, self.addr_width)) addr_range = range(addr, addr + size, step) overlaps = self._ranges.overlaps(addr_range) @@ -151,13 +157,19 @@ class MemoryMap: if overlap in self._resources: resource_range = self._resources[overlap] overlap_descrs.append("resource {!r} at {:#x}..{:#x}" - .format(overlap, resource_range.start, resource_range.stop)) + .format(overlap, + resource_range.start, + resource_range.stop)) if overlap in self._windows: window_range = self._windows[overlap] overlap_descrs.append("window {!r} at {:#x}..{:#x}" - .format(overlap, window_range.start, window_range.stop)) + .format(overlap, + window_range.start, + window_range.stop)) raise ValueError("Address range {:#x}..{:#x} overlaps with {}" - .format(addr, addr + size, ", ".join(overlap_descrs))) + .format(addr, + addr + size, + ", ".join(overlap_descrs))) return addr_range @@ -196,13 +208,15 @@ class MemoryMap: """ if resource in self._resources: addr_range = self._resources[resource] - raise ValueError("Resource {!r} is already added at address range {:#x}..{:#x}" - .format(resource, addr_range.start, addr_range.stop)) + raise ValueError("Resource {!r} is already added at " + "address range {:#x}..{:#x}" + .format(resource, addr_range.start, + addr_range.stop)) if alignment is not None: if not isinstance(alignment, int) or alignment < 0: - raise ValueError("Alignment must be a non-negative integer, not {!r}" - .format(alignment)) + raise ValueError("Alignment must be a non-negative integer, " + "not {!r}" .format(alignment)) alignment = max(alignment, self.alignment) else: alignment = self.alignment @@ -285,22 +299,28 @@ class MemoryMap: .format(window)) if window in self._windows: addr_range = self._windows[window] - raise ValueError("Window {!r} is already added at address range {:#x}..{:#x}" + raise ValueError("Window {!r} is already added at " + "address range {:#x}..{:#x}" .format(window, addr_range.start, addr_range.stop)) if window.data_width > self.data_width: - raise ValueError("Window has data width {}, and cannot be added to a memory map " + raise ValueError("Window has data width {}, and cannot " + "be added to a memory map " "with data width {}" .format(window.data_width, self.data_width)) if window.data_width != self.data_width: if sparse is None: - raise ValueError("Address translation mode must be explicitly specified " - "when adding a window with data width {} to a memory map " + raise ValueError("Address translation mode must be " + "explicitly specified " + "when adding a window with " + "data width {} to a memory map " "with data width {}" .format(window.data_width, self.data_width)) if not sparse and self.data_width % window.data_width != 0: - raise ValueError("Dense addressing cannot be used because the memory map " - "data width {} is not an integer multiple of window " + raise ValueError("Dense addressing cannot be used " + "because the memory map " + "data width {} is not an integer " + "multiple of window " "data width {}" .format(self.data_width, window.data_width)) @@ -316,7 +336,8 @@ class MemoryMap: # a window can still be aligned using align_to(). alignment = max(self.alignment, window.addr_width // ratio) - addr_range = self._compute_addr_range(addr, size, ratio, alignment=alignment) + addr_range = self._compute_addr_range( + addr, size, ratio, alignment=alignment) self._ranges.insert(addr_range, window) self._windows[window] = addr_range self._next_addr = addr_range.stop @@ -336,7 +357,8 @@ class MemoryMap: wider bus. Otherwise, it is always 1. """ for window, window_range in self._windows.items(): - yield window, (window_range.start, window_range.stop, window_range.step) + yield window, (window_range.start, window_range.stop, + window_range.step) def window_patterns(self): """Iterate local windows and patterns that match their address ranges. @@ -355,7 +377,8 @@ class MemoryMap: it is always 1. """ for window, window_range in self._windows.items(): - pattern = "{:0{}b}{}".format(window_range.start >> window.addr_width, + pattern = "{:0{}b}{}".format((window_range.start >> + window.addr_width), self.addr_width - window.addr_width, "-" * window.addr_width) yield window, (pattern, window_range.step) @@ -366,7 +389,7 @@ class MemoryMap: # Accessing a resource through a dense and then a sparse window results in very strange # layouts that cannot be easily represented, so reject those. assert window_range.step == 1 or width == window.data_width - size = (end - start) // window_range.step + size = (end - start) // window_range.step start += window_range.start width *= window_range.step return start, start + size, width @@ -386,12 +409,15 @@ class MemoryMap: """ for addr_range, assignment in self._ranges.items(): if assignment in self._resources: - yield assignment, (addr_range.start, addr_range.stop, self.data_width) + yield assignment, (addr_range.start, addr_range.stop, + self.data_width) elif assignment in self._windows: for sub_resource, sub_descr in assignment.all_resources(): - yield sub_resource, self._translate(*sub_descr, assignment, addr_range) + yield sub_resource, self._translate(*sub_descr, + assignment, + addr_range) else: - assert False # :nocov: + assert False # :nocov: def find_resource(self, resource): """Find address range corresponding to a resource. @@ -423,7 +449,9 @@ class MemoryMap: for window, window_range in self._windows.items(): try: - return self._translate(*window.find_resource(resource), window, window_range) + return self._translate(*window.find_resource(resource), + window, + window_range) except KeyError: pass @@ -450,6 +478,7 @@ class MemoryMap: return assignment elif assignment in self._windows: addr_range = self._windows[assignment] - return assignment.decode_address((address - addr_range.start) // addr_range.step) + return assignment.decode_address( + (address - addr_range.start) // addr_range.step) else: - assert False # :nocov: + assert False # :nocov: diff --git a/nmigen_soc/scheduler.py b/nmigen_soc/scheduler.py index 588238a..9fffac5 100644 --- a/nmigen_soc/scheduler.py +++ b/nmigen_soc/scheduler.py @@ -24,6 +24,7 @@ class RoundRobin(Elaboratable): Strobe signal to enable granting access to the next device requesting. Externally driven. """ + def __init__(self, n): self.n = n self.request = Signal(n) @@ -37,7 +38,7 @@ class RoundRobin(Elaboratable): with m.Switch(self.grant): for i in range(self.n): with m.Case(i): - for j in reversed(range(i+1, i+self.n)): + for j in reversed(range(i + 1, i + self.n)): # If i+1 <= j < n, then t == j; (after i) # If n <= j < i+n, then t == j - n (before i) t = j % self.n diff --git a/nmigen_soc/test/test_csr_bus.py b/nmigen_soc/test/test_csr_bus.py index 12fb580..83e9b7c 100644 --- a/nmigen_soc/test/test_csr_bus.py +++ b/nmigen_soc/test/test_csr_bus.py @@ -38,7 +38,7 @@ class ElementTestCase(unittest.TestCase): ("w_stb", 1), ])) - def test_layout_0_rw(self): # degenerate but legal case + def test_layout_0_rw(self): # degenerate but legal case elem = Element(0, access=Element.Access.RW) self.assertEqual(elem.width, 0) self.assertEqual(elem.access, Element.Access.RW) @@ -66,11 +66,11 @@ class InterfaceTestCase(unittest.TestCase): self.assertEqual(iface.addr_width, 12) self.assertEqual(iface.data_width, 8) self.assertEqual(iface.layout, Layout.cast([ - ("addr", 12), - ("r_data", 8), - ("r_stb", 1), - ("w_data", 8), - ("w_stb", 1), + ("addr", 12), + ("r_data", 8), + ("r_stb", 1), + ("w_data", 8), + ("w_stb", 1), ])) def test_wrong_addr_width(self): @@ -152,7 +152,7 @@ class MultiplexerTestCase(unittest.TestCase): self.assertEqual((yield elem_4_r.r_stb), 0) self.assertEqual((yield elem_16_rw.r_stb), 1) yield - yield bus.addr.eq(3) # pipeline a read + yield bus.addr.eq(3) # pipeline a read self.assertEqual((yield bus.r_data), 0xa5) yield bus.r_stb.eq(1) @@ -168,7 +168,7 @@ class MultiplexerTestCase(unittest.TestCase): yield bus.w_stb.eq(1) yield yield bus.w_stb.eq(0) - yield bus.addr.eq(2) # change address + yield bus.addr.eq(2) # change address yield self.assertEqual((yield elem_8_w.w_stb), 1) self.assertEqual((yield elem_8_w.w_data), 0x3d) @@ -182,7 +182,7 @@ class MultiplexerTestCase(unittest.TestCase): yield self.assertEqual((yield elem_8_w.w_stb), 0) self.assertEqual((yield elem_16_rw.w_stb), 0) - yield bus.addr.eq(3) # pipeline a write + yield bus.addr.eq(3) # pipeline a write yield bus.w_data.eq(0xaa) yield self.assertEqual((yield elem_8_w.w_stb), 0) @@ -271,12 +271,12 @@ class DecoderTestCase(unittest.TestCase): def test_add_wrong_sub_bus(self): with self.assertRaisesRegex(TypeError, - r"Subordinate bus must be an instance of csr\.Interface, not 1"): + r"Subordinate bus must be an instance of csr\.Interface, not 1"): self.dut.add(1) def test_add_wrong_data_width(self): mux = Multiplexer(addr_width=10, data_width=16) - Fragment.get(mux, platform=None) # silence UnusedElaboratable + Fragment.get(mux, platform=None) # silence UnusedElaboratable with self.assertRaisesRegex(ValueError, r"Subordinate bus has data width 16, which is not the same as " @@ -284,12 +284,12 @@ class DecoderTestCase(unittest.TestCase): self.dut.add(mux.bus) def test_sim(self): - mux_1 = Multiplexer(addr_width=10, data_width=8) + mux_1 = Multiplexer(addr_width=10, data_width=8) self.dut.add(mux_1.bus) elem_1 = Element(8, "rw") mux_1.add(elem_1) - mux_2 = Multiplexer(addr_width=10, data_width=8) + mux_2 = Multiplexer(addr_width=10, data_width=8) self.dut.add(mux_2.bus) elem_2 = Element(8, "rw") mux_2.add(elem_2, addr=2) diff --git a/nmigen_soc/test/test_csr_wishbone.py b/nmigen_soc/test/test_csr_wishbone.py index a754d18..7c41cf2 100644 --- a/nmigen_soc/test/test_csr_wishbone.py +++ b/nmigen_soc/test/test_csr_wishbone.py @@ -13,7 +13,7 @@ class MockRegister(Elaboratable): self.element = csr.Element(width, "rw") self.r_count = Signal(8) self.w_count = Signal(8) - self.data = Signal(width) + self.data = Signal(width) def elaborate(self, platform): m = Module() @@ -39,15 +39,15 @@ class WishboneCSRBridgeTestCase(unittest.TestCase): with self.assertRaisesRegex(ValueError, r"CSR bus data width must be one of 8, 16, 32, 64, not 7"): WishboneCSRBridge(csr_bus=csr.Interface(addr_width=10, - data_width=7)) + data_width=7)) def test_narrow(self): - mux = csr.Multiplexer(addr_width=10, data_width=8) + mux = csr.Multiplexer(addr_width=10, data_width=8) reg_1 = MockRegister(8) mux.add(reg_1.element) reg_2 = MockRegister(16) mux.add(reg_2.element) - dut = WishboneCSRBridge(mux.bus) + dut = WishboneCSRBridge(mux.bus) def sim_test(): yield dut.wb_bus.cyc.eq(1) diff --git a/nmigen_soc/test/test_memory.py b/nmigen_soc/test/test_memory.py index 040f777..7f42319 100644 --- a/nmigen_soc/test/test_memory.py +++ b/nmigen_soc/test/test_memory.py @@ -6,33 +6,33 @@ from ..memory import _RangeMap, MemoryMap class RangeMapTestCase(unittest.TestCase): def test_insert(self): range_map = _RangeMap() - range_map.insert(range(0,10), "a") - range_map.insert(range(20,21), "c") - range_map.insert(range(15,16), "b") - range_map.insert(range(16,20), "q") + range_map.insert(range(0, 10), "a") + range_map.insert(range(20, 21), "c") + range_map.insert(range(15, 16), "b") + range_map.insert(range(16, 20), "q") self.assertEqual(range_map._keys, [ - range(0,10), range(15,16), range(16,20), range(20,21) + range(0, 10), range(15, 16), range(16, 20), range(20, 21) ]) def test_overlaps(self): range_map = _RangeMap() - range_map.insert(range(10,20), "a") - self.assertEqual(range_map.overlaps(range(5,15)), ["a"]) - self.assertEqual(range_map.overlaps(range(15,25)), ["a"]) - self.assertEqual(range_map.overlaps(range(5,25)), ["a"]) - self.assertEqual(range_map.overlaps(range(0,3)), []) - self.assertEqual(range_map.overlaps(range(0,5)), []) - self.assertEqual(range_map.overlaps(range(25,30)), []) + range_map.insert(range(10, 20), "a") + self.assertEqual(range_map.overlaps(range(5, 15)), ["a"]) + self.assertEqual(range_map.overlaps(range(15, 25)), ["a"]) + self.assertEqual(range_map.overlaps(range(5, 25)), ["a"]) + self.assertEqual(range_map.overlaps(range(0, 3)), []) + self.assertEqual(range_map.overlaps(range(0, 5)), []) + self.assertEqual(range_map.overlaps(range(25, 30)), []) def test_insert_wrong_overlap(self): range_map = _RangeMap() - range_map.insert(range(0,10), "a") + range_map.insert(range(0, 10), "a") with self.assertRaises(AssertionError): - range_map.insert(range(5,15), "b") + range_map.insert(range(5, 15), "b") def test_get(self): range_map = _RangeMap() - range_map.insert(range(5,15), "a") + range_map.insert(range(5, 15), "a") self.assertEqual(range_map.get(0), None) self.assertEqual(range_map.get(5), "a") self.assertEqual(range_map.get(10), "a") @@ -43,17 +43,17 @@ class RangeMapTestCase(unittest.TestCase): class MemoryMapTestCase(unittest.TestCase): def test_wrong_addr_width(self): with self.assertRaisesRegex(ValueError, - r"Address width must be a positive integer, not -1"): + r"Address width must be a positive integer, not -1"): MemoryMap(addr_width=-1, data_width=8) def test_wrong_data_width(self): with self.assertRaisesRegex(ValueError, - r"Data width must be a positive integer, not -1"): + r"Data width must be a positive integer, not -1"): MemoryMap(addr_width=16, data_width=-1) def test_wrong_alignment(self): with self.assertRaisesRegex(ValueError, - r"Alignment must be a non-negative integer, not -1"): + r"Alignment must be a non-negative integer, not -1"): MemoryMap(addr_width=16, data_width=8, alignment=-1) def test_add_resource(self): @@ -69,31 +69,35 @@ class MemoryMapTestCase(unittest.TestCase): def test_add_resource_explicit_aligned(self): memory_map = MemoryMap(addr_width=16, data_width=8) self.assertEqual(memory_map.add_resource("a", size=1), (0, 1)) - self.assertEqual(memory_map.add_resource("b", size=1, alignment=1), (2, 4)) + self.assertEqual( + memory_map.add_resource( + "b", size=1, alignment=1), (2, 4)) self.assertEqual(memory_map.add_resource("c", size=2), (4, 6)) def test_add_resource_addr(self): memory_map = MemoryMap(addr_width=16, data_width=8) - self.assertEqual(memory_map.add_resource("a", size=1, addr=10), (10, 11)) + self.assertEqual( + memory_map.add_resource( + "a", size=1, addr=10), (10, 11)) self.assertEqual(memory_map.add_resource("b", size=2), (11, 13)) def test_add_resource_wrong_address(self): memory_map = MemoryMap(addr_width=16, data_width=8) with self.assertRaisesRegex(ValueError, - r"Address must be a non-negative integer, not -1"): + r"Address must be a non-negative integer, not -1"): memory_map.add_resource("a", size=1, addr=-1) def test_add_resource_wrong_address_unaligned(self): memory_map = MemoryMap(addr_width=16, data_width=8, alignment=1) with self.assertRaisesRegex(ValueError, - r"Explicitly specified address 0x1 must be " - r"a multiple of 0x2 bytes"): + r"Explicitly specified address 0x1 must be " + r"a multiple of 0x2 bytes"): memory_map.add_resource("a", size=1, addr=1) def test_add_resource_wrong_size(self): memory_map = MemoryMap(addr_width=16, data_width=8) with self.assertRaisesRegex(ValueError, - r"Size must be a non-negative integer, not -1"): + r"Size must be a non-negative integer, not -1"): memory_map.add_resource("a", size=-1) def test_add_resource_wrong_alignment(self): @@ -187,7 +191,7 @@ class MemoryMapTestCase(unittest.TestCase): r"data width 16 is not an integer multiple of window data " r"width 7"): memory_map.add_window(MemoryMap(addr_width=10, data_width=7), - sparse=False) + sparse=False) def test_add_window_wrong_overlap(self): memory_map = MemoryMap(addr_width=16, data_width=8) diff --git a/nmigen_soc/test/test_wishbone_bus.py b/nmigen_soc/test/test_wishbone_bus.py index 7360edc..d93ea9d 100644 --- a/nmigen_soc/test/test_wishbone_bus.py +++ b/nmigen_soc/test/test_wishbone_bus.py @@ -17,14 +17,14 @@ class InterfaceTestCase(unittest.TestCase): self.assertEqual(iface.memory_map.addr_width, 32) self.assertEqual(iface.memory_map.data_width, 8) self.assertEqual(iface.layout, Layout.cast([ - ("adr", 32, DIR_FANOUT), - ("dat_w", 8, DIR_FANOUT), - ("dat_r", 8, DIR_FANIN), - ("sel", 1, DIR_FANOUT), - ("cyc", 1, DIR_FANOUT), - ("stb", 1, DIR_FANOUT), - ("we", 1, DIR_FANOUT), - ("ack", 1, DIR_FANIN), + ("adr", 32, DIR_FANOUT), + ("dat_w", 8, DIR_FANOUT), + ("dat_r", 8, DIR_FANIN), + ("sel", 1, DIR_FANOUT), + ("cyc", 1, DIR_FANOUT), + ("stb", 1, DIR_FANOUT), + ("we", 1, DIR_FANOUT), + ("ack", 1, DIR_FANIN), ])) def test_granularity(self): @@ -35,35 +35,35 @@ class InterfaceTestCase(unittest.TestCase): self.assertEqual(iface.memory_map.addr_width, 32) self.assertEqual(iface.memory_map.data_width, 8) self.assertEqual(iface.layout, Layout.cast([ - ("adr", 30, DIR_FANOUT), + ("adr", 30, DIR_FANOUT), ("dat_w", 32, DIR_FANOUT), ("dat_r", 32, DIR_FANIN), - ("sel", 4, DIR_FANOUT), - ("cyc", 1, DIR_FANOUT), - ("stb", 1, DIR_FANOUT), - ("we", 1, DIR_FANOUT), - ("ack", 1, DIR_FANIN), + ("sel", 4, DIR_FANOUT), + ("cyc", 1, DIR_FANOUT), + ("stb", 1, DIR_FANOUT), + ("we", 1, DIR_FANOUT), + ("ack", 1, DIR_FANIN), ])) def test_features(self): iface = Interface(addr_width=32, data_width=32, features={"rty", "err", "stall", "lock", - "cti", "bte"}) + "cti", "bte"}) self.assertEqual(iface.layout, Layout.cast([ - ("adr", 32, DIR_FANOUT), + ("adr", 32, DIR_FANOUT), ("dat_w", 32, DIR_FANOUT), ("dat_r", 32, DIR_FANIN), - ("sel", 1, DIR_FANOUT), - ("cyc", 1, DIR_FANOUT), - ("stb", 1, DIR_FANOUT), - ("we", 1, DIR_FANOUT), - ("ack", 1, DIR_FANIN), - ("err", 1, DIR_FANIN), - ("rty", 1, DIR_FANIN), - ("stall", 1, DIR_FANIN), - ("lock", 1, DIR_FANOUT), - ("cti", CycleType, DIR_FANOUT), - ("bte", BurstTypeExt, DIR_FANOUT), + ("sel", 1, DIR_FANOUT), + ("cyc", 1, DIR_FANOUT), + ("stb", 1, DIR_FANOUT), + ("we", 1, DIR_FANOUT), + ("ack", 1, DIR_FANIN), + ("err", 1, DIR_FANIN), + ("rty", 1, DIR_FANIN), + ("stall", 1, DIR_FANIN), + ("lock", 1, DIR_FANOUT), + ("cti", CycleType, DIR_FANOUT), + ("bte", BurstTypeExt, DIR_FANOUT), ])) def test_wrong_addr_width(self): @@ -138,7 +138,7 @@ class DecoderTestCase(unittest.TestCase): r"but the decoder does " r"not have a corresponding input"): self.dut.add(Interface(addr_width=15, data_width=32, - granularity=16, features={"err"})) + granularity=16, features={"err"})) class DecoderSimulationTestCase(unittest.TestCase): @@ -239,7 +239,7 @@ class DecoderSimulationTestCase(unittest.TestCase): for index, sel_bit in enumerate(self.bus.sel): with m.If(sel_bit): segment = self.bus.dat_r.word_select(index, - self.bus.granularity) + self.bus.granularity) m.d.comb += segment.eq(self.bus.adr + index) return m @@ -254,7 +254,7 @@ class DecoderSimulationTestCase(unittest.TestCase): loop_3 = AddressLoopback(addr_width=8, data_width=16, granularity=16) self.assertEqual(dut.add(loop_3.bus, addr=0x30000, sparse=True), (0x30000, 0x30100, 1)) - loop_4 = AddressLoopback(addr_width=8, data_width=8, granularity=8) + loop_4 = AddressLoopback(addr_width=8, data_width=8, granularity=8) self.assertEqual(dut.add(loop_4.bus, addr=0x40000, sparse=True), (0x40000, 0x40100, 1)) @@ -393,7 +393,7 @@ class ArbiterSimulationTestCase(unittest.TestCase): dut.add(itor_1) itor_2 = Interface(addr_width=30, data_width=32, granularity=16, features={"err", "rty", "stall", "lock", - "cti", "bte"}) + "cti", "bte"}) dut.add(itor_2) def sim_test(): @@ -618,39 +618,39 @@ class InterconnectSharedSimulationTestCase(unittest.TestCase): self.shared = Interface(addr_width=30, data_width=32, granularity=8, - features={"err","cti","bte"}, + features={"err", "cti", "bte"}, name="shared") self.master01 = Interface(addr_width=30, data_width=32, granularity=8, - features={"err","cti","bte"}, + features={"err", "cti", "bte"}, name="master01") self.master02 = Record([ - ("adr", 30, DIR_FANOUT), + ("adr", 30, DIR_FANOUT), ("dat_w", 32, DIR_FANOUT), ("dat_r", 32, DIR_FANIN), - ("sel", 4, DIR_FANOUT), - ("cyc", 1, DIR_FANOUT), - ("stb", 1, DIR_FANOUT), - ("ack", 1, DIR_FANIN), - ("we", 1, DIR_FANOUT), - ("cti", 3, DIR_FANOUT), - ("bte", 2, DIR_FANOUT), - ("err", 1, DIR_FANIN) + ("sel", 4, DIR_FANOUT), + ("cyc", 1, DIR_FANOUT), + ("stb", 1, DIR_FANOUT), + ("ack", 1, DIR_FANIN), + ("we", 1, DIR_FANOUT), + ("cti", 3, DIR_FANOUT), + ("bte", 2, DIR_FANOUT), + ("err", 1, DIR_FANIN) ]) self.sub01 = Interface(addr_width=11, - data_width=32, - granularity=8, - features={"err","cti","bte"}, - name="sub01") + data_width=32, + granularity=8, + features={"err", "cti", "bte"}, + name="sub01") self.sub02 = Interface(addr_width=21, data_width=32, granularity=8, - features={"err","cti","bte"}, + features={"err", "cti", "bte"}, name="sub02") self.dut = InterconnectShared( addr_width=30, data_width=32, granularity=8, - features={"err","cti","bte"}, + features={"err", "cti", "bte"}, itors=[ self.master01, self.master02 diff --git a/nmigen_soc/wishbone/bus.py b/nmigen_soc/wishbone/bus.py index ba5bc86..7b44f34 100644 --- a/nmigen_soc/wishbone/bus.py +++ b/nmigen_soc/wishbone/bus.py @@ -13,17 +13,17 @@ __all__ = ["CycleType", "BurstTypeExt", "Interface", "Decoder", class CycleType(Enum): """Wishbone Registered Feedback cycle type.""" - CLASSIC = 0b000 - CONST_BURST = 0b001 - INCR_BURST = 0b010 + CLASSIC = 0b000 + CONST_BURST = 0b001 + INCR_BURST = 0b010 END_OF_BURST = 0b111 class BurstTypeExt(Enum): """Wishbone Registered Feedback burst type extension.""" - LINEAR = 0b00 - WRAP_4 = 0b01 - WRAP_8 = 0b10 + LINEAR = 0b00 + WRAP_4 = 0b01 + WRAP_8 = 0b10 WRAP_16 = 0b11 @@ -107,6 +107,7 @@ class Interface(Record): Optional. Corresponds to Wishbone signal ``BTE_O`` (initiator) or ``BTE_I`` (target). """ + def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(), alignment=0, name=None): if not isinstance(addr_width, int) or addr_width < 0: @@ -123,29 +124,30 @@ class Interface(Record): if granularity > data_width: raise ValueError("Granularity {} may not be greater than data width {}" .format(granularity, data_width)) - self.addr_width = addr_width - self.data_width = data_width + self.addr_width = addr_width + self.data_width = data_width self.granularity = granularity granularity_bits = log2_int(data_width // granularity) self._alignment = alignment - self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits), - data_width=data_width >> granularity_bits, - alignment=alignment) + self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits), + data_width=data_width >> granularity_bits, + alignment=alignment) self._features = set(features) - unknown = self._features - {"rty", "err", "stall", "lock", "cti", "bte"} + unknown = self._features - \ + {"rty", "err", "stall", "lock", "cti", "bte"} if unknown: raise ValueError("Optional signal(s) {} are not supported" .format(", ".join(map(repr, unknown)))) layout = [ - ("adr", addr_width, Direction.FANOUT), + ("adr", addr_width, Direction.FANOUT), ("dat_w", data_width, Direction.FANOUT), ("dat_r", data_width, Direction.FANIN), - ("sel", data_width // granularity, Direction.FANOUT), - ("cyc", 1, Direction.FANOUT), - ("stb", 1, Direction.FANOUT), - ("we", 1, Direction.FANOUT), - ("ack", 1, Direction.FANIN), + ("sel", data_width // granularity, Direction.FANOUT), + ("cyc", 1, Direction.FANOUT), + ("stb", 1, Direction.FANOUT), + ("we", 1, Direction.FANOUT), + ("ack", 1, Direction.FANIN), ] if "err" in features: layout += [("err", 1, Direction.FANIN)] @@ -154,9 +156,9 @@ class Interface(Record): if "stall" in features: layout += [("stall", 1, Direction.FANIN)] if "lock" in features: - layout += [("lock", 1, Direction.FANOUT)] + layout += [("lock", 1, Direction.FANOUT)] if "cti" in features: - layout += [("cti", CycleType, Direction.FANOUT)] + layout += [("cti", CycleType, Direction.FANOUT)] if "bte" in features: layout += [("bte", BurstTypeExt, Direction.FANOUT)] super().__init__(layout, name=name, src_loc_at=1) @@ -174,7 +176,7 @@ class Interface(Record): "but {}-bit long \"dat_r\"" .format(record, len(record.dat_w), len(record.dat_r))) data_width = len(record.dat_w) - if data_width%len(record.sel) != 0: + if data_width % len(record.sel) != 0: raise AttributeError("Record {!r} has invalid granularity value because " "its data width is {}-bit long but " "its \"sel\" is {}-bit long" @@ -189,7 +191,7 @@ class Interface(Record): granularity=granularity, features=features, alignment=0, - name=record.name+"_intf") + name=record.name + "_intf") class Decoder(Elaboratable): @@ -215,12 +217,13 @@ class Decoder(Elaboratable): bus : :class:`Interface` Bus providing access to subordinate buses. """ + def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(), alignment=0): - self.bus = Interface(addr_width=addr_width, data_width=data_width, - granularity=granularity, features=features, - alignment=alignment) - self._map = self.bus.memory_map + self.bus = Interface(addr_width=addr_width, data_width=data_width, + granularity=granularity, features=features, + alignment=alignment) + self._map = self.bus.memory_map self._subs = dict() def align_to(self, alignment): @@ -263,20 +266,22 @@ class Decoder(Elaboratable): "translation)" .format(sub_bus.data_width, sub_bus.granularity)) for opt_output in {"err", "rty", "stall"}: - if hasattr(sub_bus, opt_output) and not hasattr(self.bus, opt_output): + if hasattr(sub_bus, opt_output) and not hasattr( + self.bus, opt_output): raise ValueError("Subordinate bus has optional output {!r}, but the decoder " "does not have a corresponding input" .format(opt_output)) self._subs[sub_bus.memory_map] = sub_bus - return self._map.add_window(sub_bus.memory_map, addr=addr, sparse=sparse) + return self._map.add_window( + sub_bus.memory_map, addr=addr, sparse=sparse) def elaborate(self, platform): m = Module() - ack_fanin = 0 - err_fanin = 0 - rty_fanin = 0 + ack_fanin = 0 + err_fanin = 0 + rty_fanin = 0 stall_fanin = 0 with m.Switch(self.bus.adr): @@ -286,16 +291,19 @@ class Decoder(Elaboratable): m.d.comb += [ sub_bus.adr.eq(self.bus.adr << log2_int(sub_ratio)), sub_bus.dat_w.eq(self.bus.dat_w), - sub_bus.sel.eq(Cat(Repl(sel, sub_ratio) for sel in self.bus.sel)), + sub_bus.sel.eq(Cat(Repl(sel, sub_ratio) + for sel in self.bus.sel)), sub_bus.we.eq(self.bus.we), sub_bus.stb.eq(self.bus.stb), ] if hasattr(sub_bus, "lock"): m.d.comb += sub_bus.lock.eq(getattr(self.bus, "lock", 0)) if hasattr(sub_bus, "cti"): - m.d.comb += sub_bus.cti.eq(getattr(self.bus, "cti", CycleType.CLASSIC)) + m.d.comb += sub_bus.cti.eq(getattr(self.bus, + "cti", CycleType.CLASSIC)) if hasattr(sub_bus, "bte"): - m.d.comb += sub_bus.bte.eq(getattr(self.bus, "bte", BurstTypeExt.LINEAR)) + m.d.comb += sub_bus.bte.eq(getattr(self.bus, + "bte", BurstTypeExt.LINEAR)) with m.Case(sub_pat[:-log2_int(self.bus.data_width // self.bus.granularity)]): m.d.comb += [ @@ -346,10 +354,11 @@ class Arbiter(Elaboratable): bus : :class:`Interface` Shared bus to which the selected initiator gains access. """ + def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(), scheduler="rr"): - self.bus = Interface(addr_width=addr_width, data_width=data_width, - granularity=granularity, features=features) + self.bus = Interface(addr_width=addr_width, data_width=data_width, + granularity=granularity, features=features) self._itors = [] if scheduler not in ["rr"]: raise ValueError("Scheduling mode must be \"rr\", not {!r}" @@ -379,7 +388,8 @@ class Arbiter(Elaboratable): "arbiter data width {}" .format(itor_bus.data_width, self.bus.data_width)) for opt_output in {"lock", "cti", "bte"}: - if hasattr(itor_bus, opt_output) and not hasattr(self.bus, opt_output): + if hasattr(itor_bus, opt_output) and not hasattr( + self.bus, opt_output): raise ValueError("Initiator bus has optional output {!r}, but the arbiter " "does not have a corresponding input" .format(opt_output)) @@ -393,7 +403,8 @@ class Arbiter(Elaboratable): m.submodules.scheduler = scheduler = RoundRobin(len(self._itors)) grant = Signal(range(len(self._itors))) - # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4) + # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, + # Wishbone B4) bus_busy = self.bus.cyc if hasattr(self.bus, "lock"): # If LOCK is not asserted, we also wait for STB to be deasserted before granting bus @@ -420,25 +431,32 @@ class Arbiter(Elaboratable): m.d.comb += [ self.bus.adr.eq(itor_bus.adr), self.bus.dat_w.eq(itor_bus.dat_w), - self.bus.sel.eq(Cat(Repl(sel, ratio) for sel in itor_bus.sel)), + self.bus.sel.eq(Cat(Repl(sel, ratio) + for sel in itor_bus.sel)), self.bus.we.eq(itor_bus.we), self.bus.stb.eq(itor_bus.stb), ] m.d.comb += self.bus.cyc.eq(itor_bus.cyc) if hasattr(self.bus, "lock"): - m.d.comb += self.bus.lock.eq(getattr(itor_bus, "lock", 1)) + m.d.comb += self.bus.lock.eq( + getattr(itor_bus, "lock", 1)) if hasattr(self.bus, "cti"): - m.d.comb += self.bus.cti.eq(getattr(itor_bus, "cti", CycleType.CLASSIC)) + m.d.comb += self.bus.cti.eq( + getattr(itor_bus, "cti", CycleType.CLASSIC)) if hasattr(self.bus, "bte"): - m.d.comb += self.bus.bte.eq(getattr(itor_bus, "bte", BurstTypeExt.LINEAR)) + m.d.comb += self.bus.bte.eq( + getattr(itor_bus, "bte", BurstTypeExt.LINEAR)) m.d.comb += itor_bus.ack.eq(self.bus.ack) if hasattr(itor_bus, "err"): - m.d.comb += itor_bus.err.eq(getattr(self.bus, "err", 0)) + m.d.comb += itor_bus.err.eq( + getattr(self.bus, "err", 0)) if hasattr(itor_bus, "rty"): - m.d.comb += itor_bus.rty.eq(getattr(self.bus, "rty", 0)) + m.d.comb += itor_bus.rty.eq( + getattr(self.bus, "rty", 0)) if hasattr(itor_bus, "stall"): - m.d.comb += itor_bus_stall.eq(getattr(self.bus, "stall", ~self.bus.ack)) + m.d.comb += itor_bus_stall.eq( + getattr(self.bus, "stall", ~self.bus.ack)) return m @@ -492,6 +510,7 @@ class InterconnectShared(Elaboratable): decoder : :class:`Decoder` The decoder that connects the shared bus to the list of SLAVEs. """ + def __init__(self, *, addr_width, data_width, itors, targets, **kwargs): self.addr_width = addr_width self.data_width = data_width diff --git a/nmigen_soc/wishbone/sram.py b/nmigen_soc/wishbone/sram.py index eeeada3..c5a139d 100644 --- a/nmigen_soc/wishbone/sram.py +++ b/nmigen_soc/wishbone/sram.py @@ -44,6 +44,7 @@ class SRAM(Elaboratable): The Wishbone bus interface providing access to the read/write ports of the memory. """ + def __init__(self, memory, read_only=False, bus=None, granularity=None, features=frozenset()): if not isinstance(memory, Memory): @@ -76,7 +77,8 @@ class SRAM(Elaboratable): # write if not self.read_only: - m.submodules.wrport = wrport = self.memory.write_port(granularity=self.granularity) + m.submodules.wrport = wrport = self.memory.write_port( + granularity=self.granularity) m.d.comb += [ wrport.addr.eq(self.bus.adr[:len(rdport.addr)]), wrport.data.eq(self.bus.dat_w)