# ===== .gitignore =====
# .venv
# test_logs.txt
# apache_logs.db
# .ipynb_checkpoints
#
# NOTE(review): this change set also adds compiled files
# (__pycache__/database.cpython-312.pyc, __pycache__/parser.cpython-312.pyc);
# consider ignoring __pycache__/ instead of committing them.

# ===== README.md =====
# # 📊 LogForge Apache Log ETL
#
# ## 📁 Overview
# LogForge is an Apache log parsing and analysis pipeline that extracts,
# parses, stores, and summarizes log data using Python and SQLite.
#
# ---
#
# ## ⚙️ Project Structure
#
# - `etl_apache.py` – CLI runner for the ETL pipeline
# - `parser.py` – Parses raw log lines using regex
# - `database.py` – Manages DB schema, connection, and insert logic
# - `summarizer.py` – Generates daily summary JSON files
# - `apache_logs.db` – Local SQLite DB
# - `data/logs/apache_logs` – Raw Apache log input passed to `--log`
#
# ---
#
# ## 🔍 Schema
# ![alt text](image.png)
#
# ## 🚀 Usage
#
# ```bash
# python etl_apache.py --log data/logs/apache_logs
# ```

# ===== database.py =====
import sqlite3

# DataFrame columns read by insert_logs, in the order of the SQL placeholders.
# The DataFrame column `bytes_sent` maps to the `bytes` column in the DB.
_LOG_SOURCE_COLUMNS = [
    "ip",
    "timestamp",
    "method",
    "path",
    "protocol",
    "status",
    "bytes_sent",
    "referrer",
    "user_agent",
    "signature_hash",
]

# OR IGNORE makes SQLite skip rows that violate the UNIQUE constraint on
# signature_hash, which is how duplicate log lines are dropped.
_LOG_INSERT_SQL = """
    INSERT OR IGNORE INTO logs (
        ip, timestamp, method, path, protocol, status,
        bytes, referrer, user_agent, signature_hash
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

_ERROR_INSERT_SQL = """
    INSERT INTO errors (raw_line, error_reason)
    VALUES (?, ?)
"""


def connect_db(db_path="apache_logs.db"):
    """Open a SQLite connection to *db_path* and return it."""
    return sqlite3.connect(db_path)


def create_tables(conn):
    """Create the ``logs`` and ``errors`` tables if they do not exist yet.

    ``logs.signature_hash`` is UNIQUE so that re-loading the same log line
    does not create a second row.
    """
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ip TEXT,
            timestamp TEXT,
            method TEXT,
            path TEXT,
            protocol TEXT,
            status INTEGER,
            bytes INTEGER,
            referrer TEXT,
            user_agent TEXT,
            signature_hash TEXT UNIQUE
        )
    """)

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS errors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            raw_line TEXT,
            error_reason TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)

    conn.commit()


def insert_logs(conn, logs_df):
    """Insert parsed log rows into ``logs``, skipping duplicates.

    Args:
        conn: Open SQLite connection with the ``logs`` table created.
        logs_df: DataFrame holding the columns in ``_LOG_SOURCE_COLUMNS``.

    Returns:
        The number of rows actually inserted (duplicates are not counted).
    """
    if logs_df.empty:
        return 0

    rows = logs_df[_LOG_SOURCE_COLUMNS].itertuples(index=False, name=None)
    changes_before = conn.total_changes
    # The connection context manager commits on success and rolls the whole
    # batch back if any statement raises.
    with conn:
        conn.executemany(_LOG_INSERT_SQL, rows)
    return conn.total_changes - changes_before


def insert_errors(conn, errors_df):
    """Record malformed lines in ``errors``.

    Args:
        conn: Open SQLite connection with the ``errors`` table created.
        errors_df: DataFrame with ``raw_log`` and ``error_reason`` columns.
    """
    if errors_df.empty:
        return

    rows = errors_df[["raw_log", "error_reason"]].itertuples(
        index=False, name=None
    )
    with conn:
        conn.executemany(_ERROR_INSERT_SQL, rows)


# ===== etl_apache.py =====
# etl_apache.py — Entry-point CLI tool for ETL
import argparse
import pandas as pd
from parser import transform_logs
from database import connect_db, create_tables, insert_logs, insert_errors
import chardet


def detect_encoding(file_path):
    """Guess the text encoding of *file_path* from its first 10000 bytes.

    Falls back to UTF-8 when chardet cannot decide (it reports ``None``,
    for example on an empty file), so the result is always usable by open().
    """
    with open(file_path, 'rb') as f:
        raw = f.read(10000)  # sample
    return chardet.detect(raw)['encoding'] or 'utf-8'


def extract_logs(file_path):
    """Read raw log lines into a DataFrame with a single ``raw_log`` column.

    Lines are stripped of surrounding whitespace; blank lines are dropped so
    they are not later reported as malformed log entries.
    """
    encoding = detect_encoding(file_path)
    with open(file_path, 'r', encoding=encoding, errors='replace') as f:
        lines = [stripped for line in f if (stripped := line.strip())]
    return pd.DataFrame({'raw_log': lines})


def run_etl(log_path):
    """Run extract → transform → load for the log file at *log_path*."""
    print("🔄 Starting ETL pipeline...")

    raw_df = extract_logs(log_path)

    # Connect to SQLite and set up schema
    conn = connect_db()
    try:
        create_tables(conn)

        # Transform logs
        cleaned_df, malformed_df = transform_logs(raw_df)

        # Load valid logs
        if not cleaned_df.empty:
            inserted = insert_logs(conn, cleaned_df)
            print(f"✅ Inserted {inserted} valid logs.")
            skipped = len(cleaned_df) - inserted
            if skipped:
                print(f"Skipped {skipped} duplicate logs.")

        # Load errors
        if not malformed_df.empty:
            insert_errors(conn, malformed_df)
            print(f"⚠️ Logged {len(malformed_df)} malformed lines.")
    finally:
        # Always release the DB handle, even when a step above fails.
        conn.close()

    print(" ETL process completed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run Apache Log ETL")
    parser.add_argument('--log', type=str, required=True, help='Path to Apache log file')
    args = parser.parse_args()

    run_etl(args.log)

# (binary file added in this change set, not shown: image.png)

# ===== parser.py =====
import re
+import pandas as pd +import hashlib +from datetime import datetime + +# regex pattern +log_pattern = re.compile( + r'(?P\d+\.\d+\.\d+\.\d+)\s-\s-\s' + r'\[(?P[^\]]+)\]\s' + r'"(?P\w+)\s(?P.*?)\s(?PHTTP/\d\.\d)"\s' + r'(?P\d{3})\s(?P\d+)\s' + r'"(?P[^"]*)"\s' + r'"(?P[^"]*)"' +) + +# Parse timestamp +def parse_timestamp(raw: str) -> str | None: + try: + return datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z").isoformat() + except Exception: + return None + +# Generate unique hash for deduplication +def generate_hash(ip: str, timestamp: str, path: str) -> str: + return hashlib.md5(f"{ip}_{timestamp}_{path}".encode()).hexdigest() + +# Main transform function +def transform_logs(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]: + structured_logs = [] + malformed_logs = [] + + for raw in df["raw_log"]: + raw = raw.strip() + match = log_pattern.match(raw) + if not match: + malformed_logs.append({"raw_log": raw, "error_reason": "Regex match failed"}) + continue + + data = match.groupdict() + timestamp = parse_timestamp(data["timestamp"]) + if not timestamp: + malformed_logs.append({"raw_log": raw, "error_reason": "Invalid timestamp"}) + continue + + uid = generate_hash(data["ip"], timestamp, data["path"]) + + structured_logs.append({ + "signature_hash": uid, + "ip": data["ip"], + "timestamp": timestamp, + "method": data["method"], + "path": data["path"], + "protocol": data["protocol"], + "status": int(data["status"]), + "bytes_sent": int(data["bytes"]), + "referrer": data["referrer"], + "user_agent": data["user_agent"] + }) + + return pd.DataFrame(structured_logs), pd.DataFrame(malformed_logs) diff --git a/requirements.txt b/requirements.txt index e69de29..1624071 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1,2 @@ +pandas +chardet \ No newline at end of file diff --git a/summarizer.py b/summarizer.py index b8b5d98..fa49ab9 100644 --- a/summarizer.py +++ b/summarizer.py @@ -1 +1,23 @@ -# Report Generation \ No newline at end of file +# summarizer.py 
import json
import os
import sqlite3
from datetime import date

import pandas as pd


def generate_daily_summary(date_str, db_path="apache_logs.db", output_dir=None):
    """Write a per-status hit count for one day to ``summary_<date>.json``.

    Args:
        date_str: Day to summarize, as an ISO date string (``YYYY-MM-DD``).
        db_path: SQLite database file containing the ``logs`` table.
        output_dir: Directory for the JSON file; ``None`` writes to the
            current working directory.

    Returns:
        The path of the JSON file that was written.

    Raises:
        ValueError: If *date_str* is not a valid ISO date.
    """
    # Validating first keeps arbitrary text out of both the SQL parameter
    # and the output filename.
    day = date.fromisoformat(date_str).isoformat()

    # NOTE(review): SQLite's date() normalizes timestamps that carry a UTC
    # offset to UTC, so rows are bucketed by UTC day — confirm that is the
    # intended definition of "daily".
    query = """
        SELECT date(timestamp) AS log_date, status, COUNT(*) AS hits
        FROM logs
        WHERE date(timestamp) = ?
        GROUP BY status
        ORDER BY status
    """

    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(query, conn, params=(day,))
    finally:
        # Release the DB handle even if the query fails.
        conn.close()

    # Save as JSON instead of CSV
    output_filename = f"summary_{day}.json"
    if output_dir is not None:
        output_filename = os.path.join(output_dir, output_filename)
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(df.to_dict(orient='records'), f, indent=2)

    print(f"📦 Daily summary saved as {output_filename}")
    return output_filename