-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
164 lines (123 loc) · 4.21 KB
/
database.py
File metadata and controls
164 lines (123 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Database module for SMINT Stock Prediction Application
Handles user authentication with SQLite and password hashing
"""
import sqlite3
import hashlib
from datetime import datetime
# Path of the SQLite database file opened by get_connection().
# It is a relative path, so it resolves against the process's current
# working directory at the time the connection is opened.
DATABASE_PATH = "smint.db"
def get_connection():
    """Open the SQLite database at DATABASE_PATH and return the connection.

    Rows fetched through the returned connection are ``sqlite3.Row``
    objects, so columns can be read by name as well as by index.
    """
    connection = sqlite3.connect(DATABASE_PATH)
    # sqlite3.Row makes fetched rows addressable by column name.
    connection.row_factory = sqlite3.Row
    return connection
def init_database():
    """
    Initialize the database and create tables if they don't exist.

    Creates the ``users`` table when it is missing, then seeds a default
    ``admin`` account if no user with that name exists yet.

    Raises: sqlite3.Error if the table cannot be created.
    """
    conn = get_connection()
    try:
        # Create users table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                email TEXT NOT NULL,
                password_hash TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Close the handle even when the DDL fails so it is never leaked.
        conn.close()

    # Create default admin user if not exists.
    # SECURITY(review): the seeded admin account uses a hard-coded,
    # well-known password. It is kept for backward compatibility; it should
    # be changed after first start or sourced from configuration.
    if not user_exists("admin"):
        create_user("admin", "admin@smint.com", "password123")
def hash_password(password):
    """Return the hexadecimal SHA-256 digest of *password*.

    The string is encoded as UTF-8 before hashing; the result is a
    64-character lowercase hex string.
    """
    # NOTE(review): this is a single unsalted SHA-256 pass. Stored hashes
    # depend on this exact output, so it is left unchanged here; a salted
    # KDF (e.g. hashlib.scrypt / hashlib.pbkdf2_hmac) would need a migration.
    hasher = hashlib.sha256()
    hasher.update(password.encode("utf-8"))
    return hasher.hexdigest()
def create_user(username, email, password):
    """
    Create a new user in the database.

    The password is hashed with hash_password() before it is stored, and
    the creation time is recorded as the current local time.

    Returns: (success: bool, message: str)
    """
    # Check if username already exists
    if user_exists(username):
        return False, "Username already exists."

    try:
        conn = get_connection()
        try:
            conn.execute(
                '''
                INSERT INTO users (username, email, password_hash, created_at)
                VALUES (?, ?, ?, ?)
                ''',
                (
                    username,
                    email,
                    # Hash the password before storing
                    hash_password(password),
                    # Store the timestamp as text explicitly. This is the
                    # same "YYYY-MM-DD HH:MM:SS[.ffffff]" form the sqlite3
                    # default datetime adapter produced, without relying on
                    # that adapter (deprecated since Python 3.12).
                    datetime.now().isoformat(" "),
                ),
            )
            conn.commit()
        finally:
            # Always release the connection, including when the INSERT fails.
            conn.close()
    except sqlite3.IntegrityError as e:
        # The pre-check above can race with a concurrent insert; in that
        # case the UNIQUE constraint fires and the name is now taken.
        if user_exists(username):
            return False, "Username already exists."
        return False, f"Database error: {str(e)}"
    except sqlite3.Error as e:
        return False, f"Database error: {str(e)}"

    return True, f"Account created successfully for {username}!"
def verify_user(username, password):
    """
    Verify user credentials.

    Hashes the supplied password with hash_password() and looks for a row
    in ``users`` whose username and stored hash both match.

    Returns: (success: bool, message: str)
    """
    try:
        conn = get_connection()
        try:
            # Only existence matters, so select a constant instead of
            # pulling every column (including the stored hash) back out.
            match = conn.execute(
                '''
                SELECT 1 FROM users
                WHERE username = ? AND password_hash = ?
                ''',
                # Hash the entered password to compare
                (username, hash_password(password)),
            ).fetchone()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except sqlite3.Error as e:
        return False, f"Database error: {str(e)}"

    if match is not None:
        return True, f"Welcome back, {username}!"
    return False, "Incorrect username or password."
def user_exists(username):
    """
    Check if a username already exists in the database.

    Returns: True when a row with this username is found, otherwise False.
    Any sqlite3.Error (for example a missing table) is also reported as
    False rather than raised.
    """
    try:
        conn = get_connection()
        try:
            row = conn.execute(
                'SELECT 1 FROM users WHERE username = ?', (username,)
            ).fetchone()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except sqlite3.Error:
        return False
    return row is not None
def get_user_info(username):
    """
    Get user information by username.

    Returns: a dict with the keys ``id``, ``username``, ``email`` and
    ``created_at``, or None when no such user exists or a sqlite3.Error
    occurs. The password hash is never included.
    """
    try:
        conn = get_connection()
        try:
            row = conn.execute(
                '''
                SELECT id, username, email, created_at
                FROM users WHERE username = ?
                ''',
                (username,),
            ).fetchone()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except sqlite3.Error:
        return None

    if row is None:
        return None
    # The SELECT list fixes the keys: id, username, email, created_at.
    return dict(row)
def get_all_users():
    """
    Get all users (admin function).

    Returns: a list of dicts, one per user, each with the keys ``id``,
    ``username``, ``email`` and ``created_at``. An empty list is returned
    when there are no users or a sqlite3.Error occurs.
    """
    try:
        conn = get_connection()
        try:
            rows = conn.execute(
                'SELECT id, username, email, created_at FROM users'
            ).fetchall()
        finally:
            # Always release the connection, including when the query fails.
            conn.close()
    except sqlite3.Error:
        return []
    return [dict(row) for row in rows]