Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pr-build-merge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ jobs:
run: cp env.example .env

- name: Run formatting checks
run: uv --project backend run ruff --config backend/pyproject.toml format --check --diff backend
run: uv --project backend run ruff format --check --diff backend

- name: Run linting
run: uv --project backend run ruff --config backend/pyproject.toml check backend
run: uv --project backend run ruff check backend

- name: Run security checks
run: uv --project backend run bandit -c backend/pyproject.toml -r backend -f json -o bandit.json
Expand Down
188 changes: 147 additions & 41 deletions backend/app/billing/notification_helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import calendar
import json
import logging
import textwrap
from collections.abc import Awaitable, Callable
from datetime import date

from adaptive_cards import card_types as ct # type: ignore[import-untyped]
Expand All @@ -12,20 +14,32 @@
from app.notifications import (
ColumnHeader,
NotificationDetails,
build_card_payload,
send_exception,
send_info,
send_warning,
)

logger = logging.getLogger(__name__)
# MS Teams Incoming Webhook limit is 28 KB; we leave headroom for
# title/emoji/(Part i/N) prefixes and JSON whitespace not counted by our
# size estimator. See:
# https://learn.microsoft.com/microsoftteams/platform/webhooks-and-connectors/how-to/add-incoming-webhook
MSTEAMS_PAYLOAD_BUDGET = 22 * 1024
MAX_MESSAGE_CHARS = 2000

NOTIFICATION_FUNCTIONS = {
NOTIFICATION_TITLE_COLORS: dict[NotificationLevel, ct.Colors] = {
NotificationLevel.SUCCESS: ct.Colors.ACCENT,
NotificationLevel.IN_PROGRESS: ct.Colors.WARNING,
NotificationLevel.ERROR: ct.Colors.ATTENTION,
}

NOTIFICATION_FUNCTIONS: dict[NotificationLevel, Callable[..., Awaitable[None]]] = {
NotificationLevel.SUCCESS: send_info,
NotificationLevel.IN_PROGRESS: send_warning,
NotificationLevel.ERROR: send_exception,
}


NOTIFICATION_TEXTS: dict[NotificationLevel, tuple[str, str]] = {
NotificationLevel.SUCCESS: (
"{month_name} {year} Billing Finalized.",
Expand All @@ -42,6 +56,12 @@
),
}

RESULT_ICON: dict[str, str] = {
"JOURNAL_GENERATED": "✅",
"JOURNAL_SKIPPED": "⏭️",
"ERROR": "❌",
}


def _build_notification_title_text(
level: NotificationLevel, month_name: str, year: int
Expand All @@ -53,55 +73,139 @@ def _build_notification_title_text(
)


def _build_notification_details(details: list) -> NotificationDetails:
def _build_header() -> tuple[ColumnHeader, ...]:
return (
ColumnHeader(
"Authorization", width="120px", horizontal_alignment=ct.HorizontalAlignment.CENTER
),
ColumnHeader("Journal", width="120px", horizontal_alignment=ct.HorizontalAlignment.CENTER),
ColumnHeader("Status", width="50px", horizontal_alignment=ct.HorizontalAlignment.CENTER),
ColumnHeader("Message", width="stretch"),
)


def _build_rows(details: list[ProcessResultInfo]) -> list[tuple[str, ...]]:
return [
(
f"{item.authorization_id}",
f"{item.journal_id or ''}",
f"{RESULT_ICON.get(item.result.value.upper(), '')}",
"\n\n".join(textwrap.wrap((item.message or "")[:MAX_MESSAGE_CHARS], width=80)),
)
for item in details
]


def _chunk_rows_by_size(
rows: list[tuple[str, ...]],
measure: Callable[[list[tuple[str, ...]]], int],
budget: int = MSTEAMS_PAYLOAD_BUDGET,
) -> list[list[tuple[str, ...]]]:
"""
This function builds a NotificationDetails object depending on the
given notification level.
Partition `rows` so each chunk's measured size stays within `budget`.
`measure` returns the serialized size (bytes) of a card containing the
given subset of rows, including all per-card overhead. Raises
``ValueError`` if a single row alone exceeds the budget — callers must
cap the row's variable-size fields (the message column) at
``MAX_MESSAGE_CHARS`` in `_build_rows` so this invariant holds.
"""
prefix_icon = {
"JOURNAL_GENERATED": "✅",
"JOURNAL_SKIPPED": "⏭️",
"ERROR": "❌",
}

return NotificationDetails(
header=(
ColumnHeader(
"Authorization", width="120px", horizontal_alignment=ct.HorizontalAlignment.CENTER
),
ColumnHeader(
"Journal", width="120px", horizontal_alignment=ct.HorizontalAlignment.CENTER
),
ColumnHeader(
"Status", width="50px", horizontal_alignment=ct.HorizontalAlignment.CENTER
),
ColumnHeader("Message", width="stretch"),
),
rows=[
(
f"{item.authorization_id}",
f"{item.journal_id or ''}",
f"{prefix_icon.get(item.result.value.upper(), '')}",
"\n\n".join(textwrap.wrap(item.message or "", width=80)),

# json.dumps default separator between list items is ", " (2 bytes).
separator_bytes = 2

# Measure once: empty-card overhead.
empty_size = measure([])
# N measurements for marginal row sizes (each row in isolation).
row_sizes = [measure([row]) - empty_size for row in rows]

chunks: list[list[tuple[str, ...]]] = []
buffer: list[tuple[str, ...]] = []
buffer_size = empty_size

for row, row_size in zip(rows, row_sizes, strict=True):
added = row_size + (separator_bytes if buffer else 0)
if buffer_size + added <= budget:
buffer.append(row)
buffer_size += added
continue
if buffer:
chunks.append(buffer)
if empty_size + row_size > budget:
raise ValueError(
f"Row for authorization={row[0]} contributes {row_size} bytes; "
f"exceeds per-chunk budget {budget - empty_size}."
)
for item in details
],
buffer = [row]
buffer_size = empty_size + row_size
if buffer:
chunks.append(buffer)
return chunks


def _measure_card_payload_size(
title: str,
text: str,
header: tuple[ColumnHeader, ...],
rows: list[tuple[str, ...]],
title_color: ct.Colors = ct.Colors.DEFAULT,
open_url: str | None = None,
) -> int:
details = NotificationDetails(header=header, rows=rows)
card_payload = build_card_payload(
title=title,
text=text,
title_color=title_color,
details=details,
open_url=open_url,
)
return len(json.dumps(card_payload).encode("utf-8"))


async def _send_notification(
level: NotificationLevel, month_name: str, year: int, results_counter_details: list
level: NotificationLevel,
month_name: str,
year: int,
results_counter_details: list[ProcessResultInfo],
) -> None:
"""
This function sends a notification at the given level.
Sends one or more notifications at the given level. Rows are split
into multiple messages whenever the serialized payload would exceed
`MSTEAMS_PAYLOAD_BUDGET`. Each part carries a "(Part i/N)" suffix
in the title when N > 1.
"""

func = NOTIFICATION_FUNCTIONS[level]
title, text = _build_notification_title_text(level, month_name, year)
await func(
title=title,
text=text,
details=_build_notification_details(details=results_counter_details),
)
title, text = _build_notification_title_text(level=level, month_name=month_name, year=year)
header = _build_header()
rows = _build_rows(details=results_counter_details)
if not rows:
await func(
title=title,
text=text,
details=NotificationDetails(header=header, rows=[]),
)
return

title_color = NOTIFICATION_TITLE_COLORS[level]

def measure(chunk_rows: list[tuple[str, ...]]) -> int:
return _measure_card_payload_size(
title=title,
text=text,
header=header,
rows=chunk_rows,
title_color=title_color,
)

chunks = _chunk_rows_by_size(rows=rows, measure=measure)
total = len(chunks)
for idx, chunk_rows in enumerate(chunks, start=1):
chunk_title = f"{title} (Part {idx}/{total})" if total > 1 else title
await func(
title=chunk_title,
text=text,
details=NotificationDetails(header=header, rows=chunk_rows),
)


def check_results(
Expand All @@ -116,7 +220,9 @@ def check_results(
return ProcessResult.JOURNAL_GENERATED in results_type, ProcessResult.ERROR in results_type


async def send_notifications(results: list, year: int, month: int, cutoff_day: int = 5):
async def send_notifications(
results: list[ProcessResultInfo], year: int, month: int, cutoff_day: int = 5
) -> None:
"""
This function process the given results list and sends
notifications according to the number of journals successfully generated
Expand Down
17 changes: 8 additions & 9 deletions backend/app/billing/process_billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ async def process_billing(
if authorization_id:
authorization = await mpt_client.get_authorization(authorization_id)
processor = AuthorizationProcessor(year, month, authorization, dry_run)
await processor.process()
return
response = [await processor.process()]

else:
tasks = []
semaphore = asyncio.Semaphore(int(settings.ffc_billing_process_max_concurrency))
Expand All @@ -91,9 +91,7 @@ async def process_billing(

logger.info(f"Processing {len(tasks)} authorizations for {product_id}")
response = list(await asyncio.gather(*tasks))
await send_notifications(
results=response, year=year, month=month, cutoff_day=cutoff_day
)
await send_notifications(results=response, year=year, month=month, cutoff_day=cutoff_day)
finally:
await mpt_client.httpx_client.aclose()

Expand Down Expand Up @@ -236,11 +234,11 @@ async def evaluate_journal_status(self, journal_external_id) -> dict[str, Any] |
)
self.logger.error(error_msg)
raise JournalStatusError(error_msg, journal_id)
except ValueError:
except ValueError as error:
journal_status = journal["status"]
error_msg = f"Found the journal {journal_id} with status {journal_status}"
self.logger.error(error_msg)
raise JournalStatusError(error_msg, journal_id)
raise JournalStatusError(error_msg, journal_id) from error

async def process(self) -> ProcessResultInfo:
"""
Expand Down Expand Up @@ -314,7 +312,6 @@ async def process(self) -> ProcessResultInfo:
journal,
journal_external_id,
)
await self.maybe_call(self._safe_unlink, filepath)
result_info = ProcessResultInfo(
authorization_id=self.authorization_id,
result=ProcessResult.JOURNAL_GENERATED,
Expand All @@ -337,6 +334,8 @@ async def process(self) -> ProcessResultInfo:
result_info.message = str(error)
result_info.journal_id = error.journal_id
return result_info
finally:
await self.maybe_call(self._safe_unlink, filepath)

except HTTPStatusError as error:
status = error.response.status_code
Expand Down Expand Up @@ -985,4 +984,4 @@ def split_entitlement_days_into_ranges(entitlement_days: set[int]):
return ranges


VALID_JOURNAL_STATUSES = frozenset(JournalStatus)
VALID_JOURNAL_STATUSES = frozenset({JournalStatus.DRAFT, JournalStatus.VALIDATED})
Loading
Loading