diff --git a/shelfmark/download/clients/torrent_utils.py b/shelfmark/download/clients/torrent_utils.py index 034afaf5..3478556c 100644 --- a/shelfmark/download/clients/torrent_utils.py +++ b/shelfmark/download/clients/torrent_utils.py @@ -7,12 +7,13 @@ import re from binascii import Error as BinasciiError from dataclasses import dataclass -from urllib.parse import parse_qs, urljoin, urlparse +from urllib.parse import ParseResult, parse_qs, urljoin, urlparse import requests from shelfmark.core.config import config from shelfmark.core.logger import setup_logger +from shelfmark.core.utils import normalize_http_url from shelfmark.download.network import get_ssl_verify logger = setup_logger(__name__) @@ -32,6 +33,7 @@ ValueError, ) _TORRENT_PARSE_ERRORS = (IndexError, KeyError, TypeError, ValueError) +_TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS = ("PROWLARR_URL", "NEWZNAB_URL") type BencodeValue = dict[str | bytes, BencodeValue] | list[BencodeValue] | int | bytes | str @@ -93,6 +95,9 @@ def extract_torrent_info( # Not a magnet - try to fetch and parse the .torrent file if not fetch_torrent: return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False) + if not _is_trusted_torrent_fetch_url(url): + logger.debug("Skipping torrent prefetch for untrusted URL: %s...", url[:80]) + return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False) headers: dict[str, str] = {"Accept": "application/x-bittorrent"} # TODO(shelfmark): Move this source-specific Prowlarr auth handling into a source hook. 
@@ -133,6 +138,12 @@ def resolve_url(current: str, location: str) -> str: is_magnet=True, magnet_url=redirect_url, ) + if not _is_trusted_torrent_fetch_url(redirect_url): + logger.debug( + "Skipping torrent prefetch redirect to untrusted URL: %s...", + redirect_url[:80], + ) + return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False) # Not a magnet redirect, follow it manually logger.debug("Following redirect to: %s...", redirect_url[:80]) resp = requests.get( @@ -172,6 +183,36 @@ def resolve_url(current: str, location: str) -> str: return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False) +def _is_trusted_torrent_fetch_url(url: str) -> bool: + parsed = urlparse(url) + origin = _url_origin(parsed) + if origin is None: + return False + + for key in _TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS: + configured_url = str(config.get(key, "") or "").strip() + if not configured_url: + continue + configured_origin = _url_origin(urlparse(normalize_http_url(configured_url))) + if configured_origin == origin: + return True + + return False + + +def _url_origin(parsed_url: ParseResult) -> tuple[str, str, int] | None: + scheme = parsed_url.scheme.lower() + if scheme not in {"http", "https"}: + return None + + hostname = parsed_url.hostname + if not hostname: + return None + + default_port = 443 if scheme == "https" else 80 + return (scheme, hostname.lower(), parsed_url.port or default_port) + + def parse_transmission_url(url: str) -> tuple[str, str, int, str]: """Parse Transmission URL into (protocol, host, port, path).""" parsed = urlparse(url) diff --git a/tests/prowlarr/test_torrent_utils.py b/tests/prowlarr/test_torrent_utils.py index 2d330fbb..42dea44c 100644 --- a/tests/prowlarr/test_torrent_utils.py +++ b/tests/prowlarr/test_torrent_utils.py @@ -10,6 +10,7 @@ import base64 import hashlib +from unittest.mock import MagicMock import pytest @@ -18,6 +19,7 @@ bencode_encode, extract_hash_from_magnet, extract_info_hash_from_torrent, + 
class TestExtractTorrentInfo:
    """Exercise ``extract_torrent_info`` prefetching for trusted and untrusted URLs."""

    _CONFIG_GET = "shelfmark.download.clients.torrent_utils.config.get"
    _REQUESTS_GET = "shelfmark.download.clients.torrent_utils.requests.get"

    @staticmethod
    def _trusted_torrent():
        """Build a minimal single-file torrent and return (payload, info hash)."""
        info = {
            b"name": b"trusted.txt",
            b"length": 100,
            b"piece length": 16384,
            b"pieces": b"\x00" * 20,
        }
        payload = bencode_encode({b"info": info})
        digest = hashlib.sha1(bencode_encode(info)).hexdigest().lower()
        return payload, digest

    @staticmethod
    def _config_lookup(values):
        """Return a ``config.get`` stand-in backed by the *values* mapping."""
        return lambda key, default="": values.get(key, default)

    @staticmethod
    def _ok_response(payload):
        """Return a mocked 200 response whose body is *payload*."""
        reply = MagicMock(status_code=200, content=payload)
        reply.raise_for_status = MagicMock()
        return reply

    def test_does_not_fetch_untrusted_http_torrent_url(self, monkeypatch):
        """No HTTP request is issued for a URL outside the configured origins."""
        expected_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0"

        def fake_config_get(key, default=""):
            return ""

        monkeypatch.setattr(self._CONFIG_GET, fake_config_get)
        http_get = MagicMock()
        monkeypatch.setattr(self._REQUESTS_GET, http_get)

        info = extract_torrent_info(
            "https://attacker.example/book.torrent",
            fetch_torrent=True,
            expected_hash=expected_hash,
        )

        assert info.info_hash == expected_hash
        assert info.torrent_data is None
        assert info.is_magnet is False
        http_get.assert_not_called()

    def test_fetches_configured_prowlarr_torrent_url(self, monkeypatch):
        """A download URL on the configured Prowlarr origin is fetched and parsed."""
        torrent_data, expected_hash = self._trusted_torrent()
        settings = {
            "PROWLARR_URL": "https://prowlarr.example",
            "PROWLARR_API_KEY": "secret",
        }
        monkeypatch.setattr(self._CONFIG_GET, self._config_lookup(settings))
        http_get = MagicMock(return_value=self._ok_response(torrent_data))
        monkeypatch.setattr(self._REQUESTS_GET, http_get)

        info = extract_torrent_info(
            "https://prowlarr.example/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
        )

        assert info.info_hash == expected_hash
        assert info.torrent_data == torrent_data
        assert info.is_magnet is False
        http_get.assert_called_once()

    def test_normalizes_configured_origin_before_trusting_torrent_url(self, monkeypatch):
        """A scheme-less configured Prowlarr URL still matches after normalization."""
        torrent_data, expected_hash = self._trusted_torrent()
        settings = {
            "PROWLARR_URL": "prowlarr.example:9696/",
            "PROWLARR_API_KEY": "secret",
        }
        monkeypatch.setattr(self._CONFIG_GET, self._config_lookup(settings))
        http_get = MagicMock(return_value=self._ok_response(torrent_data))
        monkeypatch.setattr(self._REQUESTS_GET, http_get)

        info = extract_torrent_info(
            "http://prowlarr.example:9696/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
        )

        assert info.info_hash == expected_hash
        assert info.torrent_data == torrent_data
        http_get.assert_called_once()

    def test_does_not_follow_trusted_torrent_url_redirect_to_untrusted_host(self, monkeypatch):
        """A redirect from a trusted origin to another host is not followed."""
        expected_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0"

        def fake_config_get(key, default=""):
            if key == "PROWLARR_URL":
                return "https://prowlarr.example"
            return ""

        monkeypatch.setattr(self._CONFIG_GET, fake_config_get)
        redirect = MagicMock(status_code=302)
        redirect.headers = {"Location": "https://attacker.example/book.torrent"}
        http_get = MagicMock(return_value=redirect)
        monkeypatch.setattr(self._REQUESTS_GET, http_get)

        info = extract_torrent_info(
            "https://prowlarr.example/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
            expected_hash=expected_hash,
        )

        assert info.info_hash == expected_hash
        assert info.torrent_data is None
        assert info.is_magnet is False
        # Only the initial request is made; the redirect target is never fetched.
        http_get.assert_called_once()