12081aca9caffa9e1752630cbb997c9000bd509d
1 from collections
import OrderedDict
, namedtuple
2 from collections
.abc
import Iterable
3 from contextlib
import contextmanager
, _GeneratorContextManager
4 from functools
import wraps
8 from .._utils
import flatten
, bits_for
, deprecated
16 __all__
= ["SyntaxError", "SyntaxWarning", "Module"]
19 class SyntaxError(Exception):
23 class SyntaxWarning(Warning):
27 class _ModuleBuilderProxy
:
28 def __init__(self
, builder
, depth
):
29 object.__setattr
__(self
, "_builder", builder
)
30 object.__setattr
__(self
, "_depth", depth
)
33 class _ModuleBuilderDomain(_ModuleBuilderProxy
):
34 def __init__(self
, builder
, depth
, domain
):
35 super().__init
__(builder
, depth
)
38 def __iadd__(self
, assigns
):
39 self
._builder
._add
_statement
(assigns
, domain
=self
._domain
, depth
=self
._depth
)
43 class _ModuleBuilderDomains(_ModuleBuilderProxy
):
44 def __getattr__(self
, name
):
45 if name
== "submodules":
46 warnings
.warn("Using '<module>.d.{}' would add statements to clock domain {!r}; "
47 "did you mean <module>.{} instead?"
48 .format(name
, name
, name
),
49 SyntaxWarning, stacklevel
=2)
54 return _ModuleBuilderDomain(self
._builder
, self
._depth
, domain
)
56 def __getitem__(self
, name
):
57 return self
.__getattr
__(name
)
59 def __setattr__(self
, name
, value
):
61 object.__setattr
__(self
, name
, value
)
62 elif not isinstance(value
, _ModuleBuilderDomain
):
63 raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
66 def __setitem__(self
, name
, value
):
67 return self
.__setattr
__(name
, value
)
70 class _ModuleBuilderRoot
:
71 def __init__(self
, builder
, depth
):
72 self
._builder
= builder
73 self
.domain
= self
.d
= _ModuleBuilderDomains(builder
, depth
)
75 def __getattr__(self
, name
):
76 if name
in ("comb", "sync"):
77 raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
78 .format(type(self
).__name
__, name
, name
))
79 raise AttributeError("'{}' object has no attribute '{}'"
80 .format(type(self
).__name
__, name
))
83 class _ModuleBuilderSubmodules
:
84 def __init__(self
, builder
):
85 object.__setattr
__(self
, "_builder", builder
)
87 def __iadd__(self
, modules
):
88 for module
in flatten([modules
]):
89 self
._builder
._add
_submodule
(module
)
92 def __setattr__(self
, name
, submodule
):
93 self
._builder
._add
_submodule
(submodule
, name
)
95 def __setitem__(self
, name
, value
):
96 return self
.__setattr
__(name
, value
)
98 def __getattr__(self
, name
):
99 return self
._builder
._get
_submodule
(name
)
101 def __getitem__(self
, name
):
102 return self
.__getattr
__(name
)
105 class _ModuleBuilderDomainSet
:
106 def __init__(self
, builder
):
107 object.__setattr
__(self
, "_builder", builder
)
109 def __iadd__(self
, domains
):
110 for domain
in flatten([domains
]):
111 if not isinstance(domain
, ClockDomain
):
112 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
114 self
._builder
._add
_domain
(domain
)
117 def __setattr__(self
, name
, domain
):
118 if not isinstance(domain
, ClockDomain
):
119 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
121 if domain
.name
!= name
:
122 raise NameError("Clock domain name {!r} must match name in `m.domains.{} += ...` "
124 .format(domain
.name
, name
))
125 self
._builder
._add
_domain
(domain
)
128 # It's not particularly clean to depend on an internal interface, but, unfortunately, __bool__
129 # must be defined on a class to be called during implicit conversion.
130 class _GuardedContextManager(_GeneratorContextManager
):
131 def __init__(self
, keyword
, func
, args
, kwds
):
132 self
.keyword
= keyword
133 return super().__init
__(func
, args
, kwds
)
136 raise SyntaxError("`if m.{kw}(...):` does not work; use `with m.{kw}(...)`"
137 .format(kw
=self
.keyword
))
140 def _guardedcontextmanager(keyword
):
143 def helper(*args
, **kwds
):
144 return _GuardedContextManager(keyword
, func
, args
, kwds
)
150 def __init__(self
, state
, encoding
, decoding
):
152 self
.encoding
= encoding
153 self
.decoding
= decoding
155 def ongoing(self
, name
):
156 if name
not in self
.encoding
:
157 self
.encoding
[name
] = len(self
.encoding
)
158 return Operator("==", [self
.state
, self
.encoding
[name
]], src_loc_at
=0)
161 class Module(_ModuleBuilderRoot
, Elaboratable
):
163 def __init_subclass__(cls
):
164 raise SyntaxError("Instead of inheriting from `Module`, inherit from `Elaboratable` "
165 "and return a `Module` from the `elaborate(self, platform)` method")
168 _ModuleBuilderRoot
.__init
__(self
, self
, depth
=0)
169 self
.submodules
= _ModuleBuilderSubmodules(self
)
170 self
.domains
= _ModuleBuilderDomainSet(self
)
172 self
._statements
= Statement
.cast([])
173 self
._ctrl
_context
= None
174 self
._ctrl
_stack
= []
176 self
._driving
= SignalDict()
177 self
._named
_submodules
= {}
178 self
._anon
_submodules
= []
182 def _check_context(self
, construct
, context
):
183 if self
._ctrl
_context
!= context
:
184 if self
._ctrl
_context
is None:
185 raise SyntaxError("{} is not permitted outside of {}"
186 .format(construct
, context
))
188 if self
._ctrl
_context
== "Switch":
189 secondary_context
= "Case"
190 if self
._ctrl
_context
== "FSM":
191 secondary_context
= "State"
192 raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
194 .format(construct
, self
._ctrl
_context
,
195 self
._ctrl
_context
, secondary_context
))
197 def _get_ctrl(self
, name
):
199 top_name
, top_data
= self
._ctrl
_stack
[-1]
203 def _flush_ctrl(self
):
204 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
207 def _set_ctrl(self
, name
, data
):
209 self
._ctrl
_stack
.append((name
, data
))
212 def _check_signed_cond(self
, cond
):
213 cond
= Value
.cast(cond
)
214 width
, signed
= cond
.shape()
216 warnings
.warn("Signed values in If/Elif conditions usually result from inverting "
217 "Python booleans with ~, which leads to unexpected results. "
218 "Replace `~flag` with `not flag`. (If this is a false positive, "
219 "silence this warning with `m.If(x)` → `m.If(x.bool())`.)",
220 SyntaxWarning, stacklevel
=4)
223 @_guardedcontextmanager("If")
225 self
._check
_context
("If", context
=None)
226 cond
= self
._check
_signed
_cond
(cond
)
227 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
228 if_data
= self
._set
_ctrl
("If", {
235 _outer_case
, self
._statements
= self
._statements
, []
236 self
.domain
._depth
+= 1
239 if_data
["tests"].append(cond
)
240 if_data
["bodies"].append(self
._statements
)
241 if_data
["src_locs"].append(src_loc
)
243 self
.domain
._depth
-= 1
244 self
._statements
= _outer_case
246 @_guardedcontextmanager("Elif")
247 def Elif(self
, cond
):
248 self
._check
_context
("Elif", context
=None)
249 cond
= self
._check
_signed
_cond
(cond
)
250 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
251 if_data
= self
._get
_ctrl
("If")
253 raise SyntaxError("Elif without preceding If")
255 _outer_case
, self
._statements
= self
._statements
, []
256 self
.domain
._depth
+= 1
259 if_data
["tests"].append(cond
)
260 if_data
["bodies"].append(self
._statements
)
261 if_data
["src_locs"].append(src_loc
)
263 self
.domain
._depth
-= 1
264 self
._statements
= _outer_case
266 @_guardedcontextmanager("Else")
268 self
._check
_context
("Else", context
=None)
269 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
270 if_data
= self
._get
_ctrl
("If")
272 raise SyntaxError("Else without preceding If/Elif")
274 _outer_case
, self
._statements
= self
._statements
, []
275 self
.domain
._depth
+= 1
278 if_data
["bodies"].append(self
._statements
)
279 if_data
["src_locs"].append(src_loc
)
281 self
.domain
._depth
-= 1
282 self
._statements
= _outer_case
286 def Switch(self
, test
):
287 self
._check
_context
("Switch", context
=None)
288 switch_data
= self
._set
_ctrl
("Switch", {
289 "test": Value
.cast(test
),
290 "cases": OrderedDict(),
291 "src_loc": tracer
.get_src_loc(src_loc_at
=1),
295 self
._ctrl
_context
= "Switch"
296 self
.domain
._depth
+= 1
299 self
.domain
._depth
-= 1
300 self
._ctrl
_context
= None
304 def Case(self
, *patterns
):
305 self
._check
_context
("Case", context
="Switch")
306 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
307 switch_data
= self
._get
_ctrl
("Switch")
309 for pattern
in patterns
:
310 if not isinstance(pattern
, (int, str, Enum
)):
311 raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
314 if isinstance(pattern
, str) and any(bit
not in "01- \t" for bit
in pattern
):
315 raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) "
316 "bits, and may include whitespace"
318 if (isinstance(pattern
, str) and
319 len("".join(pattern
.split())) != len(switch_data
["test"])):
320 raise SyntaxError("Case pattern '{}' must have the same width as switch value "
322 .format(pattern
, len(switch_data
["test"])))
323 if isinstance(pattern
, int) and bits_for(pattern
) > len(switch_data
["test"]):
324 warnings
.warn("Case pattern '{:b}' is wider than switch value "
325 "(which has width {}); comparison will never be true"
326 .format(pattern
, len(switch_data
["test"])),
327 SyntaxWarning, stacklevel
=3)
329 if isinstance(pattern
, Enum
) and bits_for(pattern
.value
) > len(switch_data
["test"]):
330 warnings
.warn("Case pattern '{:b}' ({}.{}) is wider than switch value "
331 "(which has width {}); comparison will never be true"
332 .format(pattern
.value
, pattern
.__class
__.__name
__, pattern
.name
,
333 len(switch_data
["test"])),
334 SyntaxWarning, stacklevel
=3)
336 new_patterns
= (*new_patterns
, pattern
)
338 _outer_case
, self
._statements
= self
._statements
, []
339 self
._ctrl
_context
= None
342 # If none of the provided cases can possibly be true, omit this branch completely.
343 # This needs to be differentiated from no cases being provided in the first place,
344 # which means the branch will always match.
345 if not (patterns
and not new_patterns
):
346 switch_data
["cases"][new_patterns
] = self
._statements
347 switch_data
["case_src_locs"][new_patterns
] = src_loc
349 self
._ctrl
_context
= "Switch"
350 self
._statements
= _outer_case
356 def FSM(self
, reset
=None, domain
="sync", name
="fsm"):
357 self
._check
_context
("FSM", context
=None)
359 raise ValueError("FSM may not be driven by the '{}' domain".format(domain
))
360 fsm_data
= self
._set
_ctrl
("FSM", {
362 "signal": Signal(name
="{}_state".format(name
), src_loc_at
=2),
365 "encoding": OrderedDict(),
366 "decoding": OrderedDict(),
367 "states": OrderedDict(),
368 "src_loc": tracer
.get_src_loc(src_loc_at
=1),
369 "state_src_locs": {},
371 self
._generated
[name
] = fsm
= \
372 FSM(fsm_data
["signal"], fsm_data
["encoding"], fsm_data
["decoding"])
374 self
._ctrl
_context
= "FSM"
375 self
.domain
._depth
+= 1
377 for state_name
in fsm_data
["encoding"]:
378 if state_name
not in fsm_data
["states"]:
379 raise NameError("FSM state '{}' is referenced but not defined"
382 self
.domain
._depth
-= 1
383 self
._ctrl
_context
= None
387 def State(self
, name
):
388 self
._check
_context
("FSM State", context
="FSM")
389 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
390 fsm_data
= self
._get
_ctrl
("FSM")
391 if name
in fsm_data
["states"]:
392 raise NameError("FSM state '{}' is already defined".format(name
))
393 if name
not in fsm_data
["encoding"]:
394 fsm_data
["encoding"][name
] = len(fsm_data
["encoding"])
396 _outer_case
, self
._statements
= self
._statements
, []
397 self
._ctrl
_context
= None
400 fsm_data
["states"][name
] = self
._statements
401 fsm_data
["state_src_locs"][name
] = src_loc
403 self
._ctrl
_context
= "FSM"
404 self
._statements
= _outer_case
408 raise SyntaxError("Only assignment to `m.next` is permitted")
411 def next(self
, name
):
412 if self
._ctrl
_context
!= "FSM":
413 for level
, (ctrl_name
, ctrl_data
) in enumerate(reversed(self
._ctrl
_stack
)):
414 if ctrl_name
== "FSM":
415 if name
not in ctrl_data
["encoding"]:
416 ctrl_data
["encoding"][name
] = len(ctrl_data
["encoding"])
418 assigns
=[ctrl_data
["signal"].eq(ctrl_data
["encoding"][name
])],
419 domain
=ctrl_data
["domain"],
420 depth
=len(self
._ctrl
_stack
))
423 raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
426 name
, data
= self
._ctrl
_stack
.pop()
427 src_loc
= data
["src_loc"]
430 if_tests
, if_bodies
= data
["tests"], data
["bodies"]
431 if_src_locs
= data
["src_locs"]
433 tests
, cases
= [], OrderedDict()
434 for if_test
, if_case
in zip(if_tests
+ [None], if_bodies
):
435 if if_test
is not None:
436 if_test
= Value
.cast(if_test
)
437 if len(if_test
) != 1:
438 if_test
= if_test
.bool()
439 tests
.append(if_test
)
441 if if_test
is not None:
442 match
= ("1" + "-" * (len(tests
) - 1)).rjust(len(if_tests
), "-")
445 cases
[match
] = if_case
447 self
._statements
.append(Switch(Cat(tests
), cases
,
448 src_loc
=src_loc
, case_src_locs
=dict(zip(cases
, if_src_locs
))))
451 switch_test
, switch_cases
= data
["test"], data
["cases"]
452 switch_case_src_locs
= data
["case_src_locs"]
454 self
._statements
.append(Switch(switch_test
, switch_cases
,
455 src_loc
=src_loc
, case_src_locs
=switch_case_src_locs
))
458 fsm_signal
, fsm_reset
, fsm_encoding
, fsm_decoding
, fsm_states
= \
459 data
["signal"], data
["reset"], data
["encoding"], data
["decoding"], data
["states"]
460 fsm_state_src_locs
= data
["state_src_locs"]
463 fsm_signal
.width
= bits_for(len(fsm_encoding
) - 1)
464 if fsm_reset
is None:
465 fsm_signal
.reset
= fsm_encoding
[next(iter(fsm_states
))]
467 fsm_signal
.reset
= fsm_encoding
[fsm_reset
]
468 # The FSM is encoded such that the state with encoding 0 is always the reset state.
469 fsm_decoding
.update((n
, s
) for s
, n
in fsm_encoding
.items())
470 fsm_signal
.decoder
= lambda n
: "{}/{}".format(fsm_decoding
[n
], n
)
471 self
._statements
.append(Switch(fsm_signal
,
472 OrderedDict((fsm_encoding
[name
], stmts
) for name
, stmts
in fsm_states
.items()),
473 src_loc
=src_loc
, case_src_locs
={fsm_encoding
[name
]: fsm_state_src_locs
[name
]
474 for name
in fsm_states
}))
476 def _add_statement(self
, assigns
, domain
, depth
, compat_mode
=False):
477 def domain_name(domain
):
483 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
486 for stmt
in Statement
.cast(assigns
):
487 if not compat_mode
and not isinstance(stmt
, (Assign
, Assert
, Assume
, Cover
)):
489 "Only assignments and property checks may be appended to d.{}"
490 .format(domain_name(domain
)))
492 stmt
._MustUse
__used
= True
493 stmt
= SampleDomainInjector(domain
)(stmt
)
495 for signal
in stmt
._lhs
_signals
():
496 if signal
not in self
._driving
:
497 self
._driving
[signal
] = domain
498 elif self
._driving
[signal
] != domain
:
499 cd_curr
= self
._driving
[signal
]
501 "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
502 "already driven from d.{}"
503 .format(signal
, domain_name(domain
), domain_name(cd_curr
)))
505 self
._statements
.append(stmt
)
507 def _add_submodule(self
, submodule
, name
=None):
508 if not hasattr(submodule
, "elaborate"):
509 raise TypeError("Trying to add {!r}, which does not implement .elaborate(), as "
510 "a submodule".format(submodule
))
512 self
._anon
_submodules
.append(submodule
)
514 if name
in self
._named
_submodules
:
515 raise NameError("Submodule named '{}' already exists".format(name
))
516 self
._named
_submodules
[name
] = submodule
518 def _get_submodule(self
, name
):
519 if name
in self
._named
_submodules
:
520 return self
._named
_submodules
[name
]
522 raise AttributeError("No submodule named '{}' exists".format(name
))
524 def _add_domain(self
, cd
):
525 if cd
.name
in self
._domains
:
526 raise NameError("Clock domain named '{}' already exists".format(cd
.name
))
527 self
._domains
[cd
.name
] = cd
530 while self
._ctrl
_stack
:
533 def elaborate(self
, platform
):
536 fragment
= Fragment()
537 for name
in self
._named
_submodules
:
538 fragment
.add_subfragment(Fragment
.get(self
._named
_submodules
[name
], platform
), name
)
539 for submodule
in self
._anon
_submodules
:
540 fragment
.add_subfragment(Fragment
.get(submodule
, platform
), None)
541 statements
= SampleDomainInjector("sync")(self
._statements
)
542 fragment
.add_statements(statements
)
543 for signal
, domain
in self
._driving
.items():
544 fragment
.add_driver(signal
, domain
)
545 fragment
.add_domains(self
._domains
.values())
546 fragment
.generated
.update(self
._generated
)