Demonstrates creating stylish GTKWave "save" files from python
[soc.git] / src / soc / experiment / alu_fsm.py
index f95c865d941345609afdebb9a1bc0896a9ede3ff..3a3d9db111af3d9152d21017b96048746662562f 100644 (file)
@@ -1,7 +1,9 @@
 """Simple example of a FSM-based ALU
 
 This demonstrates a design that follows the valid/ready protocol of the
-ALU, but with a FSM implementation, instead of a pipeline.
+ALU, but with a FSM implementation, instead of a pipeline.  It is also
+intended to comply with both the CompALU API and the nmutil Pipeline API
+(Liskov Substitution Principle)
 
 The basic rules are:
 
@@ -11,73 +13,308 @@ The basic rules are:
    it accepts the input data and moves on.
 4) The FSM stays in the Done state while n.ready_i is low, otherwise
    it releases the output data and goes back to the Idle state.
+
 """
 
-from nmigen import Elaboratable, Signal, Module
-from nmigen.back.pysim import Simulator
+from nmigen import Elaboratable, Signal, Module, Cat
+cxxsim = False
+if cxxsim:
+    from nmigen.sim.cxxsim import Simulator, Settle
+else:
+    from nmigen.back.pysim import Simulator, Settle
+from nmigen.cli import rtlil
+from math import log2
+from nmutil.iocontrol import PrevControl, NextControl
+
+from soc.fu.base_input_record import CompOpSubsetBase
+from soc.decoder.power_enums import (MicrOp, Function)
+
+from vcd.gtkw import GTKWSave, GTKWColor
+
+
+class CompFSMOpSubset(CompOpSubsetBase):
+    def __init__(self, name=None):
+        layout = (('sdir', 1),
+                  )
+
+        super().__init__(layout, name=name)
+
+
+
+class Dummy:
+    pass
 
 
 class Shifter(Elaboratable):
     """Simple sequential shifter
 
     Prev port data:
-    * p.data_i.data:  Value to be shifted
-    * p.data_i.shift: Shift amount
-    * p.data_i.dir:   Shift direction
+    * p.data_i.data:  value to be shifted
+    * p.data_i.shift: shift amount
+    *                 When zero, no shift occurs.
+    *                 On POWER, range is 0 to 63 for 32-bit,
+    *                 and 0 to 127 for 64-bit.
+    *                 Other values wrap around.
+
+    Operation type
+    * op.sdir:       shift direction (0 = left, 1 = right)
 
     Next port data:
-    * n.data_o: Shifted value
+    * n.data_o.data: shifted value
     """
     class PrevData:
         def __init__(self, width):
             self.data = Signal(width, name="p_data_i")
             self.shift = Signal(width, name="p_shift_i")
-            self.dir = Signal(name="p_dir_i")
+            self.ctx = Dummy() # comply with CompALU API
 
-    class PrevPort:
-        def __init__(self, width):
-            self.data_i = Shifter.PrevData(width)
-            self.valid_i = Signal(name="p_valid_i")
-            self.ready_o = Signal(name="p_ready_o")
+        def _get_data(self):
+            return [self.data, self.shift]
 
-    class NextPort:
+    class NextData:
         def __init__(self, width):
-            self.data_o = Signal(width, name="n_data_o")
-            self.valid_o = Signal(name="n_valid_o")
-            self.ready_i = Signal(name="n_ready_i")
+            self.data = Signal(width, name="n_data_o")
+
+        def _get_data(self):
+            return [self.data]
 
     def __init__(self, width):
         self.width = width
-        self.p = self.PrevPort(width)
-        self.n = self.NextPort(width)
+        self.p = PrevControl()
+        self.n = NextControl()
+        self.p.data_i = Shifter.PrevData(width)
+        self.n.data_o = Shifter.NextData(width)
+
+        # more pieces to make this example class comply with the CompALU API
+        self.op = CompFSMOpSubset(name="op")
+        self.p.data_i.ctx.op = self.op
+        self.i = self.p.data_i._get_data()
+        self.out = self.n.data_o._get_data()
 
     def elaborate(self, platform):
         m = Module()
-        # TODO: Implement Module
+
+        m.submodules.p = self.p
+        m.submodules.n = self.n
+
+        # Note:
+        # It is good practice to design a sequential circuit as
+        # a data path and a control path.
+
+        # Data path
+        # ---------
+        # The idea is to have a register that can be
+        # loaded or shifted (left and right).
+
+        # the control signals
+        load = Signal()
+        shift = Signal()
+        direction = Signal()
+        # the data flow
+        shift_in = Signal(self.width)
+        shift_left_by_1 = Signal(self.width)
+        shift_right_by_1 = Signal(self.width)
+        next_shift = Signal(self.width)
+        # the register
+        shift_reg = Signal(self.width, reset_less=True)
+        # build the data flow
+        m.d.comb += [
+            # connect input and output
+            shift_in.eq(self.p.data_i.data),
+            self.n.data_o.data.eq(shift_reg),
+            # generate shifted views of the register
+            shift_left_by_1.eq(Cat(0, shift_reg[:-1])),
+            shift_right_by_1.eq(Cat(shift_reg[1:], 0)),
+        ]
+        # choose the next value of the register according to the
+        # control signals
+        # default is no change
+        m.d.comb += next_shift.eq(shift_reg)
+        with m.If(load):
+            m.d.comb += next_shift.eq(shift_in)
+        with m.Elif(shift):
+            with m.If(direction):
+                m.d.comb += next_shift.eq(shift_right_by_1)
+            with m.Else():
+                m.d.comb += next_shift.eq(shift_left_by_1)
+
+        # register the next value
+        m.d.sync += shift_reg.eq(next_shift)
+
+        # Control path
+        # ------------
+        # The idea is to have a SHIFT state where the shift register
+        # is shifted every cycle, while a counter decrements.
+        # This counter is loaded with shift amount in the initial state.
+        # The SHIFT state is left when the counter goes to zero.
+
+        # Shift counter
+        shift_width = int(log2(self.width)) + 1
+        next_count = Signal(shift_width)
+        count = Signal(shift_width, reset_less=True)
+        m.d.sync += count.eq(next_count)
+
+        with m.FSM():
+            with m.State("IDLE"):
+                m.d.comb += [
+                    # keep p.ready_o active on IDLE
+                    self.p.ready_o.eq(1),
+                    # keep loading the shift register and shift count
+                    load.eq(1),
+                    next_count.eq(self.p.data_i.shift),
+                ]
+                # capture the direction bit as well
+                m.d.sync += direction.eq(self.op.sdir)
+                with m.If(self.p.valid_i):
+                    # Leave IDLE when data arrives
+                    with m.If(next_count == 0):
+                        # short-circuit for zero shift
+                        m.next = "DONE"
+                    with m.Else():
+                        m.next = "SHIFT"
+            with m.State("SHIFT"):
+                m.d.comb += [
+                    # keep shifting, while counter is not zero
+                    shift.eq(1),
+                    # decrement the shift counter
+                    next_count.eq(count - 1),
+                ]
+                with m.If(next_count == 0):
+                    # exit when shift counter goes to zero
+                    m.next = "DONE"
+            with m.State("DONE"):
+                # keep n.valid_o active while the data is not accepted
+                m.d.comb += self.n.valid_o.eq(1)
+                with m.If(self.n.ready_i):
+                    # go back to IDLE when the data is accepted
+                    m.next = "IDLE"
+
         return m
 
     def __iter__(self):
+        yield self.op.sdir
         yield self.p.data_i.data
         yield self.p.data_i.shift
-        yield self.p.data_i.dir
         yield self.p.valid_i
         yield self.p.ready_o
         yield self.n.ready_i
         yield self.n.valid_o
-        yield self.n.data_o
+        yield self.n.data_o.data
 
     def ports(self):
         return list(self)
 
 
+# Write a formatted GTKWave "save" file
+def write_gtkw(base_name, top_dut_name, loc):
+    # hierarchy path, to prepend to signal names
+    dut = top_dut_name + "."
+    # color styles
+    style_input = GTKWColor.orange
+    style_output = GTKWColor.yellow
+    with open(base_name + ".gtkw", "wt") as gtkw_file:
+        gtkw = GTKWSave(gtkw_file)
+        gtkw.comment("Auto-generated by " + loc)
+        gtkw.dumpfile(base_name + ".vcd")
+        # set a reasonable zoom level
+        # also, move the marker to an interesting place
+        gtkw.zoom_markers(-22.9, 10500000)
+        gtkw.trace(dut + "clk")
+        # place a comment in the signal names panel
+        gtkw.blank("Shifter Demonstration")
+        with gtkw.group("prev port"):
+            gtkw.trace(dut + "op__sdir", color=style_input)
+            # demonstrates using decimal base (default is hex)
+            gtkw.trace(dut + "p_data_i[7:0]", color=style_input,
+                       datafmt='dec')
+            gtkw.trace(dut + "p_shift_i[7:0]", color=style_input,
+                       datafmt='dec')
+            gtkw.trace(dut + "p_valid_i", color=style_input)
+            gtkw.trace(dut + "p_ready_o", color=style_output)
+        with gtkw.group("internal"):
+            gtkw.trace(dut + "fsm_state")
+            gtkw.trace(dut + "count[3:0]")
+            gtkw.trace(dut + "shift_reg[7:0]", datafmt='dec')
+        with gtkw.group("next port"):
+            gtkw.trace(dut + "n_data_o[7:0]", color=style_output,
+                       datafmt='dec')
+            gtkw.trace(dut + "n_valid_o", color=style_output)
+            gtkw.trace(dut + "n_ready_i", color=style_input)
+
+
 def test_shifter():
     m = Module()
     m.submodules.shf = dut = Shifter(8)
     print("Shifter port names:")
     for port in dut:
         print("-", port.name)
+    # generate RTLIL
+    # try "proc; show" in yosys to check the data path
+    il = rtlil.convert(dut, ports=dut.ports())
+    with open("test_shifter.il", "w") as f:
+        f.write(il)
+
+    # Write the GTKWave project file
+    write_gtkw("test_shifter", "top.shf", __file__)
+
     sim = Simulator(m)
-    # Todo: Implement Simulation
+    sim.add_clock(1e-6)
+
+    def send(data, shift, direction):
+        # present input data and assert valid_i
+        yield dut.p.data_i.data.eq(data)
+        yield dut.p.data_i.shift.eq(shift)
+        yield dut.op.sdir.eq(direction)
+        yield dut.p.valid_i.eq(1)
+        yield
+        # wait for p.ready_o to be asserted
+        while not (yield dut.p.ready_o):
+            yield
+        # clear input data and negate p.valid_i
+        yield dut.p.valid_i.eq(0)
+        yield dut.p.data_i.data.eq(0)
+        yield dut.p.data_i.shift.eq(0)
+        yield dut.op.sdir.eq(0)
+
+    def receive(expected):
+        # signal readiness to receive data
+        yield dut.n.ready_i.eq(1)
+        yield
+        # wait for n.valid_o to be asserted
+        while not (yield dut.n.valid_o):
+            yield
+        # read result
+        result = yield dut.n.data_o.data
+        # negate n.ready_i
+        yield dut.n.ready_i.eq(0)
+        # check result
+        assert result == expected
+
+    def producer():
+        # 13 >> 2
+        yield from send(13, 2, 1)
+        # 3 << 4
+        yield from send(3, 4, 0)
+        # 21 << 0
+        yield from send(21, 0, 0)
+
+    def consumer():
+        # the consumer is not in step with the producer, but the
+        # order of the results are preserved
+        # 13 >> 2 = 3
+        yield from receive(3)
+        # 3 << 4 = 48
+        yield from receive(48)
+        # 21 << 0 = 21
+        yield from receive(21)
+
+    sim.add_sync_process(producer)
+    sim.add_sync_process(consumer)
+    sim_writer = sim.write_vcd(
+        "test_shifter.vcd",
+    )
+    with sim_writer:
+        sim.run()
 
 
 if __name__ == "__main__":