Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions app/server/fireshare/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,17 @@ def handle_cancel(signum, frame):
# Write initial transcoding status with our PID so the API can track us
util.write_transcoding_status(paths['data'], 0, total_jobs, pid=os.getpid())

# Remove any leftover *.tmp.mp4 files from a previous run that crashed
# before the temp file could be renamed to its final location.
derived_root = Path(processed_root, "derived")
if derived_root.exists():
for tmp_file in derived_root.glob('**/*.tmp.mp4'):
try:
tmp_file.unlink()
logger.info(f"Removed stale temp transcode file: {tmp_file}")
except OSError as ex:
logger.warning(f"Could not remove stale temp file {tmp_file}: {ex}")

# Track corrupt videos to skip remaining heights for that video
corrupt_video_ids = set()

Expand Down
95 changes: 69 additions & 26 deletions app/server/fireshare/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import glob
import shutil
import re
import threading
from datetime import datetime

# Corruption indicators to detect during video validation
Expand Down Expand Up @@ -606,6 +607,8 @@ def run_ffmpeg_with_progress(cmd, total_duration, timeout_seconds=None, data_pat

If data_path is provided, reads the existing status file and updates it with
percent/speed. Progress is throttled to every 0.5 seconds to avoid I/O overhead.
stderr is drained in a background thread to prevent a pipe-buffer deadlock; if
the process exits with a non-zero code, the last 100 captured stderr lines are
logged at warning level.
"""
# Insert -progress pipe:1 before output file (last arg)
cmd_with_progress = cmd[:-1] + ['-progress', 'pipe:1'] + [cmd[-1]]
Expand All @@ -616,6 +619,15 @@ def run_ffmpeg_with_progress(cmd, total_duration, timeout_seconds=None, data_pat
percent = None
current_seconds = 0

# Drain stderr in a background thread to prevent the pipe buffer from filling
# and blocking ffmpeg. Collected lines are logged if ffmpeg exits non-zero.
stderr_lines = []
def _drain_stderr():
for line in process.stderr:
stderr_lines.append(line.rstrip())
stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
stderr_thread.start()

# -progress outputs clean key=value lines:
# out_time_us=83450000
# speed=1.5x
Expand Down Expand Up @@ -671,6 +683,13 @@ def run_ffmpeg_with_progress(cmd, total_duration, timeout_seconds=None, data_pat
process.kill()
process.wait()
raise
finally:
stderr_thread.join(timeout=5)

if process.returncode != 0 and stderr_lines:
logger.warning(f"FFmpeg exited with code {process.returncode}. stderr output:")
for line in stderr_lines[-100:]:
logger.warning(f" ffmpeg: {line}")

return process

Expand Down Expand Up @@ -737,23 +756,36 @@ def transcode_video_quality(video_path, out_path, height, use_gpu=False, timeout

# Determine output container based on codec
out_path_str = str(out_path)


# Write to a temp path during transcoding; only rename to the final path on
# success. This ensures a partially-written file from a crashed ffmpeg process
# is never picked up and served as a valid transcode output.
tmp_path = out_path.parent / (out_path.stem + '.tmp.mp4')
tmp_path_str = str(tmp_path)
if tmp_path.exists():
try:
tmp_path.unlink()
logger.debug(f"Cleaned up leftover temp file: {tmp_path_str}")
except OSError as ex:
logger.debug(f"Could not remove leftover temp file: {ex}")

mode = 'gpu' if use_gpu else 'cpu'

# Use cached encoder if available to avoid redundant encoder detection during bulk transcoding.
if _working_encoder_cache[mode] is not None:
encoder = _working_encoder_cache[mode]
logger.debug(f"Using cached {mode.upper()} encoder: {encoder['name']}")

# Build ffmpeg command using the cached encoder
logger.info(f"Transcoding video to {height}p using {encoder['name']}")
cmd = _build_transcode_command(video_path, out_path, height, encoder)
cmd = _build_transcode_command(video_path, tmp_path, height, encoder)

logger.debug(f"$: {' '.join(cmd)}")

try:
result = run_ffmpeg_with_progress(cmd, total_duration, timeout_seconds, data_path)
if result.returncode == 0:
tmp_path.rename(out_path)
e = time.time()
logger.info(f'Transcoded {str(out_path)} to {height}p in {e-s:.2f}s')
return (True, None)
Expand All @@ -762,22 +794,32 @@ def transcode_video_quality(video_path, out_path, height, use_gpu=False, timeout
logger.warning(f"Cached encoder {encoder['name']} failed with exit code {result.returncode}")
logger.info("Clearing encoder cache and retrying with all available encoders...")
_working_encoder_cache[mode] = None
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError as cleanup_ex:
logger.debug(f"Could not clean up temp file: {cleanup_ex}")
except sp.TimeoutExpired:
logger.warning(f"Cached encoder {encoder['name']} timed out after {timeout_seconds} seconds")
logger.info("Clearing encoder cache and retrying with all available encoders...")
_working_encoder_cache[mode] = None
# Clean up the process and any partial output
if os.path.exists(out_path_str):
if tmp_path.exists():
try:
os.remove(out_path_str)
logger.debug(f"Cleaned up timed out output file: {out_path_str}")
tmp_path.unlink()
logger.debug(f"Cleaned up timed out temp file: {tmp_path_str}")
except OSError as cleanup_ex:
logger.debug(f"Could not clean up timed out output: {cleanup_ex}")
logger.debug(f"Could not clean up timed out temp file: {cleanup_ex}")
except Exception as ex:
# Cached encoder failed - clear cache and fall through to try all encoders
logger.warning(f"Cached encoder {encoder['name']} failed: {ex}")
logger.info("Clearing encoder cache and retrying with all available encoders...")
_working_encoder_cache[mode] = None
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError as cleanup_ex:
logger.debug(f"Could not clean up temp file: {cleanup_ex}")

# No cached encoder - need to detect a working encoder
# Check if GPU is requested but NVENC is not available in ffmpeg
Expand Down Expand Up @@ -868,20 +910,21 @@ def transcode_video_quality(video_path, out_path, height, use_gpu=False, timeout
# as fallback, so CPU transcoding will be attempted automatically if GPU fails
logger.info(f"Detecting working {mode.upper()} encoder by attempting transcode...")
encoders = _get_encoder_candidates(use_gpu, encoder_preference)

last_exception = None
for encoder in encoders:
logger.info(f"Trying {encoder['name']}...")

# Build ffmpeg command
cmd = _build_transcode_command(video_path, out_path, height, encoder)
# Build ffmpeg command targeting the temp path
cmd = _build_transcode_command(video_path, tmp_path, height, encoder)

logger.debug(f"$: {' '.join(cmd)}")

try:
result = run_ffmpeg_with_progress(cmd, total_duration, timeout_seconds, data_path)
if result.returncode == 0:
# Success! Cache this encoder and return
# Success! Move temp file to final location, cache encoder, and return.
tmp_path.rename(out_path)
logger.info(f"✓ {encoder['name']} works! Using it for all transcodes this session.")
_working_encoder_cache[mode] = encoder
e = time.time()
Expand All @@ -890,33 +933,33 @@ def transcode_video_quality(video_path, out_path, height, use_gpu=False, timeout
else:
logger.warning(f"✗ {encoder['name']} failed with exit code {result.returncode}")
last_exception = Exception(f"Transcode failed with exit code {result.returncode}")
# Clean up failed output file before trying next encoder
if os.path.exists(out_path_str):
# Clean up temp file before trying next encoder
if tmp_path.exists():
try:
os.remove(out_path_str)
logger.debug(f"Cleaned up failed output file: {out_path_str}")
tmp_path.unlink()
logger.debug(f"Cleaned up failed temp file: {tmp_path_str}")
except OSError as cleanup_ex:
logger.debug(f"Could not clean up failed output: {cleanup_ex}")
logger.debug(f"Could not clean up failed temp file: {cleanup_ex}")
except sp.TimeoutExpired:
logger.warning(f"✗ {encoder['name']} timed out after {timeout_seconds} seconds")
last_exception = Exception(f"Transcode timed out after {timeout_seconds} seconds")
# Clean up failed output file before trying next encoder
if os.path.exists(out_path_str):
# Clean up temp file before trying next encoder
if tmp_path.exists():
try:
os.remove(out_path_str)
logger.debug(f"Cleaned up timed out output file: {out_path_str}")
tmp_path.unlink()
logger.debug(f"Cleaned up timed out temp file: {tmp_path_str}")
except OSError as cleanup_ex:
logger.debug(f"Could not clean up timed out output: {cleanup_ex}")
logger.debug(f"Could not clean up timed out temp file: {cleanup_ex}")
except Exception as ex:
logger.warning(f"✗ {encoder['name']} failed: {ex}")
last_exception = ex
# Clean up failed output file before trying next encoder
if os.path.exists(out_path_str):
# Clean up temp file before trying next encoder
if tmp_path.exists():
try:
os.remove(out_path_str)
logger.debug(f"Cleaned up failed output file: {out_path_str}")
tmp_path.unlink()
logger.debug(f"Cleaned up failed temp file: {tmp_path_str}")
except OSError as cleanup_ex:
logger.debug(f"Could not clean up failed output: {cleanup_ex}")
logger.debug(f"Could not clean up failed temp file: {cleanup_ex}")

# If we get here, no encoder worked
error_msg = f"No working {mode.upper()} encoder found for video. Tried: {', '.join([e['name'] for e in encoders])}"
Expand Down
Loading