-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_transactions.py
More file actions
96 lines (87 loc) · 2.73 KB
/
load_transactions.py
File metadata and controls
96 lines (87 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import uuid
from datetime import datetime, timezone

import pandas as pd

from database import accounts_col, customer_memory_col, transactions_col
# One-shot loader: reads clean_transactions.csv and, for every row, inserts a
# transaction document and makes sure the related account and customer-memory
# documents exist.
#
# NOTE(review): the collections imported from `database` are presumably
# PyMongo collections (insert_one / update_one with upsert) - confirm.

# The CSV is read with header=None, so column names are assigned here, in
# file order.
df = pd.read_csv("clean_transactions.csv", header=None)
df.columns = [
    "timestamp",
    "sender_customer_id",
    "sender_account_id",
    "receiver_customer_id",
    "receiver_account_id",
    "amount",
    "currency",
    "original_amount",
    "original_currency",
    "payment_method",
]


def _ensure_account(account_id, customer_id, now):
    """Create a default ACTIVE account document for *account_id* if missing.

    The update uses only ``$setOnInsert`` with ``upsert=True``, so an account
    document that already matches ``account_id`` is left unchanged.

    Args:
        account_id: Value stored in (and matched on) the ``account_id`` field.
        customer_id: Owner recorded on the account when it is first created.
        now: Timestamp written to ``created_at`` on insert.
    """
    accounts_col.update_one(
        {"account_id": account_id},
        {
            "$setOnInsert": {
                "account_id": account_id,
                "customer_id": customer_id,
                "account_status": "ACTIVE",
                "risk_score": 0,
                "flags": [],
                "created_at": now,
            }
        },
        upsert=True,
    )


# Ids already upserted during this run. A repeated $setOnInsert upsert for the
# same id cannot change anything, so skipping it only saves a round-trip.
seen_accounts = set()
seen_customers = set()

for _, row in df.iterrows():
    # Single timezone-aware UTC timestamp shared by every document written
    # for this row (datetime.utcnow() is deprecated and returns a naive value).
    now = datetime.now(timezone.utc)

    sender_customer = row["sender_customer_id"]
    sender_account = row["sender_account_id"]
    receiver_customer = row["receiver_customer_id"]
    receiver_account = row["receiver_account_id"]

    txn_doc = {
        "transaction_id": str(uuid.uuid4()),
        # Source timestamps are formatted like "2023/01/31 13:45"; the parsed
        # value is naive because the CSV carries no timezone information.
        "timestamp": datetime.strptime(row["timestamp"], "%Y/%m/%d %H:%M"),
        "sender": {
            "customer_id": sender_customer,
            "account_id": sender_account,
        },
        "receiver": {
            "customer_id": receiver_customer,
            "account_id": receiver_account,
        },
        "amount": float(row["amount"]),
        "currency": row["currency"],
        "original_amount": float(row["original_amount"]),
        "original_currency": row["original_currency"],
        "payment_method": row["payment_method"],
        # Risk fields start empty/zero at load time.
        "risk_flags": [],
        "risk_score": 0,
        "created_at": now,
    }
    transactions_col.insert_one(txn_doc)

    # Sender account first, then receiver account (same order as the columns).
    for account_id, customer_id in (
        (sender_account, sender_customer),
        (receiver_account, receiver_customer),
    ):
        if account_id not in seen_accounts:
            _ensure_account(account_id, customer_id, now)
            seen_accounts.add(account_id)

    # Customer memory is initialised for the sending customer only.
    # NOTE(review): receivers get no memory document here - confirm intended.
    if sender_customer not in seen_customers:
        customer_memory_col.update_one(
            {"customer_id": sender_customer},
            {
                "$setOnInsert": {
                    "customer_id": sender_customer,
                    "total_transactions": 0,
                    "total_sent_amount": 0.0,
                    "avg_transaction_amount": 0.0,
                    "historical_risk_scores": [],
                    "suspicious_count": 0,
                    "last_suspicious_date": None,
                    "updated_at": now,
                }
            },
            upsert=True,
        )
        seen_customers.add(sender_customer)

print("Data inserted successfully.")