Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions DataProcess/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
PROFILE_NAME=""
REGION=""
BUCKET_NAME=""
OBJECT_PATH=""
1 change: 1 addition & 0 deletions DataProcess/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.env
202 changes: 202 additions & 0 deletions DataProcess/filter_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import os
import boto3
import polars as pl
from dotenv import load_dotenv
from datetime import datetime, timedelta
from io import BytesIO
from collections import defaultdict

# Load environment variables
load_dotenv()

PROFILE_NAME = os.getenv("PROFILE_NAME")
REGION = os.getenv("REGION")
BUCKET_NAME = os.getenv("BUCKET_NAME")
OBJECT_PATH = os.getenv("OBJECT_PATH")


def get_s3_session():
    """Create a boto3 Session honoring the optional PROFILE_NAME / REGION
    settings loaded from .env.

    Returns:
        A boto3.Session on success, or None if session creation raised.
    """
    try:
        # Only forward the settings that are actually present; boto3 then
        # falls back to its default credential/region resolution chain.
        candidates = (
            ('profile_name', PROFILE_NAME),
            ('region_name', REGION),
        )
        return boto3.Session(**{name: value for name, value in candidates if value})
    except Exception as e:
        print(f"Error creating boto3 session: {e}")
        return None


def get_date_range(start_date_str, end_date_str):
    """Return every datetime from start_date_str through end_date_str (inclusive).

    Both arguments are 'YYYY-MM-DD' strings. An empty list is returned when
    the end date precedes the start date.
    """
    start = datetime.strptime(start_date_str, "%Y-%m-%d")
    end = datetime.strptime(end_date_str, "%Y-%m-%d")

    span_days = (end - start).days
    if span_days < 0:
        return []
    return [start + timedelta(days=offset) for offset in range(span_days + 1)]


def fetch_data_from_s3(start_date_str, end_date_str):
    """
    Fetch data from S3 for the given date range using Polars.
    Returns a dictionary where keys are 'YYYY/MM/DD.csv.gz' and values are Polars DataFrames.

    One object per day is fetched from s3://BUCKET_NAME/OBJECT_PATH/YYYY/MM/DD.csv.gz.
    Dates whose object is missing or fails to load are collected and reported
    but do not abort the run. A per-month and total row-count report is
    printed after the loop.
    """
    session = get_s3_session()
    if not session:
        # get_s3_session already printed the failure reason.
        return {}

    s3 = session.client('s3')
    dates = get_date_range(start_date_str, end_date_str)

    df_dict = {}                     # relative key -> Polars DataFrame
    missing_dates = []               # 'YYYY-MM-DD' strings that failed to load
    monthly_rows = defaultdict(int)  # 'YYYY-MM' -> cumulative row count
    total_rows = 0

    for date in dates:
        year = date.strftime("%Y")
        month = date.strftime("%m")
        day = date.strftime("%d")
        date_key = date.strftime("%Y-%m-%d")

        # Relative key for dictionary: YYYY/MM/DD.csv.gz
        relative_key = f"{year}/{month}/{day}.csv.gz"
        # Full S3 key: path/YYYY/MM/DD.csv.gz
        full_key = f"{OBJECT_PATH}/{relative_key}"

        try:
            print(f"Fetching: s3://{BUCKET_NAME}/{full_key}")
            response = s3.get_object(Bucket=BUCKET_NAME, Key=full_key)
            content = response['Body'].read()

            # Read gzipped CSV using Polars
            # NOTE(review): the raw .gz bytes are handed straight to read_csv —
            # this relies on polars auto-detecting gzip compression from the
            # magic bytes; confirm for the installed polars version.
            df = pl.read_csv(BytesIO(content))
            df_dict[relative_key] = df

            # Update statistics
            row_count = len(df)
            monthly_rows[f"{year}-{month}"] += row_count
            total_rows += row_count

            print(f" -> Successfully loaded {row_count} rows.")

        except s3.exceptions.NoSuchKey:
            print(f" -> File not found: {full_key}")
            missing_dates.append(date_key)
        except Exception as e:
            # Any other failure (auth, network, parse) is treated like a
            # missing date so one bad file doesn't abort the whole batch.
            print(f" -> Error fetching {full_key}: {e}")
            missing_dates.append(date_key)

    # Print Report
    print("\n" + "="*40)
    print(" S3 DATA FETCH REPORT")
    print("="*40)

    if missing_dates:
        print(f"Missing Dates ({len(missing_dates)}):")
        for d in missing_dates:
            print(f" - {d}")
    else:
        print("All dates in range were successfully loaded.")

    print("\nMonthly Row Counts:")
    for mon, count in sorted(monthly_rows.items()):
        print(f" - {mon}: {count:,} rows")

    print(f"\nTotal Row Count: {total_rows:,} rows")
    print("="*40 + "\n")

    return df_dict


def remove_pii_data(df_dict):
    """
    Strip the PII 'user_name' column from every DataFrame in the mapping.

    Frames that do not carry the column are passed through unchanged; the
    original dictionary is never mutated.
    """
    return {
        key: frame.drop("user_name") if "user_name" in frame.columns else frame
        for key, frame in df_dict.items()
    }


def upload_data_to_s3(df_dict, target_root_key):
    """
    Upload processed DataFrames back to S3 under a specific root key.
    Maintains the YYYY/MM/DD.csv.gz structure.

    Args:
        df_dict: Mapping of relative key ('YYYY/MM/DD.csv.gz') -> Polars DataFrame.
        target_root_key: S3 key prefix the relative keys are appended to.

    Returns:
        True when every file uploaded successfully, False otherwise
        (including when no boto3 session could be created).
    """
    # stdlib import hoisted to the top of the function instead of sitting
    # between the banner prints and the loop, where the original had it.
    import gzip

    session = get_s3_session()
    if not session:
        return False

    s3 = session.client('s3')
    success_count = 0
    total_count = len(df_dict)

    print("\n" + "="*40)
    print(f" UPLOADING DATA TO S3 (Root: {target_root_key})")
    print("="*40)

    for relative_key, df in df_dict.items():
        target_key = f"{target_root_key}/{relative_key}"
        try:
            # Serialize to CSV once and gzip-compress in a single call;
            # replaces the original's two intermediate BytesIO buffers.
            body = gzip.compress(df.write_csv().encode("utf-8"))

            print(f"Uploading: s3://{BUCKET_NAME}/{target_key}")
            s3.put_object(
                Bucket=BUCKET_NAME,
                Key=target_key,
                Body=body
            )
            success_count += 1
            print(" -> Successfully uploaded.")
        except Exception as e:
            # Best-effort: report the failure and continue with the rest.
            print(f" -> Error uploading {target_key}: {e}")

    print("\n" + "="*40)
    print(f"Upload Complete: {success_count}/{total_count} files uploaded.")
    print("="*40 + "\n")
    return success_count == total_count


if __name__ == "__main__":
    # Batch configuration: date window and destination prefix.
    START_DATE = "2025-09-18"
    END_DATE = "2025-12-29"
    TARGET_ROOT_KEY = "filtered/behavior"

    # 1. Download the raw daily files for the configured window.
    data_dict = fetch_data_from_s3(START_DATE, END_DATE)

    if not data_dict:
        print("No data fetched to process.")
    else:
        first_key = next(iter(data_dict))

        # Debug: Show head after download
        print(f"\n[DEBUG] Sample data head after download ({first_key}):")
        print(data_dict[first_key].head(3))

        # 2. Process (Remove PII)
        print("\nRemoving PII data (user_name)...")
        data_dict = remove_pii_data(data_dict)

        # Debug: Show head after PII removal
        print(f"\n[DEBUG] Sample data head after PII removal ({first_key}):")
        print(data_dict[first_key].head(3))

        # 3. Upload the scrubbed frames under the target prefix.
        upload_data_to_s3(data_dict, TARGET_ROOT_KEY)
3 changes: 3 additions & 0 deletions DataProcess/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
polars
boto3
python-dotenv
120 changes: 120 additions & 0 deletions DataProcess/verify_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import os
import boto3
import polars as pl
from dotenv import load_dotenv
from io import BytesIO

from datetime import datetime, timedelta

# Load environment variables
load_dotenv()

PROFILE_NAME = os.getenv("PROFILE_NAME")
REGION = os.getenv("REGION")
BUCKET_NAME = os.getenv("BUCKET_NAME")
OBJECT_PATH = os.getenv("OBJECT_PATH") # Original source path

def get_s3_session():
    """Build a boto3 Session honoring the optional PROFILE_NAME / REGION
    settings loaded from .env; unset values fall back to boto3 defaults."""
    options = (('profile_name', PROFILE_NAME), ('region_name', REGION))
    return boto3.Session(**{name: value for name, value in options if value})

def get_date_range(start_date_str, end_date_str):
    """Return every datetime from start_date_str through end_date_str, inclusive.

    Arguments are 'YYYY-MM-DD' strings; the result is empty when the end
    date precedes the start date.
    """
    fmt = "%Y-%m-%d"
    current = datetime.strptime(start_date_str, fmt)
    last = datetime.strptime(end_date_str, fmt)

    dates = []
    while current <= last:
        dates.append(current)
        current += timedelta(days=1)
    return dates

def verify_s3_uploads(start_date_str, end_date_str, target_root):
    """
    Rigorous verification:
    1. Fetches source data from OBJECT_PATH.
    2. Fetches uploaded data from target_root.
    3. Compares row counts 1:1.
    4. Verifies 'user_name' is missing in target.

    Prints a per-date PASS/SKIP/FAIL line plus a final summary; returns None.
    """
    session = get_s3_session()
    s3 = session.client('s3')
    dates = get_date_range(start_date_str, end_date_str)

    print("\n" + "="*70)
    print(f" RIGOROUS S3 DATA CROSS-VERIFICATION")
    print(f" Source: {OBJECT_PATH}")
    print(f" Target: {target_root}")
    print(f" Range: {start_date_str} to {end_date_str}")
    print("="*70)

    # Tally of per-date outcomes for the final summary.
    stats = {"pass": 0, "mismatch": 0, "missing_src": 0, "missing_tgt": 0, "pii_fail": 0}

    for date in dates:
        date_str = date.strftime("%Y-%m-%d")
        rel_path = f"{date.strftime('%Y/%m/%d')}.csv.gz"
        src_key = f"{OBJECT_PATH}/{rel_path}"
        tgt_key = f"{target_root}/{rel_path}"

        # Sentinels: None = fetch raised a non-NoSuchKey error, -1 = object missing.
        src_rows = None
        tgt_rows = None

        # Fetch Source
        try:
            resp = s3.get_object(Bucket=BUCKET_NAME, Key=src_key)
            # NOTE(review): raw .gz bytes handed to read_csv — relies on polars
            # auto-detecting gzip compression; confirm for the installed version.
            src_df = pl.read_csv(BytesIO(resp['Body'].read()))
            src_rows = len(src_df)
        except s3.exceptions.NoSuchKey:
            src_rows = -1 # Mark as missing
        except Exception as e:
            # NOTE(review): src_rows stays None here, so the comparison below
            # classifies this date as a row-count mismatch (None != tgt_rows) —
            # confirm that is the intended bucket for fetch errors.
            print(f" [ERR] Source {date_str}: {e}")

        # Fetch Target
        try:
            resp = s3.get_object(Bucket=BUCKET_NAME, Key=tgt_key)
            tgt_df = pl.read_csv(BytesIO(resp['Body'].read()))
            tgt_rows = len(tgt_df)

            # PII Check
            if "user_name" in tgt_df.columns:
                print(f" [FAILURE] {date_str}: PII ('user_name') found in target!")
                stats["pii_fail"] += 1
        except s3.exceptions.NoSuchKey:
            tgt_rows = -1
        except Exception as e:
            print(f" [ERR] Target {date_str}: {e}")

        # Comparison
        if src_rows == -1:
            # Missing source is a SKIP, not a failure: there was nothing to copy.
            print(f" [SKIP] {date_str}: Source file missing in S3 ({src_key})")
            stats["missing_src"] += 1
        elif tgt_rows == -1:
            print(f" [FAIL] {date_str}: Target file missing in S3 ({tgt_key})")
            stats["missing_tgt"] += 1
        elif src_rows != tgt_rows:
            print(f" [FAIL] {date_str}: Row count mismatch! (Src: {src_rows}, Tgt: {tgt_rows})")
            stats["mismatch"] += 1
        else:
            print(f" [PASS] {date_str}: Row counts match ({src_rows}).")
            stats["pass"] += 1

    print("\n" + "="*70)
    print(f"Verification Summary:")
    print(f" - PASS: {stats['pass']}")
    print(f" - Mismatch: {stats['mismatch']}")
    print(f" - Missing Src: {stats['missing_src']}")
    print(f" - Missing Tgt: {stats['missing_tgt']}")
    print(f" - PII Failures: {stats['pii_fail']}")
    print("="*70 + "\n")

if __name__ == "__main__":
    # Settings (Should match filter_data.py)
    # Same date window and destination prefix as the filtering job, so the
    # verification covers exactly the files that job was expected to produce.
    START_DATE = "2025-09-18"
    END_DATE = "2025-12-29"
    TARGET_ROOT_KEY = "filtered/behavior"

    verify_s3_uploads(START_DATE, END_DATE, TARGET_ROOT_KEY)
2 changes: 1 addition & 1 deletion IaC/lambda/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def lambda_handler(event, context):
bucket_name = os.environ.get('BUCKET_NAME', None)
if bucket_name is None:
raise ValueError("BUCKET_NAME environment variable must be set")
object_name = "behavior/" + start_time.strftime("%Y/%m/%d") + ".csv.gz"
object_name = "raw/behavior/" + start_time.strftime("%Y/%m/%d") + ".csv.gz"
s3.upload_file(base_dir + filename, bucket_name, object_name)

return {
Expand Down
Loading