sim._pycoro: make src_loc() more robust.
[nmigen.git] / nmigen / sim / _pycoro.py
1 import inspect
2
3 from ..hdl import *
4 from ..hdl.ast import Statement, SignalSet
5 from ._cmds import *
6 from ._core import Process
7 from ._pyrtl import _ValueCompiler, _RHSValueCompiler, _StatementCompiler
8
9
10 __all__ = ["PyCoroProcess"]
11
12
class PyCoroProcess(Process):
    """Simulator process that drives a Python generator or coroutine.

    The object returned by ``constructor()`` is resumed by :meth:`run`; every
    value it yields is interpreted as a simulator command (a ``Value`` to
    evaluate, a ``Statement`` to execute, or one of ``Tick``, ``Settle``,
    ``Delay``, ``Passive``, ``Active``).
    """

    def __init__(self, state, domains, constructor, *, default_cmd=None):
        """Store the simulation context and create the initial coroutine.

        Parameters
        ----------
        state
            Simulator state; this class uses its ``slots``, ``timeline``,
            ``add_trigger()`` and ``remove_trigger()`` members.
        domains
            Mapping consulted to resolve the ``domain`` of a ``Tick`` command
            when it is not already a ``ClockDomain``.
        constructor
            Zero-argument callable returning the generator/coroutine to run.
        default_cmd
            Command substituted whenever the coroutine yields ``None``.
        """
        self.state = state
        self.domains = domains
        self.constructor = constructor
        self.default_cmd = default_cmd

        self.reset()

    def reset(self):
        """Restart the process with a freshly constructed coroutine."""
        self.runnable = True
        self.passive = False
        self.coroutine = self.constructor()
        # Namespace in which compiled value/statement code is executed; the
        # compiled code leaves its output under the "result" key.
        self.exec_locals = {
            "slots": self.state.slots,
            "result": None,
            **_ValueCompiler.helpers
        }
        self.waits_on = SignalSet()

    def src_loc(self):
        """Return ``"file:line"`` of the innermost suspended frame.

        Returns ``None`` when the process has no coroutine, when the object is
        neither a generator nor a native coroutine, or when its frame is no
        longer available (it has already finished).
        """
        coroutine = self.coroutine
        if coroutine is None:
            return None

        # Follow the delegation chain (``yield from`` / ``await``) down to the
        # innermost generator or coroutine, picking the attribute that matches
        # the kind of object at each step.
        while True:
            if inspect.isgenerator(coroutine):
                inner = coroutine.gi_yieldfrom
            elif inspect.iscoroutine(coroutine):
                inner = coroutine.cr_await
            else:
                break
            if not (inspect.isgenerator(inner) or inspect.iscoroutine(inner)):
                break
            coroutine = inner

        if inspect.isgenerator(coroutine):
            frame = coroutine.gi_frame
        elif inspect.iscoroutine(coroutine):
            frame = coroutine.cr_frame
        else:
            frame = None
        if frame is None:
            return None
        return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame))

    def add_trigger(self, signal, trigger=None):
        """Register this process as waiting on ``signal`` and remember it."""
        self.state.add_trigger(self, signal, trigger=trigger)
        self.waits_on.add(signal)

    def clear_triggers(self):
        """Unregister this process from every signal it is waiting on."""
        for signal in self.waits_on:
            self.state.remove_trigger(self, signal)
        self.waits_on.clear()

    def run(self):
        """Resume the coroutine and execute commands until it has to wait.

        Returns after a ``Tick``, ``Settle`` or ``Delay`` command has been
        scheduled, or once the coroutine finishes (the process then becomes
        passive and ``self.coroutine`` is set to ``None``).

        An exception raised while interpreting a command is thrown back into
        the coroutine at its current ``yield``. If the coroutine does not
        handle it, it propagates out of this method; if the coroutine handles
        it and yields again, the newly yielded command is processed normally.

        Raises
        ------
        NameError
            A ``Tick`` command refers to a domain that is not in ``domains``.
        TypeError
            The coroutine yielded an unsupported command, or yielded ``None``
            while no default command is configured.
        """
        if self.coroutine is None:
            return

        self.clear_triggers()

        response = None
        exception = None
        while True:
            # Step 1: resume the coroutine, either with the response to its
            # previous command or with the exception that command caused.
            try:
                if exception is None:
                    command = self.coroutine.send(response)
                else:
                    command = self.coroutine.throw(exception)
            except StopIteration:
                self.passive = True
                self.coroutine = None
                return

            # Step 2: interpret the yielded command.
            try:
                if command is None:
                    command = self.default_cmd
                response = None
                exception = None

                if isinstance(command, Value):
                    exec(_RHSValueCompiler.compile(self.state, command, mode="curr"),
                         self.exec_locals)
                    response = Const.normalize(self.exec_locals["result"], command.shape())

                elif isinstance(command, Statement):
                    exec(_StatementCompiler.compile(self.state, command),
                         self.exec_locals)

                # The remaining commands are matched on their exact type.
                elif type(command) is Tick:
                    domain = command.domain
                    if isinstance(domain, ClockDomain):
                        pass
                    elif domain in self.domains:
                        domain = self.domains[domain]
                    else:
                        raise NameError("Received command {!r} that refers to a nonexistent "
                                        "domain {!r} from process {!r}"
                                        .format(command, command.domain, self.src_loc()))
                    # Wake on the clock level matching the domain's active edge.
                    self.add_trigger(domain.clk, trigger=1 if domain.clk_edge == "pos" else 0)
                    if domain.rst is not None and domain.async_reset:
                        self.add_trigger(domain.rst, trigger=1)
                    return

                elif type(command) is Settle:
                    self.state.timeline.delay(None, self)
                    return

                elif type(command) is Delay:
                    self.state.timeline.delay(command.interval, self)
                    return

                elif type(command) is Passive:
                    self.passive = True

                elif type(command) is Active:
                    self.passive = False

                elif command is None: # only possible if self.default_cmd is None
                    raise TypeError("Received default command from process {!r} that was added "
                                    "with add_process(); did you mean to add this process with "
                                    "add_sync_process() instead?"
                                    .format(self.src_loc()))

                else:
                    raise TypeError("Received unsupported command {!r} from process {!r}"
                                    .format(command, self.src_loc()))

            except Exception as exn:
                # Deliver the failure to the coroutine on the next iteration so
                # that its reaction (re-raise, return, or a new command) is
                # handled by the regular resume logic above.
                response = None
                exception = exn