forked from avikaprasad22/genescope_backend
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
249 lines (236 loc) · 8.51 KB
/
main.py
File metadata and controls
249 lines (236 loc) · 8.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# imports from flask
import json
import os
import shutil
from urllib.parse import urljoin, urlparse
from flask import abort, redirect, render_template, request, send_from_directory, url_for, jsonify, current_app, current_app
from flask_login import current_user, login_user, logout_user, login_required, login_required
from flask.cli import AppGroup
from flask_cors import CORS
from werkzeug.security import generate_password_hash
# Flask app setup
from __init__ import app, db, login_manager
CORS(app, supports_credentials=True)
# CORS for frontend running on a different port
# Flask app setup
# API imports (teammate + yours)
from api.user import user_api
from api.news import news_api
from api.pfp import pfp_api
from api.nestImg import nestImg_api
from api.riskquiz import riskquiz_api
from api.post import post_api
from api.channel import channel_api
from api.group import group_api
from api.section import section_api
from api.student import student_api
from api.nestPost import nestPost_api
from api.messages_api import messages_api
from api.questions import questions_api
from api.scoreboard import scoreboard_api
from api.vote import vote_api
from api.resource import resource_api
# :white_check_mark: Yours
from api.vote import vote_api
from api.resource import resource_api
from api.illumina import illumina_api
from api.dna_sequencing import dna_api
# from api.chatbot import chatbot_api
from api.dnabot import dnabot_api
from api.college import college_api # <-- added line
from api.matching import matching_api
# Register all blueprints
from api.gene_resource import gene_resource_api
# Register all blueprints
app.register_blueprint(user_api)
app.register_blueprint(news_api)
app.register_blueprint(pfp_api)
app.register_blueprint(nestImg_api)
app.register_blueprint(post_api)
app.register_blueprint(channel_api)
app.register_blueprint(group_api)
app.register_blueprint(section_api)
app.register_blueprint(student_api)
app.register_blueprint(riskquiz_api)
app.register_blueprint(nestPost_api)
app.register_blueprint(messages_api)
app.register_blueprint(questions_api)
app.register_blueprint(scoreboard_api)
app.register_blueprint(vote_api)
app.register_blueprint(resource_api)
# :white_check_mark: Register yours
app.register_blueprint(illumina_api)
app.register_blueprint(college_api)
app.register_blueprint(matching_api)
app.register_blueprint(dnabot_api)
# Login Manager
app.register_blueprint(dna_api)
# app.register_blueprint(chatbot_api)
app.register_blueprint(gene_resource_api)
# Login Manager
login_manager.login_view = "login"
@login_manager.unauthorized_handler
def unauthorized_callback():
return redirect(url_for('login', next=request.path))
@login_manager.user_loader
def load_user(user_id):
from model.user import User
return User.query.get(int(user_id))
@app.context_processor
def inject_user():
return dict(current_user=current_user)
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
@app.route('/login', methods=['GET', 'POST'])
def login():
from model.user import User
error = None
next_page = request.args.get('next', '') or request.form.get('next', '')
if request.method == 'POST':
user = User.query.filter_by(_uid=request.form['username']).first()
if user and user.is_password(request.form['password']):
login_user(user)
if not is_safe_url(next_page):
return abort(400)
return redirect(next_page or url_for('index'))
else:
error = 'Invalid username or password.'
return render_template("login.html", error=error, next=next_page)
@app.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.route('/')
def index():
print("Home:", current_user)
return render_template("index.html")
@app.route('/users/table')
@login_required
def utable():
from model.user import User
users = User.query.all()
return render_template("utable.html", user_data=users)
@app.route('/users/table2')
@login_required
def u2table():
from model.user import User
users = User.query.all()
return render_template("u2table.html", user_data=users)
@app.route('/uploads/<path:filename>')
def uploaded_file(filename):
return send_from_directory(current_app.config['UPLOAD_FOLDER'], filename)
@app.route('/users/delete/<int:user_id>', methods=['DELETE'])
@login_required
def delete_user(user_id):
from model.user import User
user = User.query.get(user_id)
if user:
user.delete()
return jsonify({'message': 'User deleted successfully'}), 200
return jsonify({'error': 'User not found'}), 404
@app.route('/users/reset_password/<int:user_id>', methods=['POST'])
@login_required
def reset_password(user_id):
from model.user import User
if current_user.role != 'Admin':
return jsonify({'error': 'Unauthorized'}), 403
user = User.query.get(user_id)
if not user:
return jsonify({'error': 'User not found'}), 404
if user.update({"password": app.config['DEFAULT_PASSWORD']}):
return jsonify({'message': 'Password reset successfully'}), 200
return jsonify({'error': 'Password reset failed'}), 500
# CLI Setup
custom_cli = AppGroup('custom', help='Custom commands')
@custom_cli.command('generate_data')
def generate_data():
from model.user import initUsers
from model.section import initSections
from model.group import initGroups
from model.channel import initChannels
from model.post import initPosts
from model.nestPost import initNestPosts
from model.vote import initVotes
from model.gene_resource import init_gene_resources
init_gene_resources()
initUsers()
initSections()
initGroups()
initChannels()
initPosts()
initNestPosts()
initVotes()
@custom_cli.command('backup_data')
def backup_data():
data = extract_data()
save_data_to_json(data)
backup_database(app.config['SQLALCHEMY_DATABASE_URI'], app.config['SQLALCHEMY_BACKUP_URI'])
@custom_cli.command('restore_data')
def restore_data_command():
data = load_data_from_json()
restore_data(data)
app.cli.add_command(custom_cli)
# DB helpers
def extract_data():
from model.user import User
from model.section import Section
from model.group import Group
from model.channel import Channel
from model.post import Post
data = {}
with app.app_context():
data['users'] = [user.read() for user in User.query.all()]
data['sections'] = [section.read() for section in Section.query.all()]
data['groups'] = [group.read() for group in Group.query.all()]
data['channels'] = [channel.read() for channel in Channel.query.all()]
data['posts'] = [post.read() for post in Post.query.all()]
return data
def save_data_to_json(data, directory='backup'):
if not os.path.exists(directory):
os.makedirs(directory)
for table, records in data.items():
with open(os.path.join(directory, f'{table}.json'), 'w') as f:
json.dump(records, f)
print(f"Data backed up to {directory} directory.")
def load_data_from_json(directory='backup'):
data = {}
for table in ['users', 'sections', 'groups', 'channels']:
with open(os.path.join(directory, f'{table}.json'), 'r') as f:
data[table] = json.load(f)
return data
def restore_data(data):
from model.user import User
from model.section import Section
from model.group import Group
from model.channel import Channel
with app.app_context():
users = User.restore(data['users'])
_ = Section.restore(data['sections'])
_ = Group.restore(data['groups'], users)
_ = Channel.restore(data['channels'])
print("Data restored to the new database.")
def backup_database(src_uri, backup_uri):
if src_uri.startswith("sqlite:///") and backup_uri.startswith("sqlite:///"):
src_path = src_uri.replace("sqlite:///", "")
backup_path = backup_uri.replace("sqlite:///", "")
try:
shutil.copy2(src_path, backup_path)
print("Database file copied successfully.")
except Exception as e:
print("Error backing up database file:", e)
CORS(app, resources={
r"/riskquiz/api/*": {
"origins": [
"http://127.0.0.1:4504",
"https://open-coding-society.github.io"
]
}
})
# Run Flask on port 8504
if __name__ == "__main__":
app.run(debug=True, port=8504)