slave) could be derived from ControlBase, for example.
"""
-from nmigen import Signal, Cat, Const, Mux, Module, Value, Elaboratable
+from nmigen import Signal, Cat, Const, Module, Value, Elaboratable
from nmigen.cli import verilog, rtlil
from nmigen.hdl.rec import Record
-from abc import ABCMeta, abstractmethod
from collections.abc import Sequence, Iterable
from collections import OrderedDict
-import inspect
import nmoperator
return list(self)
-def _spec(fn, name=None):
- if name is None:
- return fn()
- varnames = dict(inspect.getmembers(fn.__code__))['co_varnames']
- if 'name' in varnames:
- return fn(name=name)
- return fn()
-
-
class PrevControl(Elaboratable):
""" contains signals that come *from* the previous stage (both in and out)
* valid_i: previous stage indicating all incoming data is valid.
return self.s_ready_o # set dynamically by stage
return self._ready_o # return this when not under dynamic control
- def _connect_in(self, prev, direct=False, fn=None):
+ def _connect_in(self, prev, direct=False, fn=None, do_data=True):
""" internal helper function to connect stage to an input source.
do not use to connect stage-to-stage!
"""
valid_i = prev.valid_i if direct else prev.valid_i_test
+ res = [self.valid_i.eq(valid_i),
+ prev.ready_o.eq(self.ready_o)]
+ if do_data is False:
+ return res
data_i = fn(prev.data_i) if fn is not None else prev.data_i
- return [self.valid_i.eq(valid_i),
- prev.ready_o.eq(self.ready_o),
- nmoperator.eq(self.data_i, data_i),
- ]
+ return res + [nmoperator.eq(self.data_i, data_i)]
@property
def valid_i_test(self):
return m
def eq(self, i):
- return [self.data_i.eq(i.data_i),
+ return [nmoperator.eq(self.data_i, i.data_i),
self.ready_o.eq(i.ready_o),
self.valid_i.eq(i.valid_i)]
return self.ready_i & self.d_valid
return self.ready_i
- def connect_to_next(self, nxt):
+ def connect_to_next(self, nxt, do_data=True):
""" helper function to connect to the next stage data/valid/ready.
data/valid is passed *TO* nxt, and ready comes *IN* from nxt.
use this when connecting stage-to-stage
"""
- return [nxt.valid_i.eq(self.valid_o),
- self.ready_i.eq(nxt.ready_o),
- nmoperator.eq(nxt.data_i, self.data_o),
- ]
+ res = [nxt.valid_i.eq(self.valid_o),
+ self.ready_i.eq(nxt.ready_o)]
+ if do_data:
+ res.append(nmoperator.eq(nxt.data_i, self.data_o))
+ return res
- def _connect_out(self, nxt, direct=False, fn=None):
+ def _connect_out(self, nxt, direct=False, fn=None, do_data=True):
""" internal helper function to connect stage to an output source.
do not use to connect stage-to-stage!
"""
ready_i = nxt.ready_i if direct else nxt.ready_i_test
+ res = [nxt.valid_o.eq(self.valid_o),
+ self.ready_i.eq(ready_i)]
+ if not do_data:
+ return res
data_o = fn(nxt.data_o) if fn is not None else nxt.data_o
- return [nxt.valid_o.eq(self.valid_o),
- self.ready_i.eq(ready_i),
- nmoperator.eq(data_o, self.data_o),
- ]
+ return res + [nmoperator.eq(data_o, self.data_o)]
def elaborate(self, platform):
m = Module()
def ports(self):
return list(self)
-
-class StageCls(metaclass=ABCMeta):
- """ Class-based "Stage" API. requires instantiation (after derivation)
-
- see "Stage API" above.. Note: python does *not* require derivation
- from this class. All that is required is that the pipelines *have*
- the functions listed in this class. Derivation from this class
- is therefore merely a "courtesy" to maintainers.
- """
- @abstractmethod
- def ispec(self): pass # REQUIRED
- @abstractmethod
- def ospec(self): pass # REQUIRED
- #@abstractmethod
- #def setup(self, m, i): pass # OPTIONAL
- #@abstractmethod
- #def process(self, i): pass # REQUIRED
-
-
-class Stage(metaclass=ABCMeta):
- """ Static "Stage" API. does not require instantiation (after derivation)
-
- see "Stage API" above. Note: python does *not* require derivation
- from this class. All that is required is that the pipelines *have*
- the functions listed in this class. Derivation from this class
- is therefore merely a "courtesy" to maintainers.
- """
- @staticmethod
- @abstractmethod
- def ispec(): pass
-
- @staticmethod
- @abstractmethod
- def ospec(): pass
-
- #@staticmethod
- #@abstractmethod
- #def setup(m, i): pass
-
- #@staticmethod
- #@abstractmethod
- #def process(i): pass
-
-
-class StageChain(StageCls):
- """ pass in a list of stages, and they will automatically be
- chained together via their input and output specs into a
- combinatorial chain, to create one giant combinatorial block.
-
- the end result basically conforms to the exact same Stage API.
-
- * input to this class will be the input of the first stage
- * output of first stage goes into input of second
- * output of second goes into input into third
- * ... (etc. etc.)
- * the output of this class will be the output of the last stage
-
- NOTE: whilst this is very similar to ControlBase.connect(), it is
- *really* important to appreciate that StageChain is pure
- combinatorial and bypasses (does not involve, at all, ready/valid
- signalling of any kind).
-
- ControlBase.connect on the other hand respects, connects, and uses
- ready/valid signalling.
-
- Arguments:
-
- * :chain: a chain of combinatorial blocks conforming to the Stage API
- NOTE: StageChain.ispec and ospect have to have something
- to return (beginning and end specs of the chain),
- therefore the chain argument must be non-zero length
-
- * :specallocate: if set, new input and output data will be allocated
- and connected (eq'd) to each chained Stage.
- in some cases if this is not done, the nmigen warning
- "driving from two sources, module is being flattened"
- will be issued.
-
- NOTE: do NOT use StageChain with combinatorial blocks that have
- side-effects (state-based / clock-based input) or conditional
- (inter-chain) dependencies, unless you really know what you are doing.
- """
- def __init__(self, chain, specallocate=False):
- assert len(chain) > 0, "stage chain must be non-zero length"
- self.chain = chain
- self.specallocate = specallocate
-
- def ispec(self):
- """ returns the ispec of the first of the chain
- """
- return _spec(self.chain[0].ispec, "chainin")
-
- def ospec(self):
- """ returns the ospec of the last of the chain
- """
- return _spec(self.chain[-1].ospec, "chainout")
-
- def _specallocate_setup(self, m, i):
- o = i # in case chain is empty
- for (idx, c) in enumerate(self.chain):
- if hasattr(c, "setup"):
- c.setup(m, i) # stage may have some module stuff
- ofn = self.chain[idx].ospec # last assignment survives
- o = _spec(ofn, 'chainin%d' % idx)
- m.d.comb += nmoperator.eq(o, c.process(i)) # process input into "o"
- if idx == len(self.chain)-1:
- break
- ifn = self.chain[idx+1].ispec # new input on next loop
- i = _spec(ifn, 'chainin%d' % (idx+1))
- m.d.comb += nmoperator.eq(i, o) # assign to next input
- return o # last loop is the output
-
- def _noallocate_setup(self, m, i):
- o = i # in case chain is empty
- for (idx, c) in enumerate(self.chain):
- if hasattr(c, "setup"):
- c.setup(m, i) # stage may have some module stuff
- i = o = c.process(i) # store input into "o"
- return o # last loop is the output
-
- def setup(self, m, i):
- if self.specallocate:
- self.o = self._specallocate_setup(m, i)
- else:
- self.o = self._noallocate_setup(m, i)
-
- def process(self, i):
- return self.o # conform to Stage API: return last-loop output
-
-
-class ControlBase(Elaboratable):
- """ Common functions for Pipeline API. Note: a "pipeline stage" only
- exists (conceptually) when a ControlBase derivative is handed
- a Stage (combinatorial block)
- """
- def __init__(self, stage=None, in_multi=None, stage_ctl=False):
- """ Base class containing ready/valid/data to previous and next stages
-
- * p: contains ready/valid to the previous stage
- * n: contains ready/valid to the next stage
-
- Except when calling Controlbase.connect(), user must also:
- * add data_i member to PrevControl (p) and
- * add data_o member to NextControl (n)
- """
- self.stage = stage
-
- # set up input and output IO ACK (prev/next ready/valid)
- self.p = PrevControl(in_multi, stage_ctl)
- self.n = NextControl(stage_ctl)
-
- # set up the input and output data
- if stage is not None:
- self._new_data(self, self, "data")
-
- def _new_data(self, p, n, name):
- """ allocates new data_i and data_o
- """
- self.p.data_i = _spec(p.stage.ispec, "%s_i" % name)
- self.n.data_o = _spec(n.stage.ospec, "%s_o" % name)
-
- def connect_to_next(self, nxt):
- """ helper function to connect to the next stage data/valid/ready.
- """
- return self.n.connect_to_next(nxt.p)
-
- def _connect_in(self, prev):
- """ internal helper function to connect stage to an input source.
- do not use to connect stage-to-stage!
- """
- return self.p._connect_in(prev.p)
-
- def _connect_out(self, nxt):
- """ internal helper function to connect stage to an output source.
- do not use to connect stage-to-stage!
- """
- return self.n._connect_out(nxt.n)
-
- def connect(self, pipechain):
- """ connects a chain (list) of Pipeline instances together and
- links them to this ControlBase instance:
-
- in <----> self <---> out
- | ^
- v |
- [pipe1, pipe2, pipe3, pipe4]
- | ^ | ^ | ^
- v | v | v |
- out---in out--in out---in
-
- Also takes care of allocating data_i/data_o, by looking up
- the data spec for each end of the pipechain. i.e It is NOT
- necessary to allocate self.p.data_i or self.n.data_o manually:
- this is handled AUTOMATICALLY, here.
-
- Basically this function is the direct equivalent of StageChain,
- except that unlike StageChain, the Pipeline logic is followed.
-
- Just as StageChain presents an object that conforms to the
- Stage API from a list of objects that also conform to the
- Stage API, an object that calls this Pipeline connect function
- has the exact same pipeline API as the list of pipline objects
- it is called with.
-
- Thus it becomes possible to build up larger chains recursively.
- More complex chains (multi-input, multi-output) will have to be
- done manually.
-
- Argument:
-
- * :pipechain: - a sequence of ControlBase-derived classes
- (must be one or more in length)
-
- Returns:
-
- * a list of eq assignments that will need to be added in
- an elaborate() to m.d.comb
- """
- assert len(pipechain) > 0, "pipechain must be non-zero length"
- eqs = [] # collated list of assignment statements
-
- # connect inter-chain
- for i in range(len(pipechain)-1):
- pipe1 = pipechain[i]
- pipe2 = pipechain[i+1]
- eqs += pipe1.connect_to_next(pipe2)
-
- # connect front and back of chain to ourselves
- front = pipechain[0]
- end = pipechain[-1]
- self._new_data(front, end, "chain") # NOTE: REPLACES existing data
- eqs += front._connect_in(self)
- eqs += end._connect_out(self)
-
- return eqs
-
- @property
- def data_r(self):
- if self.stage and hasattr(self.stage, "process"):
- return self.stage.process(self.p.data_i)
- return self.p.data_i
-
- def _postprocess(self, i): # XXX DISABLED
- return i # RETURNS INPUT
- if hasattr(self.stage, "postprocess"):
- return self.stage.postprocess(i)
- return i
-
- def set_input(self, i):
- """ helper function to set the input data
- """
- return nmoperator.eq(self.p.data_i, i)
-
- def __iter__(self):
- yield from self.p
- yield from self.n
-
- def ports(self):
- return list(self)
-
- def elaborate(self, platform):
- """ handles case where stage has dynamic ready/valid functions
- """
- m = Module()
- m.submodules.p = self.p
- m.submodules.n = self.n
-
- if self.stage is not None and hasattr(self.stage, "setup"):
- self.stage.setup(m, self.p.data_i)
-
- if not self.p.stage_ctl:
- return m
-
- # intercept the previous (outgoing) "ready", combine with stage ready
- m.d.comb += self.p.s_ready_o.eq(self.p._ready_o & self.stage.d_ready)
-
- # intercept the next (incoming) "ready" and combine it with data valid
- sdv = self.stage.d_valid(self.n.ready_i)
- m.d.comb += self.n.d_valid.eq(self.n.ready_i & sdv)
-
- return m
-