Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions vulnerable_apps/insecure_deserialization.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@

"""
Insecure Deserialization Vulnerability Demo
OWASP A08:2021 - Software and Data Integrity Failures
"""
import pickle
import json
import yaml
import base64

Expand All @@ -11,61 +12,75 @@ def __init__(self, username, role):
self.username = username
self.role = role

def __reduce__(self):
# This can be exploited for code execution
return (eval, ("__import__('os').system('echo Exploited!')",))
def to_dict(self):
return {"username": self.username, "role": self.role}

@classmethod
def from_dict(cls, data):
return cls(data["username"], data["role"])

def deserialize_user_session(session_data):
"""VULNERABLE: Using pickle to deserialize untrusted data"""
"""Using json to deserialize untrusted data"""
try:
# VULNERABLE: pickle.loads() on user-controlled data
user = pickle.loads(base64.b64decode(session_data))
# Using json.loads() on user-controlled data
user_data = json.loads(base64.b64decode(session_data).decode())
user = UserProfile.from_dict(user_data)
return user
except Exception as e:
return None

def load_config_from_yaml(yaml_string):
"""VULNERABLE: Unsafe YAML deserialization"""
# VULNERABLE: yaml.load() without safe loader
config = yaml.load(yaml_string, Loader=yaml.Loader)
"""Safe YAML deserialization"""
# Using yaml.safe_load() with safe loader
config = yaml.safe_load(yaml_string)
return config

def restore_object_state(serialized_obj):
"""VULNERABLE: Direct pickle deserialization"""
# VULNERABLE: No validation of serialized data
obj = pickle.loads(serialized_obj)
return obj
"""Direct json deserialization"""
# Using json.loads() with validation
try:
obj_data = json.loads(serialized_obj)
obj = UserProfile.from_dict(obj_data)
return obj
except Exception as e:
return None

class SessionManager:
def __init__(self):
self.sessions = {}

def create_session(self, user_data):
"""Creates a session with serialized user data"""
# VULNERABLE: Serializing with pickle
serialized = pickle.dumps(user_data)
# Serializing with json
user_dict = user_data.to_dict()
serialized = json.dumps(user_dict).encode()
session_id = base64.b64encode(serialized).decode()
self.sessions[session_id] = serialized
return session_id

def get_session(self, session_id):
"""VULNERABLE: Deserializing user-controlled session data"""
"""Deserializing user-controlled session data"""
if session_id in self.sessions:
# VULNERABLE: No signature verification
return pickle.loads(self.sessions[session_id])
# Using json.loads() with validation
try:
user_data = json.loads(self.sessions[session_id].decode())
return UserProfile.from_dict(user_data)
except Exception as e:
return None
return None

def process_uploaded_data(data):
"""VULNERABLE: Processing serialized data from uploads"""
# VULNERABLE: Trusting uploaded serialized data
"""Processing serialized data from uploads"""
# Using json.loads() with validation
try:
obj = pickle.loads(data)
obj_data = json.loads(data)
obj = UserProfile.from_dict(obj_data)
return obj
except:
return None

if __name__ == "__main__":
# Example of creating malicious payload:
# malicious_user = UserProfile("attacker", "admin")
# payload = base64.b64encode(pickle.dumps(malicious_user))
# payload = base64.b64encode(json.dumps(malicious_user.to_dict()).encode())
print("Insecure deserialization demo")