53df4aae54d7990162487d6310d334e49345f70f
[lambdasoc.git] / lambdasoc / periph / event.py
1 from nmigen import *
2 from nmigen import tracer
3
4 from nmigen_soc import csr
5
6
7 __all__ = ["EventSource", "IRQLine", "InterruptSource"]
8
9
class EventSource:
    """A named event with a strobe signal and a trigger mode.

    Parameters
    ----------
    mode : ``"level"``, ``"rise"``, ``"fall"``
        Trigger mode. With ``"level"``, a notification is raised while the ``stb`` signal
        is high. With ``"rise"`` (or ``"fall"``), a notification is raised on a rising
        (or falling) edge of ``stb``.
    name : str
        Name of the event. If ``None`` (default), the name is inferred from the variable
        name this event source is assigned to.
    src_loc_at : int
        Offset added to the depth passed to ``tracer.get_var_name`` when the name is
        inferred. Presumably lets wrapping constructors skip their own stack frames.

    Attributes
    ----------
    name : str
        Name of the event.
    mode : ``"level"``, ``"rise"``, ``"fall"``
        Trigger mode.
    stb : Signal, in
        Event strobe.

    Raises
    ------
    TypeError
        If ``name`` is neither ``None`` nor a string.
    ValueError
        If ``mode`` is not one of the supported trigger modes.
    """
    def __init__(self, *, mode="level", name=None, src_loc_at=0):
        # The name is checked first, so an invalid name is reported before an invalid mode.
        if not (name is None or isinstance(name, str)):
            raise TypeError("Name must be a string, not {!r}".format(name))

        valid_modes = ("level", "rise", "fall")
        if mode not in valid_modes:
            raise ValueError("Invalid trigger mode {!r}; must be one of {}"
                             .format(mode, ", ".join(valid_modes)))

        # A missing (or empty) name falls back to the name of the variable being assigned.
        if not name:
            name = tracer.get_var_name(depth=2 + src_loc_at)

        self.name = name
        self.mode = mode
        self.stb = Signal(name="{}_stb".format(self.name))
44
45
class IRQLine(Signal):
    """Interrupt request line.

    A ``Signal`` subclass constructed with its default shape, used to mark a signal as
    an interrupt request.

    Parameters
    ----------
    name : str
        Name of the signal, forwarded to ``Signal``.
    src_loc_at : int
        Extra source-location offset, forwarded to ``Signal`` after adding one for this
        constructor's own frame.
    """
    # Hash instances with the default ``object`` hash, i.e. by identity.
    __hash__ = object.__hash__

    def __init__(self, *, name=None, src_loc_at=0):
        # The +1 accounts for this ``__init__`` frame sitting between the caller and Signal.
        super().__init__(src_loc_at=src_loc_at + 1, name=name)
52
53
class InterruptSource(Elaboratable):
    """Interrupt source.

    A means of gathering multiple event sources into a single interrupt request line.

    Parameters
    ----------
    events : iter(:class:`EventSource`)
        Event sources. Any iterable is accepted, including one-shot iterators; it is
        consumed exactly once.
    name : str
        Name of the interrupt source. If ``None`` (default) the name is inferred from the
        variable name this interrupt source is assigned to.
    src_loc_at : int
        Offset added to the depth passed to ``tracer.get_var_name`` when the name is
        inferred.

    Attributes
    ----------
    name : str
        Name of the interrupt source.
    status : :class:`csr.Element`, read-only
        Event status register. Each bit displays the level of the strobe of an event source.
        Events are ordered by position in the `events` parameter.
    pending : :class:`csr.Element`, read/write
        Event pending register. If a bit is 1, the associated event source has a pending
        notification. Writing 1 to a bit clears it.
        Events are ordered by position in the `events` parameter.
    enable : :class:`csr.Element`, read/write
        Event enable register. Writing 1 to a bit enables its associated event source.
        Writing 0 disables it.
        Events are ordered by position in the `events` parameter.
    irq : :class:`IRQLine`, out
        Interrupt request. It is raised if any event source is enabled and has a pending
        notification.

    Raises
    ------
    TypeError
        If ``name`` is neither ``None`` nor a string, or if any item of ``events`` is not
        an :class:`EventSource`.
    """
    def __init__(self, events, *, name=None, src_loc_at=0):
        if name is not None and not isinstance(name, str):
            raise TypeError("Name must be a string, not {!r}".format(name))
        self.name = name or tracer.get_var_name(depth=2 + src_loc_at)

        # Materialize the iterable once, up front. Validating first and copying afterwards
        # would exhaust a one-shot iterator (leaving no events), and generators have no len().
        events = list(events)
        for event in events:
            if not isinstance(event, EventSource):
                raise TypeError("Event source must be an instance of EventSource, not {!r}"
                                .format(event))
        self._events = events

        # One register bit per event source, in the order the events were given.
        width = len(events)
        self.status  = csr.Element(width, "r",  name="{}_status".format(self.name))
        self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name))
        self.enable  = csr.Element(width, "rw", name="{}_enable".format(self.name))

        self.irq = IRQLine(name="{}_irq".format(self.name))

    def elaborate(self, platform):
        m = Module()

        # Write-1-to-clear: every bit written as 1 is cleared, the others keep their value.
        with m.If(self.pending.w_stb):
            m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data)

        # The enable register is a plain read/write register.
        with m.If(self.enable.w_stb):
            m.d.sync += self.enable.r_data.eq(self.enable.w_data)

        for i, event in enumerate(self._events):
            # The status bit is a registered copy of the event strobe.
            m.d.sync += self.status.r_data[i].eq(event.stb)

            # Edge-triggered modes compare the strobe against its previous-cycle value.
            if event.mode in ("rise", "fall"):
                event_stb_r = Signal.like(event.stb, name_suffix="_r")
                m.d.sync += event_stb_r.eq(event.stb)

            event_trigger = Signal(name="{}_trigger".format(event.name))
            if event.mode == "level":
                m.d.comb += event_trigger.eq(event.stb)
            elif event.mode == "rise":
                m.d.comb += event_trigger.eq(~event_stb_r & event.stb)
            elif event.mode == "fall":
                m.d.comb += event_trigger.eq(event_stb_r & ~event.stb)
            else:
                assert False # :nocov:

            # This assignment is added after the clear above, so a trigger arriving in the
            # same cycle as a clear leaves the bit set (later assignments take precedence).
            with m.If(event_trigger):
                m.d.sync += self.pending.r_data[i].eq(1)

        # Raise the interrupt while at least one pending event is also enabled.
        m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any())

        return m