forked from averygan/reclip
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
171 lines (138 loc) · 5.34 KB
/
Copy pathapp.py
File metadata and controls
171 lines (138 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
import uuid
import glob
import json
import subprocess
import threading
from flask import Flask, request, jsonify, send_file, render_template
app = Flask(__name__)
DOWNLOAD_DIR = os.path.join(os.path.dirname(__file__), "downloads")
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
jobs = {}
def run_download(job_id, url, format_choice, format_id):
job = jobs[job_id]
out_template = os.path.join(DOWNLOAD_DIR, f"{job_id}.%(ext)s")
cmd = ["yt-dlp", "--no-playlist", "-o", out_template]
if format_choice == "audio":
cmd += ["-x", "--audio-format", "mp3"]
elif format_id:
cmd += ["-f", f"{format_id}+bestaudio/best", "--merge-output-format", "mp4"]
else:
cmd += ["-f", "bestvideo+bestaudio/best", "--merge-output-format", "mp4"]
cmd.append(url)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode != 0:
job["status"] = "error"
job["error"] = result.stderr.strip().split("\n")[-1]
return
files = glob.glob(os.path.join(DOWNLOAD_DIR, f"{job_id}.*"))
if not files:
job["status"] = "error"
job["error"] = "Download completed but no file was found"
return
if format_choice == "audio":
target = [f for f in files if f.endswith(".mp3")]
chosen = target[0] if target else files[0]
else:
target = [f for f in files if f.endswith(".mp4")]
chosen = target[0] if target else files[0]
for f in files:
if f != chosen:
try:
os.remove(f)
except OSError:
pass
job["status"] = "done"
job["file"] = chosen
ext = os.path.splitext(chosen)[1]
title = job.get("title", "").strip()
# Sanitize title for filename
if title:
safe_title = "".join(c for c in title if c not in r'\/:*?"<>|').strip()[:20].strip()
job["filename"] = f"{safe_title}{ext}" if safe_title else os.path.basename(chosen)
else:
job["filename"] = os.path.basename(chosen)
except subprocess.TimeoutExpired:
job["status"] = "error"
job["error"] = "Download timed out (5 min limit)"
except Exception as e:
job["status"] = "error"
job["error"] = str(e)
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/info", methods=["POST"])
def get_info():
data = request.json
url = data.get("url", "").strip()
if not url:
return jsonify({"error": "No URL provided"}), 400
cmd = ["yt-dlp", "--no-playlist", "-j", url]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
return jsonify({"error": result.stderr.strip().split("\n")[-1]}), 400
info = json.loads(result.stdout)
# Build quality options — keep best format per resolution
best_by_height = {}
for f in info.get("formats", []):
height = f.get("height")
if height and f.get("vcodec", "none") != "none":
tbr = f.get("tbr") or 0
if height not in best_by_height or tbr > (best_by_height[height].get("tbr") or 0):
best_by_height[height] = f
formats = []
for height, f in best_by_height.items():
formats.append({
"id": f["format_id"],
"label": f"{height}p",
"height": height,
})
formats.sort(key=lambda x: x["height"], reverse=True)
return jsonify({
"title": info.get("title", ""),
"thumbnail": info.get("thumbnail", ""),
"duration": info.get("duration"),
"uploader": info.get("uploader", ""),
"formats": formats,
})
except subprocess.TimeoutExpired:
return jsonify({"error": "Timed out fetching video info"}), 400
except Exception as e:
return jsonify({"error": str(e)}), 400
@app.route("/api/download", methods=["POST"])
def start_download():
data = request.json
url = data.get("url", "").strip()
format_choice = data.get("format", "video")
format_id = data.get("format_id")
title = data.get("title", "")
if not url:
return jsonify({"error": "No URL provided"}), 400
job_id = uuid.uuid4().hex[:10]
jobs[job_id] = {"status": "downloading", "url": url, "title": title}
thread = threading.Thread(target=run_download, args=(job_id, url, format_choice, format_id))
thread.daemon = True
thread.start()
return jsonify({"job_id": job_id})
@app.route("/api/status/<job_id>")
def check_status(job_id):
job = jobs.get(job_id)
if not job:
return jsonify({"error": "Job not found"}), 404
return jsonify({
"status": job["status"],
"error": job.get("error"),
"filename": job.get("filename"),
})
@app.route("/api/file/<job_id>")
def download_file(job_id):
job = jobs.get(job_id)
if not job or job["status"] != "done":
return jsonify({"error": "File not ready"}), 404
return send_file(job["file"], as_attachment=True, download_name=job["filename"])
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8899))
host = os.environ.get("HOST", "127.0.0.1")
app.run(host=host, port=port)