fix markdown header writing tests
[utils.git] / src / budget_sync / write_budget_markdown.py
1 from collections import defaultdict
2 from pathlib import Path
3 from typing import Dict, List, Any, Optional
4 from io import StringIO
5 import enum
6 from budget_sync.budget_graph import BudgetGraph, Node, Payment, PayeeState, PaymentSummary
7 from budget_sync.config import Person, Milestone
8 from budget_sync.money import Money
9 from budget_sync.ordered_set import OrderedSet
10 from budget_sync.util import BugStatus
11
12
13 def _markdown_escape_char(char: str) -> str:
14 if char == "<":
15 return "&lt;"
16 if char == "&":
17 return "&amp;"
18 if char in "\\`*_{}[]()#+-.!":
19 return "\\" + char
20 return char
21
22
def markdown_escape(v: Any) -> str:
    """Convert `v` to ``str`` and escape each character for markdown."""
    text = str(v)
    return "".join(map(_markdown_escape_char, text))
25
26
class DisplayStatus(enum.Enum):
    """Section a task is listed under in the generated markdown.

    Each member's value is the human-readable text for that section.
    """
    Hidden = "Hidden"
    NotYetStarted = "Not yet started"
    InProgress = "Currently working on"
    Completed = "Completed but not yet added to payees list"

    @staticmethod
    def from_status(status: BugStatus) -> "DisplayStatus":
        """Look up the display section for a bug status.

        Raises ``KeyError`` when `status` has no entry in
        ``_DISPLAY_STATUS_MAP``.
        """
        display_status = _DISPLAY_STATUS_MAP[status]
        return display_status
36
37
# Lookup table behind DisplayStatus.from_status: every bug status handled
# by the markdown writer and the section its tasks are listed under.
_DISPLAY_STATUS_MAP: Dict[BugStatus, DisplayStatus] = {
    # not confirmed yet -> not listed
    BugStatus.UNCONFIRMED: DisplayStatus.Hidden,
    BugStatus.CONFIRMED: DisplayStatus.NotYetStarted,
    BugStatus.IN_PROGRESS: DisplayStatus.InProgress,
    # deferred -> not listed
    BugStatus.DEFERRED: DisplayStatus.Hidden,
    # the remaining statuses all share the "Completed" section
    BugStatus.RESOLVED: DisplayStatus.Completed,
    BugStatus.VERIFIED: DisplayStatus.Completed,
    BugStatus.PAYMENTPENDING: DisplayStatus.Completed,
}
47
48
class MarkdownWriter:
    """Accumulate markdown text in an in-memory buffer.

    The writer remembers the stack of headers it last emitted so that
    repeated calls with the same headers do not print them again.
    """
    # headers most recently written, outermost (``#``) first
    last_headers: List[str]

    def __init__(self):
        self.buffer = StringIO()
        self.last_headers = []

    def write_headers(self, headers: List[str]):
        """Make `headers` the current header stack, printing what changed.

        `headers[i]` must start with a newline followed by ``i + 1`` ``#``
        characters and a space.  Headers that match the stack already
        written are not printed again; from the first differing level
        onwards the old stack is dropped and the new headers are printed.

        Raises:
            ValueError: if any header is malformed, or if `headers` is a
                strict prefix of the current stack (markdown cannot return
                to an ancestor section without starting a new header).
        """
        if headers == self.last_headers:
            return
        # Validate everything before touching the buffer, so a rejected
        # call leaves both the buffer and last_headers untouched.
        for depth, header in enumerate(headers, start=1):
            if not header.startswith("\n" + "#" * depth + " "):
                raise ValueError(
                    "invalid markdown header. if you're not trying to make a"
                    " markdown header, don't use write_headers!")
        for i, header in enumerate(headers):
            if i < len(self.last_headers):
                if header == self.last_headers[i]:
                    continue
                # this level changed: everything below it is stale
                del self.last_headers[i:]
            print(header, file=self.buffer)
            self.last_headers.append(header)
        if len(self.last_headers) > len(headers):
            raise ValueError("tried to go from deeper header scope stack to "
                             "ancestor scope without starting a new header, "
                             "which is not supported by markdown",
                             self.last_headers, headers)
        assert headers == self.last_headers

    def write_node_header(self,
                          headers: List[str],
                          node: Optional["Node"]):
        """Write `headers`, then a bullet naming `node`.

        A `node` of None is written as ``* None``; otherwise the bullet
        links to the bug and is followed by its escaped summary.
        """
        self.write_headers(headers)
        if node is None:
            print("* None", file=self.buffer)
            return
        summary = markdown_escape(node.bug.summary)
        print(f"* [Bug #{node.bug.id}]({node.bug_url}):\n {summary}",
              file=self.buffer)

    def write_node(self,
                   headers: List[str],
                   node: "Node",
                   payment: Optional["Payment"]):
        """Write the bullet for `node` plus its payment and MoU details.

        When `payment` is given, its submitted/paid dates (if set) and its
        amount relative to the task budget are listed.  The task's
        relation to an MoU milestone is always reported, except that the
        "not in the MoU" line is only printed when there is a payment.
        """
        # NOTE(review): the leading whitespace of the nested bullets is
        # kept exactly as found; confirm it is deep enough to nest under
        # the bug bullet in the target markdown renderer.
        self.write_node_header(headers, node)
        if payment is not None:
            if node.fixed_budget_excluding_subtasks \
                    != node.budget_excluding_subtasks:
                total = (f"&euro;{node.fixed_budget_excluding_subtasks} ("
                         f"total is fixed from amount appearing in bug report,"
                         f" which is &euro;{node.budget_excluding_subtasks})")
            else:
                total = f"&euro;{node.fixed_budget_excluding_subtasks}"
            if payment.submitted:
                print(f" * submitted on {payment.submitted}",
                      file=self.buffer)
            if payment.paid:
                print(f" * paid on {payment.paid}",
                      file=self.buffer)
            # only call it "the total amount" when it matches both the
            # fixed and the computed budget
            if payment.amount != node.fixed_budget_excluding_subtasks \
                    or payment.amount != node.budget_excluding_subtasks:
                print(f" * &euro;{payment.amount} out of total of {total}",
                      file=self.buffer)
            else:
                print(f" * &euro;{payment.amount} which is the total amount",
                      file=self.buffer)
        closest = node.closest_bug_in_mou
        if closest is node:
            print(" * this task is a MoU Milestone",
                  file=self.buffer)
        elif closest is not None:
            print(f" * this task is part of MoU Milestone\n"
                  f" [Bug #{closest.bug.id}]({closest.bug_url})",
                  file=self.buffer)
        elif payment is not None:  # only report this if there's a payment
            print(" * neither this task nor any parent tasks are in "
                  "the MoU",
                  file=self.buffer)
127
128
def _markdown_for_person(person: Person,
                         payments_dict: Dict[Milestone, List[Payment]],
                         assigned_nodes: List[Node],
                         nodes_subset: Optional[OrderedSet[Node]] = None,
                         ) -> str:
    """Build the markdown report for one person and return it as a string.

    The report lists the person's in-progress and completed-but-unpaid
    tasks, then, for every payee state and every milestone, the matching
    payments followed by subtotals per MoU milestone.  When `nodes_subset`
    is given, only nodes contained in it are reported.
    """
    def node_included(node: Node) -> bool:
        if nodes_subset is None:
            return True
        return node in nodes_subset

    writer = MarkdownWriter()
    out = writer.buffer
    print("<!-- autogenerated by budget-sync -->", file=out)
    writer.write_headers([f"\n# {person.full_name} ({person.identifier})\n"])
    print(file=out)
    status_tracking_header = "\n# Status Tracking\n"
    writer.write_headers([status_tracking_header])

    # bucket the person's assigned tasks by the section they belong to
    nodes_by_status: Dict[DisplayStatus, List[Node]] = {
        status: [] for status in DisplayStatus}
    for assigned in assigned_nodes:
        bucket = nodes_by_status[DisplayStatus.from_status(assigned.status)]
        bucket.append(assigned)

    def write_display_status_chunk(display_status: DisplayStatus):
        chunk_headers = [status_tracking_header,
                         f"\n## {display_status.value}\n"]
        for task in nodes_by_status[display_status]:
            if not node_included(task):
                continue
            # completed tasks that already have a payment for this person
            # show up in the payment sections instead
            if display_status == DisplayStatus.Completed and any(
                    payment.payee == person
                    for payment in task.payments.values()):
                continue
            # NOTE(review): nesting was reconstructed from a flattened
            # listing; this skip is assumed to apply to every status.
            nothing_to_show = (len(task.payments) == 0
                               and task.budget_excluding_subtasks == 0
                               and task.budget_including_subtasks == 0)
            if nothing_to_show:
                continue
            writer.write_node(headers=chunk_headers, node=task, payment=None)

    not_listed = (DisplayStatus.Hidden, DisplayStatus.NotYetStarted)
    for display_status in DisplayStatus:
        if display_status in not_listed:
            continue
        write_display_status_chunk(display_status)

    def texts_for_state(payee_state):
        # section header and subtotal caption for one payee state
        if payee_state == PayeeState.NotYetSubmitted:
            return ("\n## Payment not yet submitted\n",
                    "\nMoU Milestone subtotals for not yet submitted "
                    "payments\n")
        if payee_state == PayeeState.Submitted:
            return ("\n## Submitted to NLNet but not yet paid\n",
                    "\nMoU Milestone subtotals for submitted but not yet "
                    "paid payments\n")
        assert payee_state == PayeeState.Paid
        return ("\n## Paid by NLNet\n",
                "\nMoU Milestone subtotals for paid payments\n")

    def budget_suffix(mou_node: Optional[Node]) -> str:
        # text appended to a subtotal line describing the overall budget
        if mou_node is None:
            return ""
        suffix = (" out of total including subtasks of "
                  f"&euro;{mou_node.fixed_budget_including_subtasks}")
        if mou_node.fixed_budget_including_subtasks \
                != mou_node.budget_including_subtasks:
            suffix += (" (budget is fixed from amount appearing in "
                       "bug report, which is "
                       f"&euro;{mou_node.budget_including_subtasks})")
        return suffix

    for payee_state in PayeeState:
        display_status_header, subtotals_msg = texts_for_state(payee_state)
        # list all the payments grouped by Grant
        for milestone, payments_list in payments_dict.items():
            headers = [status_tracking_header,
                       display_status_header,
                       f"\n### {milestone.identifier}\n"]
            mou_subtotals: Dict[Optional[Node], Money] = defaultdict(Money)
            # write out the payments and also compute the subtotals per
            # mou milestone
            for payment in payments_list:
                paid_node = payment.node
                if payment.state != payee_state:
                    continue
                if not node_included(paid_node):
                    continue
                mou_subtotals[paid_node.closest_bug_in_mou] += payment.amount
                writer.write_node(headers=headers,
                                  node=payment.node, payment=payment)
            # now display the mou subtotals (an earlier note here said this
            # "should be before" -- presumably before the payments; confirm)
            for mou_node, subtotal in mou_subtotals.items():
                writer.write_headers(headers)
                print(subtotals_msg, file=out)
                writer.write_node_header(headers, mou_node)
                budget = budget_suffix(mou_node)
                print(f" * subtotal &euro;{subtotal}{budget}", file=out)

    # the "not yet started" section is currently disabled:
    # write_display_status_chunk(DisplayStatus.NotYetStarted)

    return out.getvalue()
229
230
def write_budget_markdown(budget_graph: BudgetGraph,
                          output_dir: Path,
                          nodes_subset: Optional[OrderedSet[Node]] = None):
    """Write one markdown report per person into `output_dir`.

    The directory (and any missing parents) is created first.  A file is
    written for every person in ``budget_graph.payments``, named by that
    person's ``output_markdown_file`` and encoded as UTF-8.  `nodes_subset`
    is passed through to limit which nodes are reported.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    for person, payments_dict in budget_graph.payments.items():
        report = _markdown_for_person(
            person=person,
            payments_dict=payments_dict,
            assigned_nodes=budget_graph.assigned_nodes[person],
            nodes_subset=nodes_subset)
        target = output_dir / person.output_markdown_file
        target.write_text(report, encoding="utf-8")
242
243
def markdown_for_person(budget_graph: BudgetGraph, person: Person,
                        nodes_subset: Optional[OrderedSet[Node]] = None,
                        ) -> str:
    """Return the markdown report for `person` as a string.

    The person's payments and assigned nodes are looked up in
    `budget_graph`; `nodes_subset` optionally limits the reported nodes.
    """
    payments_dict = budget_graph.payments[person]
    assigned = budget_graph.assigned_nodes[person]
    return _markdown_for_person(person=person,
                                payments_dict=payments_dict,
                                assigned_nodes=assigned,
                                nodes_subset=nodes_subset)
248 budget_graph.assigned_nodes[person],
249 nodes_subset)