1 """ Example 5: Making use of PyRTL and Introspection. """
3 from collections
.abc
import Sequence
5 from nmigen
import Signal
6 from nmigen
.hdl
.rec
import Record
7 from nmigen
import tracer
8 from nmigen
.compat
.fhdl
.bitcontainer
import value_bits_sign
9 from contextlib
import contextmanager
11 from singlepipe
import eq
, StageCls
, ControlBase
, BufferedPipeline
12 from singlepipe
import UnbufferedPipeline
15 # The following example shows how pyrtl can be used to make some interesting
16 # hardware structures using python introspection. In particular, this example
17 # makes a N-stage pipeline structure. Any specific pipeline is then a derived
18 # class of SimplePipeline where methods with names starting with "stage" are
19 # stages, and new members with names not starting with "_" are to be registered
22 def like(value
, rname
, pipe
):
23 if isinstance(value
, ObjectProxy
):
24 return ObjectProxy
.like(None, value
, name
=rname
, reset_less
=True)
26 return Signal(value_bits_sign(value
), name
=rname
,
28 return Signal
.like(value
, name
=rname
, reset_less
=True)
32 def __init__(self
, m
, name
=None, pipemode
=False):
35 name
= tracer
.get_var_name(default
=None)
37 self
._pipemode
= pipemode
40 def like(cls
, m
, value
, name
=None, src_loc_at
=0, **kwargs
):
41 name
= name
or tracer
.get_var_name(depth
=2 + src_loc_at
,
44 src_loc_at_1
= 1 + src_loc_at
45 r
= ObjectProxy(m
, value
.name
)
46 for a
in value
.ports():
53 for a
in self
.ports():
55 ai
= getattr(i
, aname
)
61 for aname
in dir(self
):
62 a
= getattr(self
, aname
)
63 if isinstance(a
, Signal
) or isinstance(a
, ObjectProxy
) or \
64 isinstance(a
, Record
):
68 def __setattr__(self
, name
, value
):
69 if name
.startswith('_') or name
== 'name':
70 # do not do anything tricky with variables starting with '_'
71 object.__setattr
__(self
, name
, value
)
73 #rname = "%s_%s" % (self.name, name)
75 new_pipereg
= like(value
, rname
, self
._m
)
76 object.__setattr
__(self
, name
, new_pipereg
)
78 self
._m
.d
.sync
+= eq(new_pipereg
, value
)
82 """ Pipeline builder stage with auto generation of pipeline registers.
85 def __init__(self
, name
, m
, prev
=None, pipemode
=False, ispec
=None):
87 self
._stagename
= name
89 self
._prev
_stage
= prev
92 print ("prev", prev
._stagename
, prev
._preg
_map
)
93 if prev
._stagename
in prev
._preg
_map
:
94 m
= prev
._preg
_map
[prev
._stagename
]
95 self
._preg
_map
[prev
._stagename
] = m
96 #for k, v in m.items():
97 #m[k] = like(v, k, self._m)
98 if '__nextstage__' in prev
._preg
_map
:
99 m
= prev
._preg
_map
['__nextstage__']
100 self
._preg
_map
[self
._stagename
] = m
101 #for k, v in m.items():
102 #m[k] = like(v, k, self._m)
103 print ("make current", self
._stagename
, m
)
104 self
._pipemode
= pipemode
107 def __getattr__(self
, name
):
109 v
= self
._preg
_map
[self
._stagename
][name
]
111 #return like(v, name, self._m)
113 raise AttributeError(
114 'error, no pipeline register "%s" defined for stage %s'
115 % (name
, self
._stagename
))
117 def __setattr__(self
, name
, value
):
118 if name
.startswith('_'):
119 # do not do anything tricky with variables starting with '_'
120 object.__setattr
__(self
, name
, value
)
122 pipereg_id
= self
._stagename
123 rname
= 'pipereg_' + pipereg_id
+ '_' + name
124 new_pipereg
= like(value
, rname
, self
._m
)
125 next_stage
= '__nextstage__'
126 if next_stage
not in self
._preg
_map
:
127 self
._preg
_map
[next_stage
] = {}
128 self
._preg
_map
[next_stage
][name
] = new_pipereg
130 self
._eqs
.append(value
)
131 print ("!pipemode: append", new_pipereg
, value
)
132 #self._m.d.comb += assign
134 print ("!pipemode: assign", new_pipereg
, value
)
135 assign
= eq(new_pipereg
, value
)
136 self
._m
.d
.sync
+= assign
139 class AutoStage(StageCls
):
140 def __init__(self
, inspecs
, outspecs
, eqs
):
141 self
.inspecs
, self
.outspecs
, self
.eqs
= inspecs
, outspecs
, eqs
142 def ispec(self
): return self
.like(self
.inspecs
)
143 def ospec(self
): return self
.like(self
.outspecs
)
144 def like(self
, specs
):
147 res
.append(like(v
, v
.name
, None))
150 def process(self
, i
):
152 def setup(self
, m
, i
):
153 m
.d
.comb
+= eq(self
.inspecs
, i
)
157 def __init__(self
, m
, pipemode
=False, pipetype
=None):
159 self
.pipemode
= pipemode
160 self
.pipetype
= pipetype
163 def Stage(self
, name
, prev
=None, ispec
=None):
164 stage
= PipelineStage(name
, self
.m
, prev
, self
.pipemode
, ispec
=ispec
)
166 yield stage
, stage
._m
171 print ("use ispec", stage
._ispec
)
172 inspecs
= stage
._ispec
174 inspecs
= self
.get_specs(stage
, name
)
175 outspecs
= self
.get_specs(stage
, '__nextstage__', liked
=True)
176 s
= AutoStage(inspecs
, outspecs
, stage
._eqs
)
177 self
.stages
.append(s
)
179 def get_specs(self
, stage
, name
, liked
=False):
180 if name
in stage
._preg
_map
:
182 for k
, v
in stage
._preg
_map
[name
].items():
183 #v = like(v, k, stage._m)
192 def __exit__(self
, *args
):
196 for s
in self
.stages
:
197 print (s
, s
.inspecs
, s
.outspecs
)
198 if self
.pipetype
== 'buffered':
199 p
= BufferedPipeline(s
)
201 p
= UnbufferedPipeline(s
)
203 self
.m
.submodules
+= p
205 self
.m
.d
.comb
+= cb
.connect(pipes
)
208 class SimplePipeline
:
209 """ Pipeline builder with auto generation of pipeline registers.
212 def __init__(self
, m
):
214 self
._pipeline
_register
_map
= {}
215 self
._current
_stage
_num
= 0
219 for method
in dir(self
):
220 if method
.startswith('stage'):
221 stage_list
.append(method
)
222 for stage
in sorted(stage_list
):
223 stage_method
= getattr(self
, stage
)
225 self
._current
_stage
_num
+= 1
227 def __getattr__(self
, name
):
229 return self
._pipeline
_register
_map
[self
._current
_stage
_num
][name
]
231 raise AttributeError(
232 'error, no pipeline register "%s" defined for stage %d'
233 % (name
, self
._current
_stage
_num
))
235 def __setattr__(self
, name
, value
):
236 if name
.startswith('_'):
237 # do not do anything tricky with variables starting with '_'
238 object.__setattr
__(self
, name
, value
)
240 next_stage
= self
._current
_stage
_num
+ 1
241 pipereg_id
= str(self
._current
_stage
_num
) + 'to' + str(next_stage
)
242 rname
= 'pipereg_' + pipereg_id
+ '_' + name
243 #new_pipereg = Signal(value_bits_sign(value), name=rname,
245 if isinstance(value
, ObjectProxy
):
246 new_pipereg
= ObjectProxy
.like(self
._m
, value
,
247 name
=rname
, reset_less
= True)
249 new_pipereg
= Signal
.like(value
, name
=rname
, reset_less
= True)
250 if next_stage
not in self
._pipeline
_register
_map
:
251 self
._pipeline
_register
_map
[next_stage
] = {}
252 self
._pipeline
_register
_map
[next_stage
][name
] = new_pipereg
253 self
._m
.d
.sync
+= eq(new_pipereg
, value
)