Move write_gtkw from soc.experiment.alu_fsm
authorCesar Strauss <cestrauss@gmail.com>
Sat, 29 Aug 2020 20:52:58 +0000 (17:52 -0300)
committerCesar Strauss <cestrauss@gmail.com>
Sat, 29 Aug 2020 21:00:38 +0000 (18:00 -0300)
Move implementation to nmutil.gtkw.
Move example to a separate file.

README.md
setup.py
src/nmutil/gtkw.py [new file with mode: 0644]
src/nmutil/test/example_gtkwave.py [new file with mode: 0644]

index 9537e2a0312886b793aca5605e2afe9ab9b5e36f..77adfea6e99334e793d42a3829ee1a883b29d3c6 100644 (file)
--- a/README.md
+++ b/README.md
@@ -8,4 +8,4 @@ http://bugs.libre-riscv.org/show_bug.cgi?id=62
 
 * nmigen
 * yosys (latest git repository, required by nmigen)
-
+* pyvcd (for stylish GTKWave documents) - available on Pypi
index 2bcb8c3a453e85fbe5d52f8cb864a98a1b1d04f0..ff6096b882e40ecbf02069ede2ac0e1c6a1a18fc 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -8,6 +8,7 @@ NEWS = open(os.path.join(here, 'NEWS.txt')).read()
 version = '0.0.1'
 
 install_requires = [
+    'pyvcd',  # for stylish GTKWave documents - available on Pypi
 ]
 
 test_requires = [
diff --git a/src/nmutil/gtkw.py b/src/nmutil/gtkw.py
new file mode 100644 (file)
index 0000000..5327bc0
--- /dev/null
@@ -0,0 +1,145 @@
+from vcd.gtkw import GTKWSave, GTKWColor
+
+
+def write_gtkw(gtkw_name, vcd_name, gtkw_dom, gtkw_style=None,
+               module=None, loc=None, color=None, base=None,
+               zoom=-22.9, marker=-1):
+    """ Write a GTKWave document according to the supplied style and DOM.
+
+    :param gtkw_name: name of the generated GTKWave document
+    :param vcd_name: name of the waveform file
+    :param gtkw_dom: DOM style description for the trace pane
+    :param gtkw_style: style for signals, classes and groups
+    :param module: default module
+    :param color: default trace color
+    :param base: default numerical base
+    :param loc: source code location to include as a comment
+    :param zoom: initial zoom level, in GTKWave format
+    :param marker: initial location of a marker
+
+    **gtkw_style format**
+
+    Syntax: ``{selector: {attribute: value, ...}, ...}``
+
+    "selector" can be a signal, class or group
+
+    Signal groups propagate most attributes to their children
+
+    Attribute choices:
+
+    * module: instance path, for prepending to the signal name
+    * color: trace color
+    * base: numerical base for value display
+    * display: alternate text to display in the signal pane
+    * comment: comment to display in the signal pane
+
+    **gtkw_dom format**
+
+    Syntax: ``[signal, (signal, class), (group, [children]), comment, ...]``
+
+    The DOM is a list of nodes.
+
+    Nodes are signals, signal groups or comments.
+
+    * signals are strings, or tuples: ``(signal name, class, class, ...)``
+    * signal groups are tuples: ``(group name, class, class, ..., [nodes])``
+    * comments are: ``{'comment': 'comment string'}``
+
+    In place of a class name, an inline class description can be used.
+    ``(signal, {attribute: value, ...}, ...)``
+    """
+    colors = {
+        'blue': GTKWColor.blue,
+        'cycle': GTKWColor.cycle,
+        'green': GTKWColor.green,
+        'indigo': GTKWColor.indigo,
+        'normal': GTKWColor.normal,
+        'orange': GTKWColor.orange,
+        'red': GTKWColor.red,
+        'violet': GTKWColor.violet,
+        'yellow': GTKWColor.yellow,
+    }
+
+    with open(gtkw_name, "wt") as gtkw_file:
+        gtkw = GTKWSave(gtkw_file)
+        if loc is not None:
+            gtkw.comment("Auto-generated by " + loc)
+        gtkw.dumpfile(vcd_name)
+        # set a reasonable zoom level
+        # also, move the marker to an interesting place
+        gtkw.zoom_markers(zoom, marker)
+
+        # create an empty style, if needed
+        if gtkw_style is None:
+            gtkw_style = dict()
+
+        # create an empty root selector, if needed
+        root_style = gtkw_style.get('', dict())
+
+        # apply styles to the root selector, if provided
+        if module is not None:
+            root_style['module'] = module
+        if color is not None:
+            root_style['color'] = color
+        if base is not None:
+            root_style['base'] = base
+        # base cannot be None, use 'hex' by default
+        if root_style.get('base') is None:
+            root_style['base'] = 'hex'
+
+        # recursively walk the DOM
+        def walk(dom, style):
+            for node in dom:
+                node_name = None
+                children = None
+                # copy the style from the parent
+                node_style = style.copy()
+                # node is a signal name string
+                if isinstance(node, str):
+                    node_name = node
+                    # apply style from node name, if specified
+                    if node_name in gtkw_style:
+                        node_style.update(gtkw_style[node_name])
+                # node is a tuple
+                # could be a signal or a group
+                elif isinstance(node, tuple):
+                    node_name = node[0]
+                    # collect styles from the selectors
+                    # order goes from the most specific to most generic
+                    # which means earlier selectors override later ones
+                    for selector in reversed(node):
+                        # update the node style from the selector
+                        if isinstance(selector, str):
+                            if selector in gtkw_style:
+                                node_style.update(gtkw_style[selector])
+                        # apply an inline style description
+                        elif isinstance(selector, dict):
+                            node_style.update(selector)
+                    # node is a group if it has a child list
+                    if isinstance(node[-1], list):
+                        children = node[-1]
+                # comment
+                elif isinstance(node, dict):
+                    if 'comment' in node:
+                        gtkw.blank(node['comment'])
+                # emit the group delimiters and walk over the child list
+                if children is not None:
+                    gtkw.begin_group(node_name)
+                    # pass on the group style to its children
+                    walk(children, node_style)
+                    gtkw.end_group(node_name)
+                # emit a trace, if the node is a signal
+                elif node_name is not None:
+                    signal_name = node_name
+                    # prepend module name to signal
+                    if 'module' in node_style:
+                        node_module = node_style['module']
+                        if node_module is not None:
+                            signal_name = node_module + '.' + signal_name
+                    node_color = colors.get(node_style.get('color'))
+                    node_base = node_style.get('base')
+                    display = node_style.get('display')
+                    gtkw.trace(signal_name, color=node_color,
+                               datafmt=node_base, alias=display)
+
+        walk(gtkw_dom, root_style)
diff --git a/src/nmutil/test/example_gtkwave.py b/src/nmutil/test/example_gtkwave.py
new file mode 100644 (file)
index 0000000..9d92bda
--- /dev/null
@@ -0,0 +1,433 @@
+"""Simple example of a FSM-based ALU
+
+This demonstrates a design that follows the valid/ready protocol of the
+ALU, but with a FSM implementation, instead of a pipeline.  It is also
+intended to comply with both the CompALU API and the nmutil Pipeline API
+(Liskov Substitution Principle)
+
+The basic rules are:
+
+1) p.ready_o is asserted on the initial ("Idle") state, otherwise it keeps low.
+2) n.valid_o is asserted on the final ("Done") state, otherwise it keeps low.
+3) The FSM stays in the Idle state while p.valid_i is low, otherwise
+   it accepts the input data and moves on.
+4) The FSM stays in the Done state while n.ready_i is low, otherwise
+   it releases the output data and goes back to the Idle state.
+
+"""
+
+from nmigen import Elaboratable, Signal, Module, Cat
+cxxsim = False
+if cxxsim:
+    from nmigen.sim.cxxsim import Simulator, Settle
+else:
+    from nmigen.back.pysim import Simulator, Settle
+from nmigen.cli import rtlil
+from math import log2
+from nmutil.iocontrol import PrevControl, NextControl
+
+from soc.fu.base_input_record import CompOpSubsetBase
+from soc.decoder.power_enums import (MicrOp, Function)
+
+from vcd.gtkw import GTKWSave, GTKWColor
+from nmutil.gtkw import write_gtkw
+
+
+class CompFSMOpSubset(CompOpSubsetBase):
+    def __init__(self, name=None):
+        layout = (('sdir', 1),
+                  )
+
+        super().__init__(layout, name=name)
+
+
+
+class Dummy:
+    pass
+
+
+class Shifter(Elaboratable):
+    """Simple sequential shifter
+
+    Prev port data:
+    * p.data_i.data:  value to be shifted
+    * p.data_i.shift: shift amount
+    *                 When zero, no shift occurs.
+    *                 On POWER, range is 0 to 63 for 32-bit,
+    *                 and 0 to 127 for 64-bit.
+    *                 Other values wrap around.
+
+    Operation type
+    * op.sdir:       shift direction (0 = left, 1 = right)
+
+    Next port data:
+    * n.data_o.data: shifted value
+    """
+    class PrevData:
+        def __init__(self, width):
+            self.data = Signal(width, name="p_data_i")
+            self.shift = Signal(width, name="p_shift_i")
+            self.ctx = Dummy() # comply with CompALU API
+
+        def _get_data(self):
+            return [self.data, self.shift]
+
+    class NextData:
+        def __init__(self, width):
+            self.data = Signal(width, name="n_data_o")
+
+        def _get_data(self):
+            return [self.data]
+
+    def __init__(self, width):
+        self.width = width
+        self.p = PrevControl()
+        self.n = NextControl()
+        self.p.data_i = Shifter.PrevData(width)
+        self.n.data_o = Shifter.NextData(width)
+
+        # more pieces to make this example class comply with the CompALU API
+        self.op = CompFSMOpSubset(name="op")
+        self.p.data_i.ctx.op = self.op
+        self.i = self.p.data_i._get_data()
+        self.out = self.n.data_o._get_data()
+
+    def elaborate(self, platform):
+        m = Module()
+
+        m.submodules.p = self.p
+        m.submodules.n = self.n
+
+        # Note:
+        # It is good practice to design a sequential circuit as
+        # a data path and a control path.
+
+        # Data path
+        # ---------
+        # The idea is to have a register that can be
+        # loaded or shifted (left and right).
+
+        # the control signals
+        load = Signal()
+        shift = Signal()
+        direction = Signal()
+        # the data flow
+        shift_in = Signal(self.width)
+        shift_left_by_1 = Signal(self.width)
+        shift_right_by_1 = Signal(self.width)
+        next_shift = Signal(self.width)
+        # the register
+        shift_reg = Signal(self.width, reset_less=True)
+        # build the data flow
+        m.d.comb += [
+            # connect input and output
+            shift_in.eq(self.p.data_i.data),
+            self.n.data_o.data.eq(shift_reg),
+            # generate shifted views of the register
+            shift_left_by_1.eq(Cat(0, shift_reg[:-1])),
+            shift_right_by_1.eq(Cat(shift_reg[1:], 0)),
+        ]
+        # choose the next value of the register according to the
+        # control signals
+        # default is no change
+        m.d.comb += next_shift.eq(shift_reg)
+        with m.If(load):
+            m.d.comb += next_shift.eq(shift_in)
+        with m.Elif(shift):
+            with m.If(direction):
+                m.d.comb += next_shift.eq(shift_right_by_1)
+            with m.Else():
+                m.d.comb += next_shift.eq(shift_left_by_1)
+
+        # register the next value
+        m.d.sync += shift_reg.eq(next_shift)
+
+        # Control path
+        # ------------
+        # The idea is to have a SHIFT state where the shift register
+        # is shifted every cycle, while a counter decrements.
+        # This counter is loaded with shift amount in the initial state.
+        # The SHIFT state is left when the counter goes to zero.
+
+        # Shift counter
+        shift_width = int(log2(self.width)) + 1
+        next_count = Signal(shift_width)
+        count = Signal(shift_width, reset_less=True)
+        m.d.sync += count.eq(next_count)
+
+        with m.FSM():
+            with m.State("IDLE"):
+                m.d.comb += [
+                    # keep p.ready_o active on IDLE
+                    self.p.ready_o.eq(1),
+                    # keep loading the shift register and shift count
+                    load.eq(1),
+                    next_count.eq(self.p.data_i.shift),
+                ]
+                # capture the direction bit as well
+                m.d.sync += direction.eq(self.op.sdir)
+                with m.If(self.p.valid_i):
+                    # Leave IDLE when data arrives
+                    with m.If(next_count == 0):
+                        # short-circuit for zero shift
+                        m.next = "DONE"
+                    with m.Else():
+                        m.next = "SHIFT"
+            with m.State("SHIFT"):
+                m.d.comb += [
+                    # keep shifting, while counter is not zero
+                    shift.eq(1),
+                    # decrement the shift counter
+                    next_count.eq(count - 1),
+                ]
+                with m.If(next_count == 0):
+                    # exit when shift counter goes to zero
+                    m.next = "DONE"
+            with m.State("DONE"):
+                # keep n.valid_o active while the data is not accepted
+                m.d.comb += self.n.valid_o.eq(1)
+                with m.If(self.n.ready_i):
+                    # go back to IDLE when the data is accepted
+                    m.next = "IDLE"
+
+        return m
+
+    def __iter__(self):
+        yield self.op.sdir
+        yield self.p.data_i.data
+        yield self.p.data_i.shift
+        yield self.p.valid_i
+        yield self.p.ready_o
+        yield self.n.ready_i
+        yield self.n.valid_o
+        yield self.n.data_o.data
+
+    def ports(self):
+        return list(self)
+
+
+# Write a formatted GTKWave "save" file
+def write_gtkw_v1(base_name, top_dut_name, loc):
+    # hierarchy path, to prepend to signal names
+    dut = top_dut_name + "."
+    # color styles
+    style_input = GTKWColor.orange
+    style_output = GTKWColor.yellow
+    style_debug = GTKWColor.red
+    with open(base_name + ".gtkw", "wt") as gtkw_file:
+        gtkw = GTKWSave(gtkw_file)
+        gtkw.comment("Auto-generated by " + loc)
+        gtkw.dumpfile(base_name + ".vcd")
+        # set a reasonable zoom level
+        # also, move the marker to an interesting place
+        gtkw.zoom_markers(-22.9, 10500000)
+        gtkw.trace(dut + "clk")
+        # place a comment in the signal names panel
+        gtkw.blank("Shifter Demonstration")
+        with gtkw.group("prev port"):
+            gtkw.trace(dut + "op__sdir", color=style_input)
+            # demonstrates using decimal base (default is hex)
+            gtkw.trace(dut + "p_data_i[7:0]", color=style_input,
+                       datafmt='dec')
+            gtkw.trace(dut + "p_shift_i[7:0]", color=style_input,
+                       datafmt='dec')
+            gtkw.trace(dut + "p_valid_i", color=style_input)
+            gtkw.trace(dut + "p_ready_o", color=style_output)
+        with gtkw.group("debug"):
+            gtkw.blank("Some debug statements")
+            # change the displayed name in the panel
+            gtkw.trace("top.zero", alias='zero delay shift',
+                       color=style_debug)
+            gtkw.trace("top.interesting", color=style_debug)
+            gtkw.trace("top.test_case", alias="test case", color=style_debug)
+            gtkw.trace("top.msg", color=style_debug)
+        with gtkw.group("internal"):
+            gtkw.trace(dut + "fsm_state")
+            gtkw.trace(dut + "count[3:0]")
+            gtkw.trace(dut + "shift_reg[7:0]", datafmt='dec')
+        with gtkw.group("next port"):
+            gtkw.trace(dut + "n_data_o[7:0]", color=style_output,
+                       datafmt='dec')
+            gtkw.trace(dut + "n_valid_o", color=style_output)
+            gtkw.trace(dut + "n_ready_i", color=style_input)
+
+
+def test_shifter():
+    m = Module()
+    m.submodules.shf = dut = Shifter(8)
+    print("Shifter port names:")
+    for port in dut:
+        print("-", port.name)
+    # generate RTLIL
+    # try "proc; show" in yosys to check the data path
+    il = rtlil.convert(dut, ports=dut.ports())
+    with open("test_shifter.il", "w") as f:
+        f.write(il)
+
+    # Write the GTKWave project file
+    write_gtkw_v1("test_shifter", "top.shf", __file__)
+
+    # Describe a GTKWave document
+
+    # Style for signals, classes and groups
+    gtkwave_style = {
+        # Root selector. Gives default attributes for every signal.
+        '': {'base': 'dec'},
+        # color the traces, according to class
+        # class names are not hardcoded, they are just strings
+        'in': {'color': 'orange'},
+        'out': {'color': 'yellow'},
+        # signals in the debug group have a common color and module path
+        'debug': {'module': 'top', 'color': 'red'},
+        # display a different string replacing the signal name
+        'test_case': {'display': 'test case'},
+    }
+
+    # DOM style description for the trace pane
+    gtkwave_desc = [
+        # simple signal, without a class
+        # even so, it inherits the top-level root attributes
+        'clk',
+        # comment
+        {'comment': 'Shifter Demonstration'},
+        # collapsible signal group
+        ('prev port', [
+            # attach a class style for each signal
+            ('op__sdir', 'in'),
+            ('p_data_i[7:0]', 'in'),
+            ('p_shift_i[7:0]', 'in'),
+            ('p_valid_i', 'in'),
+            ('p_ready_o', 'out'),
+        ]),
+        # Signals in a signal group inherit the group attributes.
+        # In this case, a different module path and color.
+        ('debug', [
+            {'comment': 'Some debug statements'},
+            # inline attributes, instead of a class name
+            ('zero', {'display': 'zero delay shift'}),
+            'interesting',
+            'test_case',
+            'msg',
+        ]),
+        ('internal', [
+            'fsm_state',
+            'count[3:0]',
+            'shift_reg[7:0]',
+        ]),
+        ('next port', [
+            ('n_data_o[7:0]', 'out'),
+            ('n_valid_o', 'out'),
+            ('n_ready_i', 'in'),
+        ]),
+    ]
+
+    write_gtkw("test_shifter_v2.gtkw", "test_shifter.vcd",
+               gtkwave_desc,  gtkwave_style,
+               module="top.shf", loc=__file__, marker=10500000)
+
+    sim = Simulator(m)
+    sim.add_clock(1e-6)
+
+    # demonstrates adding extra debug signal traces
+    # they end up in the top module
+    #
+    zero = Signal()  # mark an interesting place
+    #
+    # demonstrates string traces
+    #
+    # display a message when the signal is high
+    # the low level is just an horizontal line
+    interesting = Signal(decoder=lambda v: 'interesting!' if v else '')
+    # choose between alternate strings based on numerical value
+    test_cases = ['', '13>>2', '3<<4', '21<<0']
+    test_case = Signal(8, decoder=lambda v: test_cases[v])
+    # hack to display arbitrary strings, like debug statements
+    msg = Signal(decoder=lambda _: msg.str)
+    msg.str = ''
+
+    def send(data, shift, direction):
+        # present input data and assert valid_i
+        yield dut.p.data_i.data.eq(data)
+        yield dut.p.data_i.shift.eq(shift)
+        yield dut.op.sdir.eq(direction)
+        yield dut.p.valid_i.eq(1)
+        yield
+        # wait for p.ready_o to be asserted
+        while not (yield dut.p.ready_o):
+            yield
+        # show current operation operation
+        if direction:
+            msg.str = f'{data}>>{shift}'
+        else:
+            msg.str = f'{data}<<{shift}'
+        # force dump of the above message by toggling the
+        # underlying signal
+        yield msg.eq(0)
+        yield msg.eq(1)
+        # clear input data and negate p.valid_i
+        yield dut.p.valid_i.eq(0)
+        yield dut.p.data_i.data.eq(0)
+        yield dut.p.data_i.shift.eq(0)
+        yield dut.op.sdir.eq(0)
+
+    def receive(expected):
+        # signal readiness to receive data
+        yield dut.n.ready_i.eq(1)
+        yield
+        # wait for n.valid_o to be asserted
+        while not (yield dut.n.valid_o):
+            yield
+        # read result
+        result = yield dut.n.data_o.data
+        # negate n.ready_i
+        yield dut.n.ready_i.eq(0)
+        # check result
+        assert result == expected
+        # finish displaying the current operation
+        msg.str = ''
+        yield msg.eq(0)
+        yield msg.eq(1)
+
+    def producer():
+        # 13 >> 2
+        yield from send(13, 2, 1)
+        # 3 << 4
+        yield from send(3, 4, 0)
+        # 21 << 0
+        # use a debug signal to mark an interesting operation
+        # in this case, it is a shift by zero
+        yield interesting.eq(1)
+        yield from send(21, 0, 0)
+        yield interesting.eq(0)
+
+    def consumer():
+        # the consumer is not in step with the producer, but the
+        # order of the results are preserved
+        # 13 >> 2 = 3
+        yield test_case.eq(1)
+        yield from receive(3)
+        # 3 << 4 = 48
+        yield test_case.eq(2)
+        yield from receive(48)
+        # 21 << 0 = 21
+        yield test_case.eq(3)
+        # you can look for the rising edge of this signal to quickly
+        # locate this point in the traces
+        yield zero.eq(1)
+        yield from receive(21)
+        yield zero.eq(0)
+        yield test_case.eq(0)
+
+    sim.add_sync_process(producer)
+    sim.add_sync_process(consumer)
+    sim_writer = sim.write_vcd(
+        "test_shifter.vcd",
+        # include additional signals in the trace dump
+        traces=[zero, interesting, test_case, msg],
+    )
+    with sim_writer:
+        sim.run()
+
+
+if __name__ == "__main__":
+    test_shifter()