1 from contextlib
import contextmanager
4 from vcd
import VCDWriter
5 from vcd
.gtkw
import GTKWSave
7 from .._utils
import deprecated
9 from ..hdl
.ast
import SignalDict
12 from ._pyrtl
import _FragmentCompiler
13 from ._pycoro
import PyCoroProcess
14 from ._pyclock
import PyClockProcess
17 __all__
= ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]
22 self
.names
= SignalDict()
24 def __call__(self
, fragment
, *, hierarchy
=("top",)):
25 def add_signal_name(signal
):
26 hierarchical_signal_name
= (*hierarchy
, signal
.name
)
27 if signal
not in self
.names
:
28 self
.names
[signal
] = {hierarchical_signal_name}
30 self
.names
[signal
].add(hierarchical_signal_name
)
32 for domain_name
, domain_signals
in fragment
.drivers
.items():
33 if domain_name
is not None:
34 domain
= fragment
.domains
[domain_name
]
35 add_signal_name(domain
.clk
)
36 if domain
.rst
is not None:
37 add_signal_name(domain
.rst
)
39 for statement
in fragment
.statements
:
40 for signal
in statement
._lhs
_signals
() | statement
._rhs
_signals
():
41 if not isinstance(signal
, (ClockSignal
, ResetSignal
)):
42 add_signal_name(signal
)
44 for subfragment_index
, (subfragment
, subfragment_name
) in enumerate(fragment
.subfragments
):
45 if subfragment_name
is None:
46 subfragment_name
= "U${}".format(subfragment_index
)
47 self(subfragment
, hierarchy
=(*hierarchy
, subfragment_name
))
52 class _WaveformWriter
:
53 def update(self
, timestamp
, signal
, value
):
54 raise NotImplementedError # :nocov:
56 def close(self
, timestamp
):
57 raise NotImplementedError # :nocov:
60 class _VCDWaveformWriter(_WaveformWriter
):
62 def timestamp_to_vcd(timestamp
):
63 return timestamp
* (10 ** 10) # 1/(100 ps)
66 def decode_to_vcd(signal
, value
):
67 return signal
.decoder(value
).expandtabs().replace(" ", "_")
69 def __init__(self
, fragment
, *, vcd_file
, gtkw_file
=None, traces
=()):
70 if isinstance(vcd_file
, str):
71 vcd_file
= open(vcd_file
, "wt")
72 if isinstance(gtkw_file
, str):
73 gtkw_file
= open(gtkw_file
, "wt")
75 self
.vcd_vars
= SignalDict()
76 self
.vcd_file
= vcd_file
77 self
.vcd_writer
= vcd_file
and VCDWriter(self
.vcd_file
,
78 timescale
="100 ps", comment
="Generated by nMigen")
80 self
.gtkw_names
= SignalDict()
81 self
.gtkw_file
= gtkw_file
82 self
.gtkw_save
= gtkw_file
and GTKWSave(self
.gtkw_file
)
86 signal_names
= _NameExtractor()(fragment
)
88 trace_names
= SignalDict()
90 if trace
not in signal_names
:
91 trace_names
[trace
] = {("top", trace
.name
)}
92 self
.traces
.append(trace
)
94 if self
.vcd_writer
is None:
97 for signal
, names
in itertools
.chain(signal_names
.items(), trace_names
.items()):
101 var_init
= self
.decode_to_vcd(signal
, signal
.reset
)
104 var_size
= signal
.width
105 var_init
= signal
.reset
107 for (*var_scope
, var_name
) in names
:
112 var_name_suffix
= var_name
114 var_name_suffix
= "{}${}".format(var_name
, suffix
)
115 if signal
not in self
.vcd_vars
:
116 vcd_var
= self
.vcd_writer
.register_var(
117 scope
=var_scope
, name
=var_name_suffix
,
118 var_type
=var_type
, size
=var_size
, init
=var_init
)
119 self
.vcd_vars
[signal
] = vcd_var
121 self
.vcd_writer
.register_alias(
122 scope
=var_scope
, name
=var_name_suffix
,
123 var
=self
.vcd_vars
[signal
])
126 suffix
= (suffix
or 0) + 1
128 if signal
not in self
.gtkw_names
:
129 self
.gtkw_names
[signal
] = (*var_scope
, var_name_suffix
)
131 def update(self
, timestamp
, signal
, value
):
132 vcd_var
= self
.vcd_vars
.get(signal
)
136 vcd_timestamp
= self
.timestamp_to_vcd(timestamp
)
138 var_value
= self
.decode_to_vcd(signal
, value
)
141 self
.vcd_writer
.change(vcd_var
, vcd_timestamp
, var_value
)
143 def close(self
, timestamp
):
144 if self
.vcd_writer
is not None:
145 self
.vcd_writer
.close(self
.timestamp_to_vcd(timestamp
))
147 if self
.gtkw_save
is not None:
148 self
.gtkw_save
.dumpfile(self
.vcd_file
.name
)
149 self
.gtkw_save
.dumpfile_size(self
.vcd_file
.tell())
151 self
.gtkw_save
.treeopen("top")
152 for signal
in self
.traces
:
153 if len(signal
) > 1 and not signal
.decoder
:
154 suffix
= "[{}:0]".format(len(signal
) - 1)
157 self
.gtkw_save
.trace(".".join(self
.gtkw_names
[signal
]) + suffix
)
159 if self
.vcd_file
is not None:
160 self
.vcd_file
.close()
161 if self
.gtkw_file
is not None:
162 self
.gtkw_file
.close()
166 __slots__
= ("signal", "curr", "next", "waiters", "pending")
168 def __init__(self
, signal
, pending
):
170 self
.pending
= pending
171 self
.waiters
= dict()
172 self
.curr
= self
.next
= signal
.reset
174 def set(self
, value
):
175 if self
.next
== value
:
178 self
.pending
.add(self
)
181 if self
.curr
== self
.next
:
183 self
.curr
= self
.next
186 for process
, trigger
in self
.waiters
.items():
187 if trigger
is None or trigger
== self
.curr
:
188 process
.runnable
= awoken_any
= True
192 class _SimulatorState
:
194 self
.timeline
= Timeline()
195 self
.signals
= SignalDict()
200 self
.timeline
.reset()
201 for signal
, index
in self
.signals
.items():
202 self
.slots
[index
].curr
= self
.slots
[index
].next
= signal
.reset
205 def get_signal(self
, signal
):
207 return self
.signals
[signal
]
209 index
= len(self
.slots
)
210 self
.slots
.append(_SignalState(signal
, self
.pending
))
211 self
.signals
[signal
] = index
214 def add_trigger(self
, process
, signal
, *, trigger
=None):
215 index
= self
.get_signal(signal
)
216 assert (process
not in self
.slots
[index
].waiters
or
217 self
.slots
[index
].waiters
[process
] == trigger
)
218 self
.slots
[index
].waiters
[process
] = trigger
220 def remove_trigger(self
, process
, signal
):
221 index
= self
.get_signal(signal
)
222 assert process
in self
.slots
[index
].waiters
223 del self
.slots
[index
].waiters
[process
]
227 for signal_state
in self
.pending
:
228 if signal_state
.commit():
def __init__(self, fragment):
    """Set up a simulation of ``fragment``.

    ``fragment`` is converted with ``Fragment.get(..., platform=None)`` and
    prepared, then compiled into processes that operate on a fresh
    ``_SimulatorState``.  Initially no domain is clocked and no waveform
    writer is attached.
    """
    state = _SimulatorState()
    prepared = Fragment.get(fragment, platform=None).prepare()

    self._state = state
    self._fragment = prepared
    self._processes = _FragmentCompiler(state)(prepared)
    self._clocked = set()
    self._waveform_writers = []
242 def _check_process(self
, process
):
243 if not (inspect
.isgeneratorfunction(process
) or inspect
.iscoroutinefunction(process
)):
244 raise TypeError("Cannot add a process {!r} because it is not a generator function"
def _add_coroutine_process(self, process, *, default_cmd):
    """Wrap ``process`` in a ``PyCoroProcess`` and add it to the simulation.

    The wrapper is given the simulator state, the domains of the simulated
    fragment, and ``default_cmd``.
    """
    coroutine_process = PyCoroProcess(self._state, self._fragment.domains, process,
                                      default_cmd=default_cmd)
    self._processes.add(coroutine_process)
252 def add_process(self
, process
):
253 process
= self
._check
_process
(process
)
255 # Only start a bench process after comb settling, so that the reset values are correct.
258 self
._add
_coroutine
_process
(wrapper
, default_cmd
=None)
260 def add_sync_process(self
, process
, *, domain
="sync"):
261 process
= self
._check
_process
(process
)
263 # Only start a sync process after the first clock edge (or reset edge, if the domain
264 # uses an asynchronous reset). This matches the behavior of synchronous FFs.
267 return self
._add
_coroutine
_process
(wrapper
, default_cmd
=Tick(domain
))
269 def add_clock(self
, period
, *, phase
=None, domain
="sync", if_exists
=False):
270 """Add a clock process.
272 Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.
277 Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
279 phase : None or float
280 Clock phase. The process will wait ``phase`` seconds before the first clock transition.
281 If not specified, defaults to ``period / 2``.
282 domain : str or ClockDomain
283 Driven clock domain. If specified as a string, the domain with that name is looked up
284 in the root fragment of the simulation.
286 If ``False`` (the default), raise an error if the driven domain is specified as
287 a string and the root fragment does not have such a domain. If ``True``, do nothing
290 if isinstance(domain
, ClockDomain
):
292 elif domain
in self
._fragment
.domains
:
293 domain
= self
._fragment
.domains
[domain
]
297 raise ValueError("Domain {!r} is not present in simulation"
299 if domain
in self
._clocked
:
300 raise ValueError("Domain {!r} already has a clock driving it"
301 .format(domain
.name
))
304 # By default, delay the first edge by half period. This causes any synchronous activity
305 # to happen at a non-zero time, distinguishing it from the reset values in the waveform
308 self
._processes
.add(PyClockProcess(self
._state
, domain
.clk
, phase
=phase
, period
=period
))
309 self
._clocked
.add(domain
)
312 """Reset the simulation.
314 Assign the reset value to every signal in the simulation, and restart every user process.
317 for process
in self
._processes
:
320 def _real_step(self
):
321 """Step the simulation.
323 Run every process and commit changes until a fixed point is reached. If there is
324 an unstable combinatorial loop, this function will never return.
326 # Performs the two phases of a delta cycle in a loop:
329 # 1. eval: run and suspend every non-waiting process once, queueing signal changes
330 for process
in self
._processes
:
332 process
.runnable
= False
335 for waveform_writer
in self
._waveform
_writers
:
336 for signal_state
in self
._state
.pending
:
337 waveform_writer
.update(self
._state
.timeline
.now
,
338 signal_state
.signal
, signal_state
.next
)
340 # 2. commit: apply every queued signal change, waking up any waiting processes
341 converged
= self
._state
.commit()
343 # TODO(nmigen-0.4): replace with _real_step
344 @deprecated("instead of `sim.step()`, use `sim.advance()`")
346 return self
.advance()
349 """Advance the simulation.
351 Run every process and commit changes until a fixed point is reached, then advance time
352 to the closest deadline (if any). If there is an unstable combinatorial loop,
353 this function will never return.
355 Returns ``True`` if there are any active processes, ``False`` otherwise.
358 self
._state
.timeline
.advance()
359 return any(not process
.passive
for process
in self
._processes
)
362 """Run the simulation while any processes are active.
364 Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
365 and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
366 Processes compiled from HDL and added with :meth:`add_clock` are always passive.
368 while self
.advance():
371 def run_until(self
, deadline
, *, run_passive
=False):
372 """Run the simulation until it advances to ``deadline``.
374 If ``run_passive`` is ``False``, the simulation also stops when there are no active
375 processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
376 advances to or past ``deadline``.
378 If the simulation stops advancing, this function will never return.
380 assert self
._state
.timeline
.now
<= deadline
381 while (self
.advance() or run_passive
) and self
._state
.timeline
.now
< deadline
:
385 def write_vcd(self
, vcd_file
, gtkw_file
=None, *, traces
=()):
386 """Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
388 This method returns a context manager. It can be used as: ::
390 sim = Simulator(frag)
392 with sim.write_vcd("dump.vcd", "dump.gtkw"):
397 vcd_file : str or file-like object
398 Verilog Value Change Dump file or filename.
399 gtkw_file : str or file-like object
400 GTKWave save file or filename.
401 traces : iterable of Signal
402 Signals to display traces for.
404 if self
._state
.timeline
.now
!= 0.0:
405 for file in (vcd_file
, gtkw_file
):
406 if hasattr(file, "close"):
408 raise ValueError("Cannot start writing waveforms after advancing simulation time")
410 waveform_writer
= _VCDWaveformWriter(self
._fragment
,
411 vcd_file
=vcd_file
, gtkw_file
=gtkw_file
, traces
=traces
)
413 self
._waveform
_writers
.append(waveform_writer
)
416 waveform_writer
.close(self
._state
.timeline
.now
)
417 self
._waveform
_writers
.remove(waveform_writer
)