From 5d2ba14e9f8cedb43af972a7058ecc5168c2b81b Mon Sep 17 00:00:00 2001 From: seungwoo1124 Date: Tue, 30 Dec 2025 14:50:06 +0900 Subject: [PATCH 1/2] change path from cwlog to s3 --- IaC/lambda/lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/IaC/lambda/lambda_function.py b/IaC/lambda/lambda_function.py index 6bd0fa2571..d4f21742ec 100644 --- a/IaC/lambda/lambda_function.py +++ b/IaC/lambda/lambda_function.py @@ -77,7 +77,7 @@ def lambda_handler(event, context): bucket_name = os.environ.get('BUCKET_NAME', None) if bucket_name is None: raise ValueError("BUCKET_NAME environment variable must be set") - object_name = "behavior/" + start_time.strftime("%Y/%m/%d") + ".csv.gz" + object_name = "raw/behavior/" + start_time.strftime("%Y/%m/%d") + ".csv.gz" s3.upload_file(base_dir + filename, bucket_name, object_name) return { From 6c0c1b344063f37aa2f4bef12a548f1393966fc6 Mon Sep 17 00:00:00 2001 From: seungwoo1124 Date: Tue, 30 Dec 2025 17:18:04 +0900 Subject: [PATCH 2/2] add filtering process code --- DataProcess/.env.example | 4 + DataProcess/.gitignore | 1 + DataProcess/filter_data.py | 202 +++++++++++++++++++++++++++++++++++ DataProcess/requirements.txt | 3 + DataProcess/verify_upload.py | 120 +++++++++++++++++++++ 5 files changed, 330 insertions(+) create mode 100644 DataProcess/.env.example create mode 100644 DataProcess/.gitignore create mode 100644 DataProcess/filter_data.py create mode 100644 DataProcess/requirements.txt create mode 100644 DataProcess/verify_upload.py diff --git a/DataProcess/.env.example b/DataProcess/.env.example new file mode 100644 index 0000000000..71db009689 --- /dev/null +++ b/DataProcess/.env.example @@ -0,0 +1,4 @@ +PROFILE_NAME="" +REGION="" +BUCKET_NAME="" +OBJECT_PATH="" \ No newline at end of file diff --git a/DataProcess/.gitignore b/DataProcess/.gitignore new file mode 100644 index 0000000000..4f509e525f --- /dev/null +++ b/DataProcess/.gitignore @@ -0,0 +1 @@ +*.env \ No newline at end of file diff 
import gzip
import os
from collections import defaultdict
from datetime import datetime, timedelta
from io import BytesIO

import boto3
import polars as pl
from dotenv import load_dotenv

# Load environment variables from .env (see .env.example for required keys).
load_dotenv()

PROFILE_NAME = os.getenv("PROFILE_NAME")
REGION = os.getenv("REGION")
BUCKET_NAME = os.getenv("BUCKET_NAME")
OBJECT_PATH = os.getenv("OBJECT_PATH")


def get_s3_session():
    """Create and return a boto3 Session, or None on failure.

    PROFILE_NAME / REGION are optional; when unset, boto3 falls back to its
    default credential/region resolution chain.
    """
    try:
        session_kwargs = {}
        if PROFILE_NAME:
            session_kwargs['profile_name'] = PROFILE_NAME
        if REGION:
            session_kwargs['region_name'] = REGION
        return boto3.Session(**session_kwargs)
    except Exception as e:
        print(f"Error creating boto3 session: {e}")
        return None


def get_date_range(start_date_str, end_date_str):
    """Return the list of datetimes from start to end (inclusive).

    Both arguments are "YYYY-MM-DD" strings; returns [] when end < start.
    """
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

    delta = end_date - start_date
    if delta.days < 0:
        return []
    return [start_date + timedelta(days=i) for i in range(delta.days + 1)]


def fetch_data_from_s3(start_date_str, end_date_str):
    """
    Fetch daily behavior files from S3 for the given date range using Polars.

    Objects are expected at s3://BUCKET_NAME/OBJECT_PATH/YYYY/MM/DD.csv.gz.
    Returns a dict mapping 'YYYY/MM/DD.csv.gz' -> Polars DataFrame; dates whose
    object is missing or unreadable are reported and skipped. Prints a fetch
    report (missing dates, monthly and total row counts) before returning.
    """
    session = get_s3_session()
    if not session:
        return {}

    s3 = session.client('s3')
    dates = get_date_range(start_date_str, end_date_str)

    df_dict = {}
    missing_dates = []
    monthly_rows = defaultdict(int)
    total_rows = 0

    for date in dates:
        date_key = date.strftime("%Y-%m-%d")
        # Relative key for dictionary: YYYY/MM/DD.csv.gz
        relative_key = date.strftime("%Y/%m/%d") + ".csv.gz"
        # Full S3 key: OBJECT_PATH/YYYY/MM/DD.csv.gz
        full_key = f"{OBJECT_PATH}/{relative_key}"

        try:
            print(f"Fetching: s3://{BUCKET_NAME}/{full_key}")
            response = s3.get_object(Bucket=BUCKET_NAME, Key=full_key)
            content = response['Body'].read()

            # NOTE(review): objects are gzipped CSV; this relies on Polars
            # auto-detecting gzip from the raw bytes — confirm with polars docs.
            df = pl.read_csv(BytesIO(content))
            df_dict[relative_key] = df

            # Update statistics
            row_count = len(df)
            monthly_rows[date.strftime("%Y-%m")] += row_count
            total_rows += row_count

            print(f" -> Successfully loaded {row_count} rows.")

        except s3.exceptions.NoSuchKey:
            print(f" -> File not found: {full_key}")
            missing_dates.append(date_key)
        except Exception as e:
            print(f" -> Error fetching {full_key}: {e}")
            missing_dates.append(date_key)

    # Print Report
    print("\n" + "=" * 40)
    print(" S3 DATA FETCH REPORT")
    print("=" * 40)

    if missing_dates:
        print(f"Missing Dates ({len(missing_dates)}):")
        for d in missing_dates:
            print(f" - {d}")
    else:
        print("All dates in range were successfully loaded.")

    print("\nMonthly Row Counts:")
    for mon, count in sorted(monthly_rows.items()):
        print(f" - {mon}: {count:,} rows")

    print(f"\nTotal Row Count: {total_rows:,} rows")
    print("=" * 40 + "\n")

    return df_dict


def remove_pii_data(df_dict):
    """
    Remove PII (the 'user_name' column) from all DataFrames in the dictionary.

    DataFrames without a 'user_name' column are passed through unchanged.
    Returns a new dict; input DataFrames are not mutated.
    """
    cleaned_dict = {}
    for key, df in df_dict.items():
        if "user_name" in df.columns:
            cleaned_dict[key] = df.drop("user_name")
        else:
            cleaned_dict[key] = df
    return cleaned_dict


def upload_data_to_s3(df_dict, target_root_key):
    """
    Upload processed DataFrames back to S3 under a specific root key.

    Maintains the YYYY/MM/DD.csv.gz structure: each DataFrame is serialized
    to CSV and gzip-compressed before upload. Returns True only when every
    file uploaded successfully.
    """
    session = get_s3_session()
    if not session:
        return False

    s3 = session.client('s3')
    success_count = 0
    total_count = len(df_dict)

    print("\n" + "=" * 40)
    print(f" UPLOADING DATA TO S3 (Root: {target_root_key})")
    print("=" * 40)

    for relative_key, df in df_dict.items():
        target_key = f"{target_root_key}/{relative_key}"
        try:
            csv_buffer = BytesIO()
            df.write_csv(csv_buffer)

            # gzip-compress the CSV bytes to match the .csv.gz key suffix.
            compressed_buffer = BytesIO()
            with gzip.GzipFile(fileobj=compressed_buffer, mode='wb') as f:
                f.write(csv_buffer.getvalue())

            print(f"Uploading: s3://{BUCKET_NAME}/{target_key}")
            s3.put_object(
                Bucket=BUCKET_NAME,
                Key=target_key,
                Body=compressed_buffer.getvalue()
            )
            success_count += 1
            print(" -> Successfully uploaded.")
        except Exception as e:
            print(f" -> Error uploading {target_key}: {e}")

    print("\n" + "=" * 40)
    print(f"Upload Complete: {success_count}/{total_count} files uploaded.")
    print("=" * 40 + "\n")
    return success_count == total_count


if __name__ == "__main__":
    # Settings
    START_DATE = "2025-09-18"
    END_DATE = "2025-12-29"
    TARGET_ROOT_KEY = "filtered/behavior"

    # 1. Download
    data_dict = fetch_data_from_s3(START_DATE, END_DATE)

    if data_dict:
        # Debug: Show head after download
        first_key = next(iter(data_dict))
        print(f"\n[DEBUG] Sample data head after download ({first_key}):")
        print(data_dict[first_key].head(3))

        # 2. Process (Remove PII)
        print("\nRemoving PII data (user_name)...")
        data_dict = remove_pii_data(data_dict)

        # Debug: Show head after PII removal
        print(f"\n[DEBUG] Sample data head after PII removal ({first_key}):")
        print(data_dict[first_key].head(3))

        # 3. Upload
        upload_data_to_s3(data_dict, TARGET_ROOT_KEY)
    else:
        print("No data fetched to process.")
def verify_s3_uploads(start_date_str, end_date_str, target_root):
    """
    Rigorously cross-verify uploaded data against the source data.

    For every date in the range:
      1. Fetch the source object from OBJECT_PATH.
      2. Fetch the uploaded object from target_root.
      3. Compare row counts 1:1.
      4. Verify the PII column 'user_name' is absent from the target.

    Returns the stats dict ('pass', 'mismatch', 'missing_src', 'missing_tgt',
    'pii_fail') so callers can assert on the outcome programmatically.
    """
    session = get_s3_session()
    s3 = session.client('s3')
    dates = get_date_range(start_date_str, end_date_str)

    print("\n" + "=" * 70)
    print(" RIGOROUS S3 DATA CROSS-VERIFICATION")
    print(f" Source: {OBJECT_PATH}")
    print(f" Target: {target_root}")
    print(f" Range: {start_date_str} to {end_date_str}")
    print("=" * 70)

    stats = {"pass": 0, "mismatch": 0, "missing_src": 0, "missing_tgt": 0, "pii_fail": 0}

    for date in dates:
        date_str = date.strftime("%Y-%m-%d")
        rel_path = f"{date.strftime('%Y/%m/%d')}.csv.gz"
        src_key = f"{OBJECT_PATH}/{rel_path}"
        tgt_key = f"{target_root}/{rel_path}"

        # Sentinels: None -> fetch error, -1 -> object missing, >=0 -> row count.
        src_rows = None
        tgt_rows = None

        # Fetch Source
        try:
            resp = s3.get_object(Bucket=BUCKET_NAME, Key=src_key)
            src_df = pl.read_csv(BytesIO(resp['Body'].read()))
            src_rows = len(src_df)
        except s3.exceptions.NoSuchKey:
            src_rows = -1  # Mark as missing
        except Exception as e:
            print(f" [ERR] Source {date_str}: {e}")

        # Fetch Target
        try:
            resp = s3.get_object(Bucket=BUCKET_NAME, Key=tgt_key)
            tgt_df = pl.read_csv(BytesIO(resp['Body'].read()))
            tgt_rows = len(tgt_df)

            # PII Check: the filter step must have dropped 'user_name'.
            if "user_name" in tgt_df.columns:
                print(f" [FAILURE] {date_str}: PII ('user_name') found in target!")
                stats["pii_fail"] += 1
        except s3.exceptions.NoSuchKey:
            tgt_rows = -1
        except Exception as e:
            print(f" [ERR] Target {date_str}: {e}")

        # Comparison. BUGFIX: a generic fetch error used to leave the row
        # count as None and fall through to the mismatch branch, printing
        # "Src: None"; treat unreadable files the same as missing ones.
        if src_rows == -1 or src_rows is None:
            print(f" [SKIP] {date_str}: Source file missing in S3 ({src_key})")
            stats["missing_src"] += 1
        elif tgt_rows == -1 or tgt_rows is None:
            print(f" [FAIL] {date_str}: Target file missing in S3 ({tgt_key})")
            stats["missing_tgt"] += 1
        elif src_rows != tgt_rows:
            print(f" [FAIL] {date_str}: Row count mismatch! (Src: {src_rows}, Tgt: {tgt_rows})")
            stats["mismatch"] += 1
        else:
            print(f" [PASS] {date_str}: Row counts match ({src_rows}).")
            stats["pass"] += 1

    print("\n" + "=" * 70)
    print("Verification Summary:")
    print(f" - PASS: {stats['pass']}")
    print(f" - Mismatch: {stats['mismatch']}")
    print(f" - Missing Src: {stats['missing_src']}")
    print(f" - Missing Tgt: {stats['missing_tgt']}")
    print(f" - PII Failures: {stats['pii_fail']}")
    print("=" * 70 + "\n")
    return stats


if __name__ == "__main__":
    # Settings (Should match filter_data.py)
    START_DATE = "2025-09-18"
    END_DATE = "2025-12-29"
    TARGET_ROOT_KEY = "filtered/behavior"

    verify_s3_uploads(START_DATE, END_DATE, TARGET_ROOT_KEY)