1 """ Example 5: Making use of PyRTL and Introspection. """
3 from collections
.abc
import Sequence
5 from nmigen
import Signal
6 from nmigen
.hdl
.rec
import Record
7 from nmigen
import tracer
8 from nmigen
.compat
.fhdl
.bitcontainer
import value_bits_sign
9 from contextlib
import contextmanager
11 from singlepipe
import eq
, StageCls
, ControlBase
, BufferedPipeline
12 from singlepipe
import UnbufferedPipeline
15 # The following example shows how PyRTL can be used to build some interesting
16 # hardware structures using Python introspection. In particular, this example
17 # makes an N-stage pipeline structure. Any specific pipeline is then a derived
18 # class of SimplePipeline, where methods with names starting with "stage" are
19 # stages, and new members with names not starting with "_" are to be registered
20 # for the next stage.
22 def like(value
, rname
, pipe
):
23 if isinstance(value
, ObjectProxy
):
24 return ObjectProxy
.like(pipe
, value
, name
=rname
, reset_less
=True)
26 return Signal(value_bits_sign(value
), name
=rname
,
28 return Signal
.like(value
, name
=rname
, reset_less
=True)
32 def __init__(self
, pipe
, name
=None):
35 name
= tracer
.get_var_name(default
=None)
39 def like(cls
, pipe
, value
, name
=None, src_loc_at
=0, **kwargs
):
40 name
= name
or tracer
.get_var_name(depth
=2 + src_loc_at
,
43 src_loc_at_1
= 1 + src_loc_at
44 r
= ObjectProxy(pipe
, value
.name
)
45 for a
in value
.ports():
52 for a
in self
.ports():
54 ai
= getattr(i
, aname
)
60 for aname
in dir(self
):
61 a
= getattr(self
, aname
)
62 if isinstance(a
, Signal
) or isinstance(a
, ObjectProxy
) or \
63 isinstance(a
, Record
):
67 def __setattr__(self
, name
, value
):
68 if name
.startswith('_') or name
== 'name':
69 # do not do anything tricky with variables starting with '_'
70 object.__setattr
__(self
, name
, value
)
72 #rname = "%s_%s" % (self.name, name)
74 new_pipereg
= like(value
, rname
, self
._pipe
)
75 object.__setattr
__(self
, name
, new_pipereg
)
77 self
._pipe
.sync
+= eq(new_pipereg
, value
)
81 """ Pipeline builder stage with auto generation of pipeline registers.
84 def __init__(self
, name
, m
, prev
=None, pipemode
=False, ispec
=None):
86 self
._stagename
= name
88 self
._prev
_stage
= prev
91 print ("prev", prev
._stagename
, prev
._preg
_map
)
92 if prev
._stagename
in prev
._preg
_map
:
93 m
= prev
._preg
_map
[prev
._stagename
]
94 self
._preg
_map
[prev
._stagename
] = m
95 #for k, v in m.items():
96 #m[k] = like(v, k, self._m)
97 if '__nextstage__' in prev
._preg
_map
:
98 m
= prev
._preg
_map
['__nextstage__']
99 self
._preg
_map
[self
._stagename
] = m
100 #for k, v in m.items():
101 #m[k] = like(v, k, self._m)
102 print ("make current", self
._stagename
, m
)
103 self
._pipemode
= pipemode
106 def __getattr__(self
, name
):
108 v
= self
._preg
_map
[self
._stagename
][name
]
110 #return like(v, name, self._m)
112 raise AttributeError(
113 'error, no pipeline register "%s" defined for stage %s'
114 % (name
, self
._stagename
))
116 def __setattr__(self
, name
, value
):
117 if name
.startswith('_'):
118 # do not do anything tricky with variables starting with '_'
119 object.__setattr
__(self
, name
, value
)
121 pipereg_id
= self
._stagename
122 rname
= 'pipereg_' + pipereg_id
+ '_' + name
123 new_pipereg
= like(value
, rname
, self
._m
)
124 next_stage
= '__nextstage__'
125 if next_stage
not in self
._preg
_map
:
126 self
._preg
_map
[next_stage
] = {}
127 self
._preg
_map
[next_stage
][name
] = new_pipereg
129 self
._eqs
.append(value
)
130 print ("!pipemode: append", new_pipereg
, value
)
131 #self._m.d.comb += assign
133 print ("!pipemode: assign", new_pipereg
, value
)
134 assign
= eq(new_pipereg
, value
)
135 self
._m
.d
.sync
+= assign
138 class AutoStage(StageCls
):
139 def __init__(self
, inspecs
, outspecs
, eqs
):
140 self
.inspecs
, self
.outspecs
, self
.eqs
= inspecs
, outspecs
, eqs
141 def ispec(self
): return self
.like(self
.inspecs
)
142 def ospec(self
): return self
.like(self
.outspecs
)
143 def like(self
, specs
):
146 res
.append(like(v
, v
.name
, None))
149 def process(self
, i
):
151 def setup(self
, m
, i
):
152 m
.d
.comb
+= eq(self
.inspecs
, i
)
156 def __init__(self
, m
, pipemode
=False, pipetype
=None):
158 self
.pipemode
= pipemode
159 self
.pipetype
= pipetype
162 def Stage(self
, name
, prev
=None, ispec
=None):
163 stage
= PipelineStage(name
, self
.m
, prev
, self
.pipemode
, ispec
=ispec
)
165 yield stage
, stage
._m
170 print ("use ispec", stage
._ispec
)
171 inspecs
= stage
._ispec
173 inspecs
= self
.get_specs(stage
, name
)
174 outspecs
= self
.get_specs(stage
, '__nextstage__', liked
=True)
175 s
= AutoStage(inspecs
, outspecs
, stage
._eqs
)
176 self
.stages
.append(s
)
178 def get_specs(self
, stage
, name
, liked
=False):
179 if name
in stage
._preg
_map
:
181 for k
, v
in stage
._preg
_map
[name
].items():
182 #v = like(v, k, stage._m)
191 def __exit__(self
, *args
):
195 for s
in self
.stages
:
196 print (s
, s
.inspecs
, s
.outspecs
)
197 if self
.pipetype
== 'buffered':
198 p
= BufferedPipeline(s
)
200 p
= UnbufferedPipeline(s
)
202 self
.m
.submodules
+= p
204 self
.m
.d
.comb
+= cb
.connect(pipes
)
207 class SimplePipeline
:
208 """ Pipeline builder with auto generation of pipeline registers.
211 def __init__(self
, pipe
):
213 self
._pipeline
_register
_map
= {}
214 self
._current
_stage
_num
= 0
218 for method
in dir(self
):
219 if method
.startswith('stage'):
220 stage_list
.append(method
)
221 for stage
in sorted(stage_list
):
222 stage_method
= getattr(self
, stage
)
224 self
._current
_stage
_num
+= 1
226 def __getattr__(self
, name
):
228 return self
._pipeline
_register
_map
[self
._current
_stage
_num
][name
]
230 raise AttributeError(
231 'error, no pipeline register "%s" defined for stage %d'
232 % (name
, self
._current
_stage
_num
))
234 def __setattr__(self
, name
, value
):
235 if name
.startswith('_'):
236 # do not do anything tricky with variables starting with '_'
237 object.__setattr
__(self
, name
, value
)
239 next_stage
= self
._current
_stage
_num
+ 1
240 pipereg_id
= str(self
._current
_stage
_num
) + 'to' + str(next_stage
)
241 rname
= 'pipereg_' + pipereg_id
+ '_' + name
242 #new_pipereg = Signal(value_bits_sign(value), name=rname,
244 if isinstance(value
, ObjectProxy
):
245 new_pipereg
= ObjectProxy
.like(self
._pipe
, value
,
246 name
=rname
, reset_less
= True)
248 new_pipereg
= Signal
.like(value
, name
=rname
, reset_less
= True)
249 if next_stage
not in self
._pipeline
_register
_map
:
250 self
._pipeline
_register
_map
[next_stage
] = {}
251 self
._pipeline
_register
_map
[next_stage
][name
] = new_pipereg
252 self
._pipe
.sync
+= eq(new_pipereg
, value
)