Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions py_modules/services/downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@
_TMP_EXT = ".tmp"


def _resolve_local_file_name(rom_detail: dict, logger: logging.Logger) -> str:
"""Resolve the on-disk filename for a ROM.

For nested-single-file ROMs RomM reports ``fs_name`` as the parent folder,
so the actual filename (with extension) lives in ``files[0].file_name``.
For all other layouts ``fs_name`` is already the correct filename.
"""
fs_name = rom_detail.get("fs_name", f"rom_{rom_detail.get('id', 'unknown')}")
if not rom_detail.get("has_nested_single_file"):
return fs_name
files = rom_detail.get("files") or []
if not files:
logger.warning(f"has_nested_single_file=true but files list is empty; falling back to fs_name='{fs_name}'")
return fs_name
return files[0].get("file_name") or fs_name


class DownloadService:
"""ROM download engine: downloads and queue management."""

Expand Down Expand Up @@ -220,7 +237,7 @@ async def start_download(self, rom_id):
system = self._resolve_system(platform_slug, platform_fs_slug)

roms_dir = os.path.join(self._get_roms_path() if self._get_roms_path else "", system)
file_name = rom_detail.get("fs_name", f"rom_{rom_id}")
file_name = _resolve_local_file_name(rom_detail, self._logger)
# Fix 1: Sanitize fs_name to prevent path traversal
safe_name = os.path.basename(file_name)
if safe_name != file_name:
Expand All @@ -245,7 +262,7 @@ async def start_download(self, rom_id):
platform_name = rom_detail.get("platform_name", platform_slug)

try:
task = self._loop.create_task(self._do_download(rom_id, rom_detail, target_path, system))
task = self._loop.create_task(self._do_download(rom_id, rom_detail, target_path, system, file_name))
except Exception as e:
self._download_in_progress.discard(rom_id)
self._logger.error(f"Failed to start download task for ROM {rom_id}: {e}")
Expand Down Expand Up @@ -377,8 +394,7 @@ def progress_callback(downloaded, total):

return progress_callback

async def _do_download(self, rom_id, rom_detail, target_path, system):
file_name = rom_detail.get("fs_name", f"rom_{rom_id}")
async def _do_download(self, rom_id, rom_detail, target_path, system, file_name):
rom_name = rom_detail.get("name", file_name)
platform_name = rom_detail.get("platform_name", rom_detail.get("platform_slug", ""))
has_multiple = rom_detail.get("has_multiple_files", False)
Expand Down
275 changes: 269 additions & 6 deletions tests/services/test_downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None):
plugin._download_service._download_queue[42] = {"rom_id": 42, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(42, rom_detail, target_path, "n64")
await plugin._download_service._do_download(42, rom_detail, target_path, "n64", "zelda.z64")

# File ends up at target_path (not .tmp)
assert os.path.exists(target_path)
Expand Down Expand Up @@ -850,7 +850,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None):
plugin._download_service._download_queue[55] = {"rom_id": 55, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(55, rom_detail, target_path, "psx")
await plugin._download_service._do_download(55, rom_detail, target_path, "psx", "FF7.zip")

# ZIP is extracted to extract_dir
extract_dir = roms_dir / "FF7"
Expand All @@ -870,6 +870,269 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None):
assert plugin._download_service._download_queue[55]["status"] == "completed"


class TestDoDownloadNestedSingleFile:
"""Tests for has_nested_single_file: fs_name is the parent folder, not the file (#226)."""

@pytest.mark.asyncio
async def test_simple_single_file_unchanged(self, plugin, tmp_path):
"""Regression: simple-single-file still uses fs_name as the local filename."""
from unittest.mock import patch

import decky

plugin._download_service._runtime_dir = str(tmp_path)
decky.DECKY_USER_HOME = str(tmp_path)
plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")
plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios")
plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")

roms_dir = tmp_path / "retrodeck" / "roms" / "gba"
roms_dir.mkdir(parents=True)
target_path = str(roms_dir / "Game.gba")

rom_detail = {
"id": 1,
"name": "Game",
"fs_name": "Game.gba",
"platform_slug": "gba",
"platform_name": "Game Boy Advance",
"has_simple_single_file": True,
"has_nested_single_file": False,
"has_multiple_files": False,
"files": [{"file_name": "Game.gba"}],
}

def fake_download(_rom_id, _filename, dest, _progress_callback=None):
with open(dest, "wb") as f:
f.write(b"\x00" * 64)

plugin._download_service._loop = asyncio.get_event_loop()
plugin._download_service._download_queue[1] = {"rom_id": 1, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(1, rom_detail, target_path, "gba", "Game.gba")

assert os.path.exists(target_path)
installed = plugin._state["installed_roms"].get("1")
assert installed is not None
assert installed["file_name"] == "Game.gba"
assert installed["file_path"] == target_path

@pytest.mark.asyncio
async def test_nested_single_file_uses_files_entry(self, plugin, tmp_path):
"""Happy path: has_nested_single_file derives the local filename from files[0].file_name."""
from unittest.mock import patch

import decky

plugin._download_service._runtime_dir = str(tmp_path)
decky.DECKY_USER_HOME = str(tmp_path)
plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")
plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios")
plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")

roms_dir = tmp_path / "retrodeck" / "roms" / "dc"
roms_dir.mkdir(parents=True)
target_path = str(roms_dir / "My Game.chd")

rom_detail = {
"id": 7,
"name": "My Game",
"fs_name": "My Game",
"platform_slug": "dc",
"platform_name": "Dreamcast",
"has_nested_single_file": True,
"has_multiple_files": False,
"files": [{"file_name": "My Game.chd"}],
}

def fake_download(_rom_id, _filename, dest, _progress_callback=None):
with open(dest, "wb") as f:
f.write(b"\x00" * 128)

plugin._download_service._loop = asyncio.get_event_loop()
plugin._download_service._download_queue[7] = {"rom_id": 7, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(7, rom_detail, target_path, "dc", "My Game.chd")

assert os.path.exists(target_path)
installed = plugin._state["installed_roms"].get("7")
assert installed is not None
assert installed["file_name"] == "My Game.chd"
assert installed["file_path"] == target_path
# Must NOT keep the parent-folder name from fs_name as a real on-disk file
assert not os.path.exists(str(roms_dir / "My Game"))

@pytest.mark.asyncio
async def test_nested_single_file_start_download_uses_files_entry(self, plugin, tmp_path):
"""start_download: nested-single-file enters the queue with the resolved filename."""
from unittest.mock import AsyncMock, patch

import decky

decky.DECKY_USER_HOME = str(tmp_path)
plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")
plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios")
plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")

rom_detail = {
"id": 7,
"name": "Resident Evil",
"fs_name": "Resident Evil",
"fs_size_bytes": 1024,
"platform_slug": "dc",
"platform_name": "Dreamcast",
"has_nested_single_file": True,
"has_multiple_files": False,
"files": [{"file_name": "Resident Evil.chd"}],
}

plugin._download_service._loop = MagicMock()
plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail)

def _close_coro_task(coro):
coro.close()
return MagicMock()

plugin._download_service._loop.create_task = _close_coro_task

with patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)):
result = await plugin.start_download(7)

assert result["success"] is True
assert plugin._download_service._download_queue[7]["file_name"] == "Resident Evil.chd"

@pytest.mark.asyncio
async def test_nested_single_file_empty_files_falls_back(self, plugin, tmp_path, caplog):
"""Defensive: empty files list falls back to fs_name and logs a warning."""
import logging
from unittest.mock import AsyncMock, patch

import decky

decky.DECKY_USER_HOME = str(tmp_path)
plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")
plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios")
plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")

rom_detail = {
"id": 8,
"name": "My Game",
"fs_name": "My Game",
"fs_size_bytes": 1024,
"platform_slug": "dc",
"platform_name": "Dreamcast",
"has_nested_single_file": True,
"has_multiple_files": False,
"files": [],
}

plugin._download_service._loop = MagicMock()
plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail)

def _close_coro_task(coro):
coro.close()
return MagicMock()

plugin._download_service._loop.create_task = _close_coro_task

with (
caplog.at_level(logging.WARNING, logger="test_romm"),
patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)),
):
result = await plugin.start_download(8)

assert result["success"] is True
assert plugin._download_service._download_queue[8]["file_name"] == "My Game"
assert any("has_nested_single_file" in rec.message for rec in caplog.records)

@pytest.mark.asyncio
async def test_nested_single_file_missing_files_key_falls_back(self, plugin, tmp_path, caplog):
"""Defensive: missing files key falls back to fs_name and logs a warning."""
import logging
from unittest.mock import AsyncMock, patch

import decky

decky.DECKY_USER_HOME = str(tmp_path)
plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")
plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios")
plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")

rom_detail = {
"id": 9,
"name": "My Game",
"fs_name": "My Game",
"fs_size_bytes": 1024,
"platform_slug": "dc",
"platform_name": "Dreamcast",
"has_nested_single_file": True,
"has_multiple_files": False,
# no "files" key at all
}

plugin._download_service._loop = MagicMock()
plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail)

def _close_coro_task(coro):
coro.close()
return MagicMock()

plugin._download_service._loop.create_task = _close_coro_task

with (
caplog.at_level(logging.WARNING, logger="test_romm"),
patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)),
):
result = await plugin.start_download(9)

assert result["success"] is True
assert plugin._download_service._download_queue[9]["file_name"] == "My Game"
assert any("has_nested_single_file" in rec.message for rec in caplog.records)

@pytest.mark.asyncio
async def test_nested_single_file_traversal_sanitized(self, plugin, tmp_path):
"""Defensive: path traversal in files[0].file_name is sanitized via os.path.basename."""
from unittest.mock import AsyncMock, patch

import decky

decky.DECKY_USER_HOME = str(tmp_path)
plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")
plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios")
plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms")

rom_detail = {
"id": 13,
"name": "Evil Nested",
"fs_name": "Evil",
"fs_size_bytes": 1024,
"platform_slug": "dc",
"platform_name": "Dreamcast",
"has_nested_single_file": True,
"has_multiple_files": False,
"files": [{"file_name": "../evil.chd"}],
}

plugin._download_service._loop = MagicMock()
plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail)

def _close_coro_task(coro):
coro.close()
return MagicMock()

plugin._download_service._loop.create_task = _close_coro_task

with patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)):
result = await plugin.start_download(13)

assert result["success"] is True
queue_entry = plugin._download_service._download_queue[13]
assert queue_entry["file_name"] == "evil.chd"
assert ".." not in queue_entry["file_name"]


class TestPathTraversalDeleteRomFiles:
"""Tests for path traversal safety in _delete_rom_files."""

Expand Down Expand Up @@ -1045,7 +1308,7 @@ def fake_download_cancel(_rom_id, _filename, dest, _progress_callback=None):
patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download_cancel),
pytest.raises(asyncio.CancelledError),
):
await plugin._download_service._do_download(42, rom_detail, target_path, "n64")
await plugin._download_service._do_download(42, rom_detail, target_path, "n64", "zelda.z64")

assert plugin._download_service._download_queue[42]["status"] == "cancelled"
assert not os.path.exists(target_path)
Expand Down Expand Up @@ -1089,7 +1352,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None):
plugin._download_service._download_queue[66] = {"rom_id": 66, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(66, rom_detail, target_path, "psx")
await plugin._download_service._do_download(66, rom_detail, target_path, "psx", "game.zip")

assert plugin._download_service._download_queue[66]["status"] == "failed"
# .zip.tmp should be cleaned up
Expand Down Expand Up @@ -1295,7 +1558,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None):
plugin._download_service._download_queue[99] = {"rom_id": 99, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(99, rom_detail, target_path, "psx")
await plugin._download_service._do_download(99, rom_detail, target_path, "psx", "Vagrant Story (USA).zip")

extract_dir = roms_dir / "Vagrant Story (USA)"
# URL-encoded filenames should be decoded
Expand Down Expand Up @@ -1349,7 +1612,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None):
plugin._download_service._download_queue[55] = {"rom_id": 55, "status": "downloading", "progress": 0}

with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download):
await plugin._download_service._do_download(55, rom_detail, target_path, "psx")
await plugin._download_service._do_download(55, rom_detail, target_path, "psx", "FF7.zip")

extract_dir = roms_dir / "FF7"
# Normal filenames should be unchanged
Expand Down
Loading
Loading