-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnormalize.py
More file actions
131 lines (107 loc) · 4.23 KB
/
normalize.py
File metadata and controls
131 lines (107 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from __future__ import annotations
import csv
import json
from pathlib import Path
from typing import Any, Dict, List
def load_airbyte_records(path: Path) -> Dict[str, List[Dict[str, Any]]]:
    """
    Read an Airbyte JSONL message dump and collect RECORD payloads per stream.

    Each non-blank line of ``path`` is parsed as JSON. Messages whose
    ``type`` is not ``"RECORD"`` are ignored. For RECORD messages,
    ``record.data`` is appended to the list stored under ``record.stream``,
    preserving file order.

    Args:
        path: Location of the JSONL file (read as UTF-8).

    Returns:
        A dict mapping stream name to the list of record ``data`` values,
        with streams in first-seen order.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        AttributeError: if a parsed line is not a JSON object.
        KeyError: if a RECORD message lacks ``record``, ``stream`` or ``data``.
    """
    grouped: Dict[str, List[Dict[str, Any]]] = {}
    with path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if text:
                message = json.loads(text)
                if message.get("type") == "RECORD":
                    payload = message["record"]
                    # Look up both keys before touching `grouped`.
                    stream_name, row = payload["stream"], payload["data"]
                    grouped.setdefault(stream_name, []).append(row)
    return grouped
def _as_dict(value: Any) -> Dict[str, Any]:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _first_value(sources: List[Dict[str, Any]], *keys: str) -> Any:
    """Return the first truthy ``source[key]``, scanning sources then keys in order; else None."""
    for source in sources:
        for key in keys:
            value = source.get(key)
            if value:
                return value
    return None


def build_dim_accounts(accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Normalize account schema drift across pages into a single dimension table.

    - Unify status: prefer `account_status`, fall back to `status`.
    - Unify billing/address: for country and city separately, prefer the
      `billing` sub-object and fall back to the `address` sub-object. Within
      either sub-object, both the short keys (`country`/`city`) and the
      prefixed keys (`billing_country`/`billing_city`) are accepted.

    The fallback is applied per field. A record whose `billing` object has
    only a country still gets its city from `address`. Sub-objects that are
    missing, null, or not dicts are treated as empty.

    Args:
        accounts: Raw records from the `accounts` stream.

    Returns:
        One row per input record, in input order. Each row has the keys
        account_id, updated_at, created_at, name, status, industry,
        billing_country, billing_city and owner_user_id, in that order.
        Values absent from the source record are None.
    """
    normalised: List[Dict[str, Any]] = []
    for acc in accounts:
        # Preference order: the nested `billing` object wins over `address`.
        sources = [_as_dict(acc.get("billing")), _as_dict(acc.get("address"))]
        normalised.append(
            {
                "account_id": acc.get("id"),
                "updated_at": acc.get("updated_at"),
                "created_at": acc.get("created_at"),
                "name": acc.get("name"),
                "status": acc.get("account_status") or acc.get("status"),
                "industry": acc.get("industry"),
                "billing_country": _first_value(sources, "country", "billing_country"),
                "billing_city": _first_value(sources, "city", "billing_city"),
                "owner_user_id": acc.get("owner_user_id"),
            }
        )
    return normalised
def build_fact_invoices(invoices: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Build the invoice fact table at the invoice grain.

    Line-item detail is deliberately left out of this first pass. Each output
    row carries only:
    - invoice id
    - account id
    - issued_at / updated_at
    - amount / currency
    - status

    Args:
        invoices: Raw records from the `invoices` stream.

    Returns:
        One row per input record, in input order. Keys appear in the order
        listed below, and fields missing from the source record are None.
    """
    # (output column, source field) pairs; order here fixes the CSV column order.
    column_sources = (
        ("invoice_id", "id"),
        ("account_id", "account_id"),
        ("issued_at", "issued_at"),
        ("updated_at", "updated_at"),
        ("amount", "amount"),
        ("currency", "currency"),
        ("status", "status"),
    )
    return [
        {column: invoice.get(source) for column, source in column_sources}
        for invoice in invoices
    ]
def write_csv(path: Path, rows: List[Dict[str, Any]]) -> None:
    """
    Write ``rows`` to ``path`` as a UTF-8 CSV file with a header line.

    The header is the union of keys across all rows, in first-seen order.
    Rows with differing keys therefore do not make ``csv.DictWriter`` raise,
    and a row missing a column gets an empty cell. If ``rows`` is empty,
    nothing is written and ``path`` is neither created nor modified.

    Args:
        path: Destination file; overwritten if it already exists.
        rows: Records to write, one dict per CSV row.
    """
    if not rows:
        return
    # dict.fromkeys preserves insertion order, giving an ordered, de-duplicated union.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
def main() -> None:
    """
    Run the normalisation step end to end.

    1. Read the Airbyte JSONL output file `raw_output.jsonl` from the current
       directory. If it is missing, exit with a hint on how to produce it.
    2. Build `dim_accounts` and `fact_invoices` in memory from the `accounts`
       and `invoices` streams.
    3. Export each non-empty table to the `analytics/` folder as CSV.

    A table with no rows is not written. A notice is printed instead, because
    a CSV left over from an earlier run would otherwise look current.
    """
    raw_path = Path("raw_output.jsonl")
    if not raw_path.exists():
        raise SystemExit(
            f"Expected Airbyte output at {raw_path}. "
            "Run `python main.py read --config config.json --catalog configured_catalog.json "
            "--state state.json > raw_output.jsonl` first."
        )

    records_by_stream = load_airbyte_records(raw_path)
    # The builders simply loop over their input, so empty streams yield [] without a pre-check.
    tables = {
        "dim_accounts.csv": build_dim_accounts(records_by_stream.get("accounts", [])),
        "fact_invoices.csv": build_fact_invoices(records_by_stream.get("invoices", [])),
    }

    analytics_dir = Path("analytics")
    analytics_dir.mkdir(exist_ok=True)
    for filename, rows in tables.items():
        if rows:
            write_csv(analytics_dir / filename, rows)
        else:
            # Skipping leaves any CSV from a previous run in place; report it
            # rather than silently presenting the folder as fresh output.
            print(f"No rows for {filename}; skipped (an older copy may remain in {analytics_dir}/).")
    print("Normalisation complete. Outputs written to analytics/ directory.")


if __name__ == "__main__":
    main()