Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion shelfmark/download/clients/torrent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
import re
from binascii import Error as BinasciiError
from dataclasses import dataclass
from urllib.parse import parse_qs, urljoin, urlparse
from urllib.parse import ParseResult, parse_qs, urljoin, urlparse

import requests

from shelfmark.core.config import config
from shelfmark.core.logger import setup_logger
from shelfmark.core.utils import normalize_http_url
from shelfmark.download.network import get_ssl_verify

logger = setup_logger(__name__)
Expand All @@ -32,6 +33,7 @@
ValueError,
)
# NOTE(review): presumably the exception types treated as "malformed torrent
# payload"; the code that catches this tuple is not visible here - confirm.
_TORRENT_PARSE_ERRORS = (IndexError, KeyError, TypeError, ValueError)
# Config keys whose configured URL defines an origin that
# _is_trusted_torrent_fetch_url() accepts for backend .torrent prefetching.
_TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS = ("PROWLARR_URL", "NEWZNAB_URL")

# Recursive alias: a dict (str or bytes keys), a list, an int, bytes or str,
# where dict values and list items are themselves BencodeValue.
type BencodeValue = dict[str | bytes, BencodeValue] | list[BencodeValue] | int | bytes | str

Expand Down Expand Up @@ -93,6 +95,9 @@ def extract_torrent_info(
# Not a magnet - try to fetch and parse the .torrent file
if not fetch_torrent:
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)
if not _is_trusted_torrent_fetch_url(url):
logger.debug("Skipping torrent prefetch for untrusted URL: %s...", url[:80])
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)

headers: dict[str, str] = {"Accept": "application/x-bittorrent"}
# TODO(shelfmark): Move this source-specific Prowlarr auth handling into a source hook.
Expand Down Expand Up @@ -133,6 +138,12 @@ def resolve_url(current: str, location: str) -> str:
is_magnet=True,
magnet_url=redirect_url,
)
if not _is_trusted_torrent_fetch_url(redirect_url):
logger.debug(
"Skipping torrent prefetch redirect to untrusted URL: %s...",
redirect_url[:80],
)
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)
# Not a magnet redirect, follow it manually
logger.debug("Following redirect to: %s...", redirect_url[:80])
resp = requests.get(
Expand Down Expand Up @@ -172,6 +183,36 @@ def resolve_url(current: str, location: str) -> str:
return TorrentInfo(info_hash=expected_hash, torrent_data=None, is_magnet=False)


def _is_trusted_torrent_fetch_url(url: str) -> bool:
    """Return True when *url* shares an origin with a configured trusted source.

    The origin (scheme, host, port) of *url* is compared against the origin of
    every URL configured under ``_TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS``.
    Configured values are passed through ``normalize_http_url`` before being
    parsed. Only http/https URLs with a hostname can be trusted.

    The check fails closed: a URL that cannot be parsed is reported as
    untrusted instead of raising, and a configured URL that cannot be parsed
    is skipped.
    """
    try:
        origin = _url_origin(urlparse(url))
    except ValueError:
        # urlparse() rejects some malformed netlocs (e.g. unbalanced IPv6
        # brackets) and ParseResult.port rejects invalid ports. The URL is
        # untrusted input, so treat any parse failure as "not trusted".
        return False
    if origin is None:
        return False

    for key in _TRUSTED_TORRENT_FETCH_URL_CONFIG_KEYS:
        configured_url = str(config.get(key, "") or "").strip()
        if not configured_url:
            continue
        try:
            configured_origin = _url_origin(urlparse(normalize_http_url(configured_url)))
        except ValueError:
            # A malformed setting must not break the check for the other keys.
            logger.debug("Ignoring unparseable %s while checking torrent URL trust", key)
            continue
        if configured_origin == origin:
            return True

    return False


def _url_origin(parsed_url: ParseResult) -> tuple[str, str, int] | None:
scheme = parsed_url.scheme.lower()
if scheme not in {"http", "https"}:
return None

hostname = parsed_url.hostname
if not hostname:
return None

default_port = 443 if scheme == "https" else 80
return (scheme, hostname.lower(), parsed_url.port or default_port)


def parse_transmission_url(url: str) -> tuple[str, str, int, str]:
"""Parse Transmission URL into (protocol, host, port, path)."""
parsed = urlparse(url)
Expand Down
117 changes: 117 additions & 0 deletions tests/prowlarr/test_torrent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import base64
import hashlib
from unittest.mock import MagicMock

import pytest

Expand All @@ -18,6 +19,7 @@
bencode_encode,
extract_hash_from_magnet,
extract_info_hash_from_torrent,
extract_torrent_info,
parse_transmission_url,
)

Expand Down Expand Up @@ -356,6 +358,121 @@ def test_extract_hash_v2_without_pieces(self):
assert extract_info_hash_from_torrent(torrent_bytes) == expected


class TestExtractTorrentInfo:
    """Checks that torrent prefetch only contacts configured, trusted origins."""

    # Dotted paths patched in every test.
    _CONFIG_GET_TARGET = "shelfmark.download.clients.torrent_utils.config.get"
    _REQUESTS_GET_TARGET = "shelfmark.download.clients.torrent_utils.requests.get"
    # Hash handed in by the caller for the cases where nothing is downloaded.
    _CALLER_HASH = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0"

    @classmethod
    def _install(cls, monkeypatch, config_get, http_get):
        """Patch the module's config lookup and HTTP getter; return the getter."""
        monkeypatch.setattr(cls._CONFIG_GET_TARGET, config_get)
        monkeypatch.setattr(cls._REQUESTS_GET_TARGET, http_get)
        return http_get

    @staticmethod
    def _config_from(values):
        """Build a ``config.get`` stand-in backed by *values*."""

        def lookup(key, default=""):
            return values.get(key, default)

        return lookup

    @staticmethod
    def _single_file_torrent():
        """Return ``(torrent_bytes, sha1_info_hash)`` for a tiny single-file torrent."""
        info = {
            b"name": b"trusted.txt",
            b"length": 100,
            b"piece length": 16384,
            b"pieces": b"\x00" * 20,
        }
        payload = bencode_encode({b"info": info})
        digest = hashlib.sha1(bencode_encode(info)).hexdigest().lower()
        return payload, digest

    @staticmethod
    def _ok_response(payload):
        """Return a mocked 200 response whose body is *payload*."""
        reply = MagicMock(status_code=200, content=payload)
        reply.raise_for_status = MagicMock()
        return reply

    def test_does_not_fetch_untrusted_http_torrent_url(self, monkeypatch):
        """With no trusted origin configured, an HTTP torrent URL is never requested."""

        def blank_config(key, default=""):
            return ""

        http_get = self._install(monkeypatch, blank_config, MagicMock())

        info = extract_torrent_info(
            "https://attacker.example/book.torrent",
            fetch_torrent=True,
            expected_hash=self._CALLER_HASH,
        )

        assert info.info_hash == self._CALLER_HASH
        assert info.torrent_data is None
        assert info.is_magnet is False
        http_get.assert_not_called()

    def test_fetches_configured_prowlarr_torrent_url(self, monkeypatch):
        """A download URL on the configured Prowlarr origin is fetched and hashed."""
        payload, digest = self._single_file_torrent()
        settings = {
            "PROWLARR_URL": "https://prowlarr.example",
            "PROWLARR_API_KEY": "secret",
        }
        http_get = self._install(
            monkeypatch,
            self._config_from(settings),
            MagicMock(return_value=self._ok_response(payload)),
        )

        info = extract_torrent_info(
            "https://prowlarr.example/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
        )

        assert info.info_hash == digest
        assert info.torrent_data == payload
        assert info.is_magnet is False
        http_get.assert_called_once()

    def test_normalizes_configured_origin_before_trusting_torrent_url(self, monkeypatch):
        """A scheme-less configured Prowlarr URL still matches its http origin."""
        payload, digest = self._single_file_torrent()
        settings = {
            "PROWLARR_URL": "prowlarr.example:9696/",
            "PROWLARR_API_KEY": "secret",
        }
        http_get = self._install(
            monkeypatch,
            self._config_from(settings),
            MagicMock(return_value=self._ok_response(payload)),
        )

        info = extract_torrent_info(
            "http://prowlarr.example:9696/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
        )

        assert info.info_hash == digest
        assert info.torrent_data == payload
        http_get.assert_called_once()

    def test_does_not_follow_trusted_torrent_url_redirect_to_untrusted_host(self, monkeypatch):
        """A redirect from the trusted origin to another host is not followed."""

        def prowlarr_only(key, default=""):
            if key == "PROWLARR_URL":
                return "https://prowlarr.example"
            return ""

        redirect = MagicMock(status_code=302)
        redirect.headers = {"Location": "https://attacker.example/book.torrent"}
        http_get = self._install(monkeypatch, prowlarr_only, MagicMock(return_value=redirect))

        info = extract_torrent_info(
            "https://prowlarr.example/1/download?apikey=secret&indexer=7",
            fetch_torrent=True,
            expected_hash=self._CALLER_HASH,
        )

        assert info.info_hash == self._CALLER_HASH
        assert info.torrent_data is None
        assert info.is_magnet is False
        # Only the initial request to the trusted origin was made.
        http_get.assert_called_once()


class TestExtractHashFromMagnet:
"""Tests for extracting hash from magnet links."""

Expand Down
Loading