-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
126 lines (100 loc) · 5.56 KB
/
models.py
File metadata and controls
126 lines (100 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
from datetime import datetime
from extensions import db, login_manager # Import from extensions
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
@login_manager.user_loader
def load_user(user_id):
    """Resolve the user ID handed over by Flask-Login to a ``User`` row.

    Registered as the loader callback through ``login_manager.user_loader``.

    Args:
        user_id: Identifier supplied by Flask-Login; it is converted with
            ``int()`` before the primary-key lookup.

    Returns:
        The matching ``User``, or ``None`` when the ID cannot be converted
        to an integer or the lookup finds nothing.
    """
    # A user loader is expected to return None for an unusable ID rather
    # than raise, so a malformed or stale session value must not surface
    # as a ValueError/TypeError from int().
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
class User(UserMixin, db.Model):
    """Account record; doubles as the Flask-Login user via ``UserMixin``."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Holds the output of generate_password_hash(); see set_password().
    password_hash = db.Column(db.String(256), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Each relationship below also declares an ``owner`` backref on the
    # related model.
    files = db.relationship('File', backref='owner', lazy=True)
    monitored_folders = db.relationship('MonitoredFolder', backref='owner', lazy=True)
    backup_jobs = db.relationship('BackupJob', backref='owner', lazy=True)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash.

        Returns ``False`` when no hash has been stored on this instance
        yet, instead of passing ``None`` on to ``check_password_hash``.
        """
        if not self.password_hash:
            # Nothing to compare against (e.g. set_password() was never
            # called on this object), so the check simply fails.
            return False
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        """Return a short debug representation containing the username."""
        return f'<User {self.username}>'
class MonitoredFolder(db.Model):
    """A folder path registered by a user, with its backup settings."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(1024), nullable=False)
    is_active = db.Column(db.Boolean, default=True)

    # Timestamps: created_at is filled on insert, updated_at is refreshed
    # on every update, last_scan_at stays empty until something sets it.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    last_scan_at = db.Column(db.DateTime, nullable=True)

    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    # Backup interval in minutes
    backup_interval = db.Column(db.Integer, default=60)

    # Log entries referencing this folder; each gets a ``folder`` backref.
    backup_logs = db.relationship('BackupLog', backref='folder', lazy=True)

    def __repr__(self):
        """Return a short debug representation containing the folder name."""
        return f'<MonitoredFolder {self.name}>'
class BackupJob(db.Model):
    """A named backup run owned by a user, with a status string."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    # pending, running, completed, failed
    status = db.Column(db.String(50), default='pending')
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    is_manual = db.Column(db.Boolean, default=False)

    # Log entries written for this job; each gets a ``job`` backref.
    logs = db.relationship('BackupLog', backref='job', lazy=True)

    def __repr__(self):
        """Return a short debug representation containing the job name."""
        return f'<BackupJob {self.name}>'
class BackupLog(db.Model):
    """A single log message, optionally linked to a folder, job and file."""

    id = db.Column(db.Integer, primary_key=True)
    message = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # info, warning, error
    level = db.Column(db.String(20), default='info')

    # Links to the records this entry is about.
    folder_id = db.Column(
        db.Integer,
        db.ForeignKey('monitored_folder.id'),
    )
    job_id = db.Column(
        db.Integer,
        db.ForeignKey('backup_job.id'),
    )
    file_id = db.Column(
        db.Integer,
        db.ForeignKey('file.id'),
        nullable=True,
    )

    def __repr__(self):
        """Return a short debug representation containing the log id."""
        return f'<BackupLog {self.id}>'
class File(db.Model):
    """A stored file owned by a user, with its versions and backup logs."""

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)
    original_filename = db.Column(db.String(255), nullable=False)
    # Size in bytes
    # NOTE(review): db.Integer may map to a 32-bit column on some
    # backends - confirm that files over 2 GiB are not expected.
    size = db.Column(db.Integer, nullable=False)
    content_type = db.Column(db.String(100), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    is_deleted = db.Column(db.Boolean, default=False)
    source_folder_id = db.Column(
        db.Integer,
        db.ForeignKey('monitored_folder.id'),
        nullable=True,
    )
    # Original path in monitored folder
    source_path = db.Column(db.String(1024), nullable=True)
    is_auto_backup = db.Column(db.Boolean, default=False)

    # Versions use the "all, delete-orphan" cascade; log entries do not.
    versions = db.relationship(
        'FileVersion',
        backref='file',
        lazy=True,
        cascade="all, delete-orphan",
    )
    backup_logs = db.relationship('BackupLog', backref='file', lazy=True)

    def __repr__(self):
        """Return a short debug representation with the original filename."""
        return f'<File {self.original_filename}>'

    def get_latest_version(self):
        """Return this file's ``FileVersion`` with the highest version number.

        Uses ``.first()``, so the result is ``None`` when the query matches
        no rows.
        """
        own_versions = FileVersion.query.filter_by(file_id=self.id)
        newest_first = own_versions.order_by(FileVersion.version_number.desc())
        return newest_first.first()

    def get_path(self):
        """Return the path of the latest version, or ``None`` without one."""
        newest = self.get_latest_version()
        if not newest:
            return None
        return newest.get_path()

    @property
    def source_type(self):
        """Return 'manual' or 'auto' based on how the file was added"""
        if self.is_auto_backup:
            return 'auto'
        return 'manual'
class FileVersion(db.Model):
    """One numbered version of a ``File``."""

    id = db.Column(db.Integer, primary_key=True)
    file_id = db.Column(
        db.Integer,
        db.ForeignKey('file.id'),
        nullable=False,
    )
    version_number = db.Column(db.Integer, nullable=False)
    # The internal filename on disk
    filename = db.Column(db.String(255), nullable=False)
    # Size in bytes
    # NOTE(review): db.Integer may map to a 32-bit column on some
    # backends - confirm that files over 2 GiB are not expected.
    size = db.Column(db.Integer, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_current = db.Column(db.Boolean, default=True)
    # For file integrity verification
    checksum = db.Column(db.String(128), nullable=True)
    # Description of what changed
    change_reason = db.Column(db.String(255), nullable=True)

    def __repr__(self):
        """Return a debug representation of the form file_id-version_number."""
        return f'<FileVersion {self.file_id}-{self.version_number}>'

    def get_path(self):
        """Return ``filename`` joined onto the app's ``UPLOAD_FOLDER`` setting."""
        # Imported at call time rather than at module level; presumably to
        # avoid a circular import between app and models - confirm.
        from app import app

        upload_root = app.config['UPLOAD_FOLDER']
        return os.path.join(upload_root, self.filename)