|
| 1 | +from flask import render_template, request, send_file, jsonify, redirect, url_for, flash, session |
| 2 | +from flask_login import login_required, login_user, logout_user, current_user |
| 3 | +from flask_mail import Message |
| 4 | +from sqlalchemy.exc import IntegrityError |
| 5 | +import csv |
| 6 | +import io |
| 7 | +import openpyxl |
| 8 | +import pyotp |
| 9 | +import json |
| 10 | +import logging |
| 11 | +from collections import Counter |
| 12 | +from flask_paginate import Pagination |
| 13 | +from datetime import datetime |
| 14 | + |
| 15 | +from .models import db, User, Domain |
| 16 | +from .domain_utils import extract_domains, validate_domain, categorize_domain, add_custom_rule, remove_custom_rule, load_custom_rules |
| 17 | + |
| 18 | +def save_domains(domains, current_user): |
| 19 | + new_domains = 0 |
| 20 | + for domain in domains: |
| 21 | + if validate_domain(domain): |
| 22 | + category = categorize_domain(domain) |
| 23 | + try: |
| 24 | + new_domain = Domain(name=domain, category=category, user_id=current_user.id) |
| 25 | + db.session.add(new_domain) |
| 26 | + db.session.commit() |
| 27 | + new_domains += 1 |
| 28 | + except IntegrityError: |
| 29 | + db.session.rollback() |
| 30 | + existing_domain = Domain.query.filter_by(name=domain).first() |
| 31 | + if existing_domain: |
| 32 | + existing_domain.category = category |
| 33 | + db.session.commit() |
| 34 | + return new_domains |
| 35 | + |
| 36 | +def register_routes(app, mail, cache, limiter): |
| 37 | + @app.route('/', methods=['GET', 'POST']) |
| 38 | + @login_required |
| 39 | + def index(): |
| 40 | + if request.method == 'POST': |
| 41 | + text = request.form['text'] |
| 42 | + extracted_domains = extract_domains(text) |
| 43 | + new_domains = save_domains(extracted_domains, current_user) |
| 44 | + flash(f'Successfully extracted and saved {new_domains} new domains.') |
| 45 | + return redirect(url_for('index')) |
| 46 | + |
| 47 | + domains = Domain.query.filter_by(user_id=current_user.id).all() |
| 48 | + category_stats = Counter(domain.category for domain in domains) |
| 49 | + return render_template('index.html', domains=domains, category_stats=category_stats) |
| 50 | + |
| 51 | + @app.route('/search', methods=['GET']) |
| 52 | + @login_required |
| 53 | + @cache.cached(timeout=300, query_string=True) |
| 54 | + def search(): |
| 55 | + query = request.args.get('query', '') |
| 56 | + domains = Domain.query.filter(Domain.name.ilike(f'%{query}%')).all() |
| 57 | + return render_template('index.html', domains=domains, search_query=query) |
| 58 | + |
| 59 | + @app.route('/export', methods=['GET']) |
| 60 | + @login_required |
| 61 | + @cache.cached(timeout=300, query_string=True) |
| 62 | + def export(): |
| 63 | + format = request.args.get('format', 'csv') |
| 64 | + domains = Domain.query.filter_by(user_id=current_user.id).all() |
| 65 | + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| 66 | + |
| 67 | + if format == 'csv': |
| 68 | + output = io.StringIO() |
| 69 | + writer = csv.writer(output) |
| 70 | + writer.writerow(['Domain', 'Category', 'Hashtags']) |
| 71 | + writer.writerows([[domain.name, domain.category, domain.hashtags] for domain in domains]) |
| 72 | + return send_file( |
| 73 | + io.BytesIO(output.getvalue().encode('utf-8')), |
| 74 | + mimetype='text/csv', |
| 75 | + as_attachment=True, |
| 76 | + download_name=f'domains_export_{timestamp}.csv' |
| 77 | + ) |
| 78 | + elif format == 'json': |
| 79 | + data = [{'domain': domain.name, 'category': domain.category, 'hashtags': domain.hashtags} for domain in domains] |
| 80 | + return send_file( |
| 81 | + io.BytesIO(json.dumps(data, indent=2).encode('utf-8')), |
| 82 | + mimetype='application/json', |
| 83 | + as_attachment=True, |
| 84 | + download_name=f'domains_export_{timestamp}.json' |
| 85 | + ) |
| 86 | + elif format == 'excel': |
| 87 | + wb = openpyxl.Workbook() |
| 88 | + ws = wb.active |
| 89 | + ws.append(['Domain', 'Category', 'Hashtags']) |
| 90 | + for domain in domains: |
| 91 | + ws.append([domain.name, domain.category, domain.hashtags]) |
| 92 | + output = io.BytesIO() |
| 93 | + wb.save(output) |
| 94 | + output.seek(0) |
| 95 | + return send_file( |
| 96 | + output, |
| 97 | + mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', |
| 98 | + as_attachment=True, |
| 99 | + download_name=f'domains_export_{timestamp}.xlsx' |
| 100 | + ) |
| 101 | + |
| 102 | + @app.route('/api/domains', methods=['GET']) |
| 103 | + @login_required |
| 104 | + @limiter.limit("100 per day") |
| 105 | + @cache.cached(timeout=300) |
| 106 | + def api_domains(): |
| 107 | + domains = Domain.query.all() |
| 108 | + return jsonify([{'domain': domain.name, 'category': domain.category} for domain in domains]) |
| 109 | + |
| 110 | + @app.route('/login', methods=['GET', 'POST']) |
| 111 | + def login(): |
| 112 | + if request.method == 'POST': |
| 113 | + username = request.form['username'] |
| 114 | + password = request.form['password'] |
| 115 | + user = User.query.filter_by(username=username).first() |
| 116 | + if user and user.check_password(password): |
| 117 | + if user.two_factor_secret: |
| 118 | + session['user_id'] = user.id |
| 119 | + return redirect(url_for('two_factor_auth')) |
| 120 | + login_user(user) |
| 121 | + return redirect(url_for('index')) |
| 122 | + flash('Invalid username or password') |
| 123 | + return render_template('login.html') |
| 124 | + |
| 125 | + @app.route('/two_factor_auth', methods=['GET', 'POST']) |
| 126 | + def two_factor_auth(): |
| 127 | + if 'user_id' not in session: |
| 128 | + return redirect(url_for('login')) |
| 129 | + |
| 130 | + user = User.query.get(session['user_id']) |
| 131 | + if not user: |
| 132 | + return redirect(url_for('login')) |
| 133 | + |
| 134 | + if request.method == 'POST': |
| 135 | + token = request.form['token'] |
| 136 | + totp = pyotp.TOTP(user.two_factor_secret) |
| 137 | + if totp.verify(token): |
| 138 | + login_user(user) |
| 139 | + session.pop('user_id', None) |
| 140 | + return redirect(url_for('index')) |
| 141 | + flash('Invalid token') |
| 142 | + |
| 143 | + return render_template('two_factor_auth.html') |
| 144 | + |
| 145 | + @app.route('/enable_2fa', methods=['GET', 'POST']) |
| 146 | + @login_required |
| 147 | + def enable_2fa(): |
| 148 | + if request.method == 'POST': |
| 149 | + secret = pyotp.random_base32() |
| 150 | + totp = pyotp.TOTP(secret) |
| 151 | + token = request.form['token'] |
| 152 | + if totp.verify(token): |
| 153 | + current_user.set_two_factor_secret(secret) |
| 154 | + flash('Two-factor authentication enabled successfully') |
| 155 | + return redirect(url_for('index')) |
| 156 | + flash('Invalid token') |
| 157 | + |
| 158 | + secret = pyotp.random_base32() |
| 159 | + totp = pyotp.TOTP(secret) |
| 160 | + qr_code = totp.provisioning_uri(current_user.username, issuer_name="Domain Extractor") |
| 161 | + return render_template('enable_2fa.html', qr_code=qr_code, secret=secret) |
| 162 | + |
| 163 | + @app.route('/logout') |
| 164 | + @login_required |
| 165 | + def logout(): |
| 166 | + logout_user() |
| 167 | + return redirect(url_for('login')) |
| 168 | + |
| 169 | + @app.route('/bulk_import', methods=['GET', 'POST']) |
| 170 | + @login_required |
| 171 | + def bulk_import(): |
| 172 | + if request.method == 'POST': |
| 173 | + file = request.files['file'] |
| 174 | + if file and file.filename.endswith('.csv'): |
| 175 | + try: |
| 176 | + stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None) |
| 177 | + csv_reader = csv.reader(stream) |
| 178 | + domains = [row[0] for row in csv_reader if row] |
| 179 | + save_domains(domains, current_user) |
| 180 | + flash(f'Successfully imported {len(domains)} domains') |
| 181 | + |
| 182 | + msg = Message("Bulk Import Results", |
| 183 | + recipients=[current_user.email]) |
| 184 | + msg.body = f"Your bulk import has been completed. {len(domains)} domains were successfully imported." |
| 185 | + mail.send(msg) |
| 186 | + |
| 187 | + except Exception as e: |
| 188 | + logging.error(f'Error during bulk import: {str(e)}') |
| 189 | + flash('Error during import. Please check the file format.') |
| 190 | + else: |
| 191 | + flash('Please upload a CSV file') |
| 192 | + return render_template('bulk_import.html') |
| 193 | + |
| 194 | + @app.route('/admin') |
| 195 | + @login_required |
| 196 | + def admin(): |
| 197 | + if not current_user.is_admin: |
| 198 | + flash('You do not have permission to access this page.') |
| 199 | + return redirect(url_for('index')) |
| 200 | + return render_template('admin.html') |
| 201 | + |
| 202 | + @app.route('/statistics') |
| 203 | + @login_required |
| 204 | + @cache.cached(timeout=300) |
| 205 | + def statistics(): |
| 206 | + category_stats = Counter(domain.category for domain in Domain.query.all()) |
| 207 | + total_domains = Domain.query.count() |
| 208 | + return render_template('statistics.html', category_stats=category_stats, total_domains=total_domains) |
| 209 | + |
| 210 | + @app.route('/custom_rules', methods=['GET', 'POST']) |
| 211 | + @login_required |
| 212 | + def custom_rules(): |
| 213 | + if request.method == 'POST': |
| 214 | + rule = request.form['rule'] |
| 215 | + category = request.form['category'] |
| 216 | + add_custom_rule(rule, category) |
| 217 | + flash('Custom rule added successfully') |
| 218 | + |
| 219 | + rules = load_custom_rules() |
| 220 | + return render_template('custom_rules.html', rules=rules) |
| 221 | + |
| 222 | + @app.route('/remove_rule/<rule>') |
| 223 | + @login_required |
| 224 | + def remove_rule(rule): |
| 225 | + if remove_custom_rule(rule): |
| 226 | + flash('Custom rule removed successfully') |
| 227 | + else: |
| 228 | + flash('Rule not found') |
| 229 | + return redirect(url_for('custom_rules')) |
| 230 | + |
| 231 | + @app.route('/register', methods=['GET', 'POST']) |
| 232 | + def register(): |
| 233 | + if request.method == 'POST': |
| 234 | + username = request.form['username'] |
| 235 | + password = request.form['password'] |
| 236 | + email = request.form['email'] |
| 237 | + |
| 238 | + if User.query.filter_by(username=username).first(): |
| 239 | + flash('Username already exists. Please choose a different one.') |
| 240 | + return redirect(url_for('register')) |
| 241 | + |
| 242 | + new_user = User.create(username, password, email) |
| 243 | + if new_user: |
| 244 | + flash('Registration successful. Please log in.') |
| 245 | + return redirect(url_for('login')) |
| 246 | + else: |
| 247 | + flash('Registration failed. Please try again.') |
| 248 | + |
| 249 | + return render_template('register.html') |
| 250 | + |
| 251 | + @app.route('/domains', methods=['GET']) |
| 252 | + @login_required |
| 253 | + def list_domains(): |
| 254 | + search_query = request.args.get('search', '') |
| 255 | + category_filter = request.args.get('category', '') |
| 256 | + sort_by = request.args.get('sort', 'name') |
| 257 | + sort_order = request.args.get('order', 'asc') |
| 258 | + |
| 259 | + query = Domain.query.filter_by(user_id=current_user.id) |
| 260 | + if search_query: |
| 261 | + query = query.filter(Domain.name.ilike(f'%{search_query}%')) |
| 262 | + if category_filter: |
| 263 | + query = query.filter(Domain.category == category_filter) |
| 264 | + |
| 265 | + if sort_by == 'name': |
| 266 | + query = query.order_by(Domain.name.asc() if sort_order == 'asc' else Domain.name.desc()) |
| 267 | + elif sort_by == 'category': |
| 268 | + query = query.order_by(Domain.category.asc() if sort_order == 'asc' else Domain.category.desc()) |
| 269 | + |
| 270 | + domains = query.all() |
| 271 | + total_domains = len(domains) |
| 272 | + |
| 273 | + categories = Domain.query.with_entities(Domain.category).distinct().all() |
| 274 | + categories = [category[0] for category in categories] |
| 275 | + |
| 276 | + return render_template('list_domains.html', |
| 277 | + domains=domains, |
| 278 | + search_query=search_query, |
| 279 | + category_filter=category_filter, |
| 280 | + categories=categories, |
| 281 | + total_domains=total_domains, |
| 282 | + sort_by=sort_by, |
| 283 | + sort_order=sort_order) |
0 commit comments