Skip to content
Closed
25 changes: 25 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
*.py text eol=lf
*.md text eol=lf
*.toml text eol=lf
*.yml text eol=lf
*.yaml text eol=lf
*.sh text eol=lf
*.txt text eol=lf
*.ini text eol=lf
*.mako text eol=lf
*.iml text eol=lf
*.js text eol=lf
*.ts text eol=lf
*.json text eol=lf
*.css text eol=lf
*.svelte text eol=lf

Makefile text eol=lf
Dockerfile text eol=lf

# Binary files
*.png binary
*.jpg binary
*.jpeg binary
*.gif binary
*.ico binary
4 changes: 2 additions & 2 deletions config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ name = "Documentary"
path = "/data/movies/documentary" # Change this to match your actual movies location

[database]
host = "db"
host = "postgres"
port = 5432
user = "MediaManager"
password = "MediaManager"
Expand Down Expand Up @@ -172,4 +172,4 @@ rule_names = ["prefer_h265", "avoid_cam", "reject_non_freeleech"]
tmdb_relay_url = "https://metadata-relay.dorninger.co/tmdb"

[metadata.tvdb]
tvdb_relay_url = "https://metadata-relay.dorninger.co/tvdb"
tvdb_relay_url = "https://metadata-relay.dorninger.co/tvdb"
2 changes: 1 addition & 1 deletion media_manager/indexer/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class IndexerQueryResult(BaseModel):
@computed_field
@property
def quality(self) -> Quality:
high_quality_pattern = r"\b(4k|2160p|uhd)\b"
high_quality_pattern = r"\b(4k|2160p|uhd|ultra[ ._-]?hd)\b"
medium_quality_pattern = r"\b(1080p|full[ ._-]?hd)\b"
low_quality_pattern = r"\b(720p|(?<!full[ ._-])hd(?![a-z]))\b"
very_low_quality_pattern = r"\b(480p|360p|sd)\b"
Expand Down
2 changes: 1 addition & 1 deletion media_manager/indexer/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from media_manager.indexer.repository import IndexerRepository
from media_manager.indexer.schemas import IndexerQueryResult, IndexerQueryResultId
from media_manager.movies.schemas import Movie
from media_manager.torrent.utils import remove_special_chars_and_parentheses
from media_manager.tv.schemas import Show
from media_manager.utils.file_handler import remove_special_chars_and_parentheses

log = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion media_manager/movies/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import Torrent
from media_manager.torrent.utils import get_importable_media_directories
from media_manager.utils.file_handler import get_importable_media_directories

router = APIRouter()

Expand Down
39 changes: 18 additions & 21 deletions media_manager/movies/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import re
import shutil
from pathlib import Path
from typing import overload
Expand Down Expand Up @@ -29,16 +28,18 @@
from media_manager.notification.service import NotificationService
from media_manager.schemas import MediaImportSuggestion
from media_manager.torrent.schemas import (
Quality,
QualityStrings,
Torrent,
TorrentStatus,
)
from media_manager.torrent.service import TorrentService
from media_manager.torrent.utils import (
from media_manager.utils.file_handler import (
extract_external_id_from_string,
extract_quality_video_file,
get_files_for_import,
get_importable_media_directories,
import_file,
import_subtitle,
remove_special_characters,
remove_special_chars_and_parentheses,
)
Expand Down Expand Up @@ -212,9 +213,11 @@ def get_all_available_torrents_for_movie(
:return: A list of indexer query results.
"""
if search_query_override:
return self.indexer_service.search(query=search_query_override, is_tv=False)

torrents = self.indexer_service.search_movie(movie=movie)
torrents = self.indexer_service.search(
query=search_query_override, is_tv=False
)
else:
torrents = self.indexer_service.search_movie(movie=movie)

return evaluate_indexer_query_results(
is_tv=False, query_results=torrents, media=movie
Expand Down Expand Up @@ -466,21 +469,12 @@ def import_movie(
import_file(target_file=target_video_file, source_file=video_files[0])
success = True

target_subtitle_file = movie_root_path / f"{movie_file_name}"
# import subtitles
for subtitle_file in subtitle_files:
language_code_match = re.search(
r"[. ]([a-z]{2})\.srt$", subtitle_file.name, re.IGNORECASE
)
if not language_code_match:
log.warning(
f"Subtitle file {subtitle_file.name} does not match expected format, can't extract language code, skipping."
)
continue
language_code = language_code_match.group(1)
target_subtitle_file = (
movie_root_path / f"{movie_file_name}.{language_code}.srt"
import_subtitle(
subtitle_file=subtitle_file, target_file=target_subtitle_file
)
import_file(target_file=target_subtitle_file, source_file=subtitle_file)

return success

Expand Down Expand Up @@ -573,19 +567,22 @@ def import_existing_movie(self, movie: Movie, source_directory: Path) -> bool:
directory=new_source_path
)

file_quality = extract_quality_video_file(video_files[0])
file_suffix = f"{QualityStrings.get_label(file_quality)} - IMPORTED"

success = self.import_movie(
movie=movie,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
file_path_suffix=file_suffix,
)
Comment on lines +570 to 578
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential IndexError if no video files found in source directory.

video_files[0] will raise an IndexError if get_files_for_import returns an empty list. Unlike import_torrent_files (line 488) which validates len(video_files) != 1, this method lacks a guard.

🛡️ Proposed fix to add validation
     video_files, subtitle_files, _all_files = get_files_for_import(
         directory=new_source_path
     )

+    if not video_files:
+        log.error(f"No video files found in {new_source_path}")
+        return False
+
     file_quality = extract_quality_video_file(video_files[0])
     file_suffix = f"{QualityStrings.get_label(file_quality)} - IMPORTED"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
file_quality = extract_quality_video_file(video_files[0])
file_suffix = f"{QualityStrings.get_label(file_quality)} - IMPORTED"
success = self.import_movie(
movie=movie,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix="IMPORTED",
file_path_suffix=file_suffix,
)
video_files, subtitle_files, _all_files = get_files_for_import(
directory=new_source_path
)
if not video_files:
log.error(f"No video files found in {new_source_path}")
return False
file_quality = extract_quality_video_file(video_files[0])
file_suffix = f"{QualityStrings.get_label(file_quality)} - IMPORTED"
success = self.import_movie(
movie=movie,
video_files=video_files,
subtitle_files=subtitle_files,
file_path_suffix=file_suffix,
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@media_manager/movies/service.py` around lines 568 - 576, The code calls
extract_quality_video_file(video_files[0]) without checking that video_files is
non-empty, which can raise IndexError; update the surrounding logic in the
method to validate video_files (from get_files_for_import) before using
video_files[0], e.g., check len(video_files) > 0 (or handle len(video_files) !=
1 if that is the intended invariant), and if empty log or raise a clear
exception or return early instead of proceeding to call
extract_quality_video_file and import_movie; make the check near where
file_quality is computed and ensure import_movie is only invoked when a valid
video file list exists.

if success:
self.movie_repository.add_movie_file(
MovieFile(
movie_id=movie.id,
file_path_suffix="IMPORTED",
file_path_suffix=file_suffix,
torrent_id=None,
quality=Quality.unknown,
quality=file_quality,
)
)

Expand Down
4 changes: 4 additions & 0 deletions media_manager/torrent/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class QualityStrings(Enum):
sd = "400p"
unknown = "unknown"

@staticmethod
def get_label(quality: Quality) -> str:
return QualityStrings[quality.name].value
Comment on lines +25 to +27
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

get_label() now surfaces the wrong SD label.

Line 27 returns QualityStrings.sd.value, but that enum member is still "400p". Both quality parsers map 480p to Quality.sd, so this helper will label SD imports inconsistently.

Proposed fix
 class QualityStrings(Enum):
     uhd = "4K"
     fullhd = "1080p"
     hd = "720p"
-    sd = "400p"
+    sd = "480p"
     unknown = "unknown"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@media_manager/torrent/schemas.py` around lines 25 - 27, get_label currently
returns QualityStrings[quality.name].value but QualityStrings.sd still has value
"400p", causing SD to be labeled incorrectly; fix by updating the QualityStrings
enum so the sd member value is "480p" (i.e., change QualityStrings.sd from
"400p" to "480p") or, alternatively, modify get_label in the get_label method to
special-case Quality.sd and return the correct "480p" label from the appropriate
QualityStrings member; reference get_label, Quality, and QualityStrings.sd when
making the change.



class TorrentStatus(Enum):
finished = 1
Expand Down
3 changes: 1 addition & 2 deletions media_manager/torrent/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_torrent_status(self, torrent: Torrent) -> Torrent:
self.torrent_repository.save_torrent(torrent=torrent)
return torrent

def cancel_download(self, torrent: Torrent, delete_files: bool = False) -> Torrent:
def cancel_download(self, torrent: Torrent, delete_files: bool = False) -> None:
"""
cancels download of a torrent

Expand All @@ -65,7 +65,6 @@ def cancel_download(self, torrent: Torrent, delete_files: bool = False) -> Torre
"""
log.info(f"Cancelling download for torrent: {torrent.title}")
self.download_manager.remove_torrent(torrent, delete_data=delete_files)
return self.get_torrent_status(torrent=torrent)

def pause_download(self, torrent: Torrent) -> Torrent:
"""
Expand Down
181 changes: 0 additions & 181 deletions media_manager/torrent/utils.py
Original file line number Diff line number Diff line change
@@ -1,130 +1,19 @@
import hashlib
import logging
import mimetypes
import re
import shutil
from pathlib import Path, UnsupportedOperation

import bencoder
import libtorrent
import patoolib
import requests
from pathvalidate import sanitize_filename
from requests.exceptions import InvalidSchema

from media_manager.config import MediaManagerConfig
from media_manager.indexer.schemas import IndexerQueryResult
from media_manager.indexer.utils import follow_redirects_to_final_torrent_url
from media_manager.torrent.schemas import Torrent

log = logging.getLogger(__name__)


def list_files_recursively(path: Path = Path()) -> list[Path]:
files = list(path.glob("**/*"))
log.debug(f"Found {len(files)} entries via glob")
valid_files = []
for x in files:
if x.is_dir():
log.debug(f"'{x}' is a directory")
elif x.is_symlink():
log.debug(f"'{x}' is a symlink")
else:
valid_files.append(x)
log.debug(f"Returning {len(valid_files)} files after filtering")
return valid_files


def extract_archives(files: list) -> None:
archive_types = {
"application/zip",
"application/x-zip-compressedapplication/x-compressed",
"application/vnd.rar",
"application/x-7z-compressed",
"application/x-freearc",
"application/x-bzip",
"application/x-bzip2",
"application/gzip",
"application/x-gzip",
"application/x-tar",
}
for file in files:
file_type = mimetypes.guess_type(file)
log.debug(f"File: {file}, Size: {file.stat().st_size} bytes, Type: {file_type}")

if file_type[0] in archive_types:
log.info(
f"File {file} is a compressed file, extracting it into directory {file.parent}"
)
try:
patoolib.extract_archive(str(file), outdir=str(file.parent))
except patoolib.util.PatoolError:
log.exception(f"Failed to extract archive {file}")


def get_torrent_filepath(torrent: Torrent) -> Path:
return MediaManagerConfig().misc.torrent_directory / torrent.title


def import_file(target_file: Path, source_file: Path) -> None:
if target_file.exists():
target_file.unlink()

try:
target_file.hardlink_to(source_file)
except FileExistsError:
log.exception(f"File already exists at {target_file}.")
except (OSError, UnsupportedOperation, NotImplementedError):
log.exception(
f"Failed to create hardlink from {source_file} to {target_file}. Falling back to copying the file."
)
shutil.copy(src=source_file, dst=target_file)


def get_files_for_import(
torrent: Torrent | None = None, directory: Path | None = None
) -> tuple[list[Path], list[Path], list[Path]]:
"""
Extracts all files from the torrent download directory, including extracting archives.
Returns a tuple containing: seperated video files, subtitle files, and all files found in the torrent directory.
"""
if torrent:
log.info(f"Importing torrent {torrent}")
search_directory = get_torrent_filepath(torrent=torrent)
elif directory:
log.info(f"Importing files from directory {directory}")
search_directory = directory
else:
msg = "Either torrent or directory must be provided."
raise ValueError(msg)

all_files: list[Path] = list_files_recursively(path=search_directory)
log.debug(f"Found {len(all_files)} files downloaded by the torrent")
extract_archives(all_files)
all_files = list_files_recursively(path=search_directory)

video_files: list[Path] = []
subtitle_files: list[Path] = []
for file in all_files:
file_type, _ = mimetypes.guess_type(str(file))
if file_type is not None:
if file_type.startswith("video"):
video_files.append(file)
log.debug(f"File is a video, it will be imported: {file}")
elif file_type.startswith("text") and Path(file).suffix == ".srt":
subtitle_files.append(file)
log.debug(f"File is a subtitle, it will be imported: {file}")
else:
log.debug(
f"File is neither a video nor a subtitle, will not be imported: {file}"
)

log.info(
f"Found {len(all_files)} files ({len(video_files)} video files, {len(subtitle_files)} subtitle files) for further processing."
)
return video_files, subtitle_files, all_files


def get_torrent_hash(torrent: IndexerQueryResult) -> str:
"""
Helper method to get the torrent hash from the torrent object.
Expand Down Expand Up @@ -179,73 +68,3 @@ def get_torrent_hash(torrent: IndexerQueryResult) -> str:
return torrent_hash


def remove_special_characters(filename: str) -> str:
"""
Removes special characters from the filename to ensure it works with Jellyfin.

:param filename: The original filename.
:return: A sanitized version of the filename.
"""
# Remove invalid characters
sanitized = re.sub(r"([<>:\"/\\|?*])", "", filename)

# Remove leading and trailing dots or spaces
return sanitized.strip(" .")


def remove_special_chars_and_parentheses(title: str) -> str:
"""
Removes special characters and bracketed information from the title.

:param title: The original title.
:return: A sanitized version of the title.
"""

# Remove content within brackets
sanitized = re.sub(r"\[.*?\]", "", title)

# Remove content within curly brackets
sanitized = re.sub(r"\{.*?\}", "", sanitized)

# Remove year within parentheses
sanitized = re.sub(r"\(\d{4}\)", "", sanitized)

# Remove special characters
sanitized = remove_special_characters(sanitized)

# Collapse multiple whitespace characters and trim the result
return re.sub(r"\s+", " ", sanitized).strip()


def get_importable_media_directories(path: Path) -> list[Path]:
libraries = [
*MediaManagerConfig().misc.movie_libraries,
*MediaManagerConfig().misc.tv_libraries,
]

library_paths = {Path(library.path).absolute() for library in libraries}

unfiltered_dirs = [d for d in path.glob("*") if d.is_dir()]

return [
media_dir
for media_dir in unfiltered_dirs
if media_dir.absolute() not in library_paths
and not media_dir.name.startswith(".")
]


def extract_external_id_from_string(input_string: str) -> tuple[str | None, int | None]:
"""
Extracts an external ID (tmdb/tvdb ID) from the given string.

:param input_string: The string to extract the ID from.
:return: The extracted Metadata Provider and ID or None if not found.
"""
match = re.search(
r"\b(tmdb|tvdb)(?:id)?[-_]?([0-9]+)\b", input_string, re.IGNORECASE
)
if match:
return match.group(1).lower(), int(match.group(2))

return None, None
Loading
Loading