From 28225a54653fb0f6813d3358e7f8904d89ed2fe1 Mon Sep 17 00:00:00 2001 From: danielcopper Date: Mon, 4 May 2026 17:02:45 +0200 Subject: [PATCH] fix(downloads)!: preserve file extension for nested single-file ROMs (#226) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a ROM is stored as a single file inside a per-game folder in RomM (e.g. `Resident Evil/Resident Evil.chd`), the API sets `has_nested_single_file=true` and `fs_name` is the folder name without extension. The plugin used `fs_name` as the local target filename, so the downloaded ROM was stored without its extension and could not be launched. Resolve the on-disk filename from `files[0].file_name` when `has_nested_single_file=true`, falling back to `fs_name` with a warning if `files` is empty or missing. Nested-single-file ROMs land flat in the platform folder (no folder mirroring) — by definition they have no sidecars, so the parent folder adds no value locally, and flat layout matches `has_simple_single_file` behavior. Multi-file ROMs are unaffected. BREAKING CHANGE: Existing installs of nested single-file ROMs were stored locally without their file extension. Affected ROMs must be re-downloaded (or re-synced via the plugin) after updating so the on-disk filename is corrected — buggy entries cannot be patched in place. --- py_modules/services/downloads.py | 24 ++- tests/services/test_downloads.py | 275 ++++++++++++++++++++++++++++- tests/services/test_rom_removal.py | 22 +++ 3 files changed, 311 insertions(+), 10 deletions(-) diff --git a/py_modules/services/downloads.py b/py_modules/services/downloads.py index b48e2ca..55eeaac 100644 --- a/py_modules/services/downloads.py +++ b/py_modules/services/downloads.py @@ -38,6 +38,23 @@ _TMP_EXT = ".tmp" +def _resolve_local_file_name(rom_detail: dict, logger: logging.Logger) -> str: + """Resolve the on-disk filename for a ROM. + + For nested-single-file ROMs RomM reports ``fs_name`` as the parent folder, + so the actual filename (with extension) lives in ``files[0].file_name``. + For all other layouts ``fs_name`` is already the correct filename. + """ + fs_name = rom_detail.get("fs_name", f"rom_{rom_detail.get('id', 'unknown')}") + if not rom_detail.get("has_nested_single_file"): + return fs_name + files = rom_detail.get("files") or [] + if not files: + logger.warning(f"has_nested_single_file=true but files list is empty; falling back to fs_name='{fs_name}'") + return fs_name + return files[0].get("file_name") or fs_name + + class DownloadService: """ROM download engine: downloads and queue management.""" @@ -220,7 +237,7 @@ async def start_download(self, rom_id): system = self._resolve_system(platform_slug, platform_fs_slug) roms_dir = os.path.join(self._get_roms_path() if self._get_roms_path else "", system) - file_name = rom_detail.get("fs_name", f"rom_{rom_id}") + file_name = _resolve_local_file_name(rom_detail, self._logger) # Fix 1: Sanitize fs_name to prevent path traversal safe_name = os.path.basename(file_name) if safe_name != file_name: @@ -245,7 +262,7 @@ async def start_download(self, rom_id): platform_name = rom_detail.get("platform_name", platform_slug) try: - task = self._loop.create_task(self._do_download(rom_id, rom_detail, target_path, system)) + task = self._loop.create_task(self._do_download(rom_id, rom_detail, target_path, system, file_name)) except Exception as e: self._download_in_progress.discard(rom_id) self._logger.error(f"Failed to start download task for ROM {rom_id}: {e}") @@ -377,8 +394,7 @@ def progress_callback(downloaded, total): return progress_callback - async def _do_download(self, rom_id, rom_detail, target_path, system): - file_name = rom_detail.get("fs_name", f"rom_{rom_id}") + async def _do_download(self, rom_id, rom_detail, target_path, system, file_name): rom_name = rom_detail.get("name", file_name) platform_name = rom_detail.get("platform_name", rom_detail.get("platform_slug", "")) has_multiple = rom_detail.get("has_multiple_files", False) diff --git a/tests/services/test_downloads.py b/tests/services/test_downloads.py index 8b4d5de..3e3a0d1 100644 --- a/tests/services/test_downloads.py +++ b/tests/services/test_downloads.py @@ -782,7 +782,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None): plugin._download_service._download_queue[42] = {"rom_id": 42, "status": "downloading", "progress": 0} with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): - await plugin._download_service._do_download(42, rom_detail, target_path, "n64") + await plugin._download_service._do_download(42, rom_detail, target_path, "n64", "zelda.z64") # File ends up at target_path (not .tmp) assert os.path.exists(target_path) @@ -850,7 +850,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None): plugin._download_service._download_queue[55] = {"rom_id": 55, "status": "downloading", "progress": 0} with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): - await plugin._download_service._do_download(55, rom_detail, target_path, "psx") + await plugin._download_service._do_download(55, rom_detail, target_path, "psx", "FF7.zip") # ZIP is extracted to extract_dir extract_dir = roms_dir / "FF7" @@ -870,6 +870,269 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None): assert plugin._download_service._download_queue[55]["status"] == "completed" +class TestDoDownloadNestedSingleFile: + """Tests for has_nested_single_file: fs_name is the parent folder, not the file (#226).""" + + @pytest.mark.asyncio + async def test_simple_single_file_unchanged(self, plugin, tmp_path): + """Regression: simple-single-file still uses fs_name as the local filename.""" + from unittest.mock import patch + + import decky + + plugin._download_service._runtime_dir = str(tmp_path) + decky.DECKY_USER_HOME = str(tmp_path) + plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios") + plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + + roms_dir = tmp_path / "retrodeck" / "roms" / "gba" + roms_dir.mkdir(parents=True) + target_path = str(roms_dir / "Game.gba") + + rom_detail = { + "id": 1, + "name": "Game", + "fs_name": "Game.gba", + "platform_slug": "gba", + "platform_name": "Game Boy Advance", + "has_simple_single_file": True, + "has_nested_single_file": False, + "has_multiple_files": False, + "files": [{"file_name": "Game.gba"}], + } + + def fake_download(_rom_id, _filename, dest, _progress_callback=None): + with open(dest, "wb") as f: + f.write(b"\x00" * 64) + + plugin._download_service._loop = asyncio.get_event_loop() + plugin._download_service._download_queue[1] = {"rom_id": 1, "status": "downloading", "progress": 0} + + with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): + await plugin._download_service._do_download(1, rom_detail, target_path, "gba", "Game.gba") + + assert os.path.exists(target_path) + installed = plugin._state["installed_roms"].get("1") + assert installed is not None + assert installed["file_name"] == "Game.gba" + assert installed["file_path"] == target_path + + @pytest.mark.asyncio + async def test_nested_single_file_uses_files_entry(self, plugin, tmp_path): + """Happy path: has_nested_single_file derives the local filename from files[0].file_name.""" + from unittest.mock import patch + + import decky + + plugin._download_service._runtime_dir = str(tmp_path) + decky.DECKY_USER_HOME = str(tmp_path) + plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios") + plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + + roms_dir = tmp_path / "retrodeck" / "roms" / "dc" + roms_dir.mkdir(parents=True) + target_path = str(roms_dir / "My Game.chd") + + rom_detail = { + "id": 7, + "name": "My Game", + "fs_name": "My Game", + "platform_slug": "dc", + "platform_name": "Dreamcast", + "has_nested_single_file": True, + "has_multiple_files": False, + "files": [{"file_name": "My Game.chd"}], + } + + def fake_download(_rom_id, _filename, dest, _progress_callback=None): + with open(dest, "wb") as f: + f.write(b"\x00" * 128) + + plugin._download_service._loop = asyncio.get_event_loop() + plugin._download_service._download_queue[7] = {"rom_id": 7, "status": "downloading", "progress": 0} + + with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): + await plugin._download_service._do_download(7, rom_detail, target_path, "dc", "My Game.chd") + + assert os.path.exists(target_path) + installed = plugin._state["installed_roms"].get("7") + assert installed is not None + assert installed["file_name"] == "My Game.chd" + assert installed["file_path"] == target_path + # Must NOT keep the parent-folder name from fs_name as a real on-disk file + assert not os.path.exists(str(roms_dir / "My Game")) + + @pytest.mark.asyncio + async def test_nested_single_file_start_download_uses_files_entry(self, plugin, tmp_path): + """start_download: nested-single-file enters the queue with the resolved filename.""" + from unittest.mock import AsyncMock, patch + + import decky + + decky.DECKY_USER_HOME = str(tmp_path) + plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios") + plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + + rom_detail = { + "id": 7, + "name": "Resident Evil", + "fs_name": "Resident Evil", + "fs_size_bytes": 1024, + "platform_slug": "dc", + "platform_name": "Dreamcast", + "has_nested_single_file": True, + "has_multiple_files": False, + "files": [{"file_name": "Resident Evil.chd"}], + } + + plugin._download_service._loop = MagicMock() + plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail) + + def _close_coro_task(coro): + coro.close() + return MagicMock() + + plugin._download_service._loop.create_task = _close_coro_task + + with patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)): + result = await plugin.start_download(7) + + assert result["success"] is True + assert plugin._download_service._download_queue[7]["file_name"] == "Resident Evil.chd" + + @pytest.mark.asyncio + async def test_nested_single_file_empty_files_falls_back(self, plugin, tmp_path, caplog): + """Defensive: empty files list falls back to fs_name and logs a warning.""" + import logging + from unittest.mock import AsyncMock, patch + + import decky + + decky.DECKY_USER_HOME = str(tmp_path) + plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios") + plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + + rom_detail = { + "id": 8, + "name": "My Game", + "fs_name": "My Game", + "fs_size_bytes": 1024, + "platform_slug": "dc", + "platform_name": "Dreamcast", + "has_nested_single_file": True, + "has_multiple_files": False, + "files": [], + } + + plugin._download_service._loop = MagicMock() + plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail) + + def _close_coro_task(coro): + coro.close() + return MagicMock() + + plugin._download_service._loop.create_task = _close_coro_task + + with ( + caplog.at_level(logging.WARNING, logger="test_romm"), + patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)), + ): + result = await plugin.start_download(8) + + assert result["success"] is True + assert plugin._download_service._download_queue[8]["file_name"] == "My Game" + assert any("has_nested_single_file" in rec.message for rec in caplog.records) + + @pytest.mark.asyncio + async def test_nested_single_file_missing_files_key_falls_back(self, plugin, tmp_path, caplog): + """Defensive: missing files key falls back to fs_name and logs a warning.""" + import logging + from unittest.mock import AsyncMock, patch + + import decky + + decky.DECKY_USER_HOME = str(tmp_path) + plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios") + plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + + rom_detail = { + "id": 9, + "name": "My Game", + "fs_name": "My Game", + "fs_size_bytes": 1024, + "platform_slug": "dc", + "platform_name": "Dreamcast", + "has_nested_single_file": True, + "has_multiple_files": False, + # no "files" key at all + } + + plugin._download_service._loop = MagicMock() + plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail) + + def _close_coro_task(coro): + coro.close() + return MagicMock() + + plugin._download_service._loop.create_task = _close_coro_task + + with ( + caplog.at_level(logging.WARNING, logger="test_romm"), + patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)), + ): + result = await plugin.start_download(9) + + assert result["success"] is True + assert plugin._download_service._download_queue[9]["file_name"] == "My Game" + assert any("has_nested_single_file" in rec.message for rec in caplog.records) + + @pytest.mark.asyncio + async def test_nested_single_file_traversal_sanitized(self, plugin, tmp_path): + """Defensive: path traversal in files[0].file_name is sanitized via os.path.basename.""" + from unittest.mock import AsyncMock, patch + + import decky + + decky.DECKY_USER_HOME = str(tmp_path) + plugin._download_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + plugin._download_service._get_bios_path = lambda: str(tmp_path / "retrodeck" / "bios") + plugin._rom_removal_service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + + rom_detail = { + "id": 13, + "name": "Evil Nested", + "fs_name": "Evil", + "fs_size_bytes": 1024, + "platform_slug": "dc", + "platform_name": "Dreamcast", + "has_nested_single_file": True, + "has_multiple_files": False, + "files": [{"file_name": "../evil.chd"}], + } + + plugin._download_service._loop = MagicMock() + plugin._download_service._loop.run_in_executor = AsyncMock(return_value=rom_detail) + + def _close_coro_task(coro): + coro.close() + return MagicMock() + + plugin._download_service._loop.create_task = _close_coro_task + + with patch("shutil.disk_usage", return_value=MagicMock(free=500 * 1024 * 1024)): + result = await plugin.start_download(13) + + assert result["success"] is True + queue_entry = plugin._download_service._download_queue[13] + assert queue_entry["file_name"] == "evil.chd" + assert ".." not in queue_entry["file_name"] + + class TestPathTraversalDeleteRomFiles: """Tests for path traversal safety in _delete_rom_files.""" @@ -1045,7 +1308,7 @@ def fake_download_cancel(_rom_id, _filename, dest, _progress_callback=None): patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download_cancel), pytest.raises(asyncio.CancelledError), ): - await plugin._download_service._do_download(42, rom_detail, target_path, "n64") + await plugin._download_service._do_download(42, rom_detail, target_path, "n64", "zelda.z64") assert plugin._download_service._download_queue[42]["status"] == "cancelled" assert not os.path.exists(target_path) @@ -1089,7 +1352,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None): plugin._download_service._download_queue[66] = {"rom_id": 66, "status": "downloading", "progress": 0} with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): - await plugin._download_service._do_download(66, rom_detail, target_path, "psx") + await plugin._download_service._do_download(66, rom_detail, target_path, "psx", "game.zip") assert plugin._download_service._download_queue[66]["status"] == "failed" # .zip.tmp should be cleaned up @@ -1295,7 +1558,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None): plugin._download_service._download_queue[99] = {"rom_id": 99, "status": "downloading", "progress": 0} with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): - await plugin._download_service._do_download(99, rom_detail, target_path, "psx") + await plugin._download_service._do_download(99, rom_detail, target_path, "psx", "Vagrant Story (USA).zip") extract_dir = roms_dir / "Vagrant Story (USA)" # URL-encoded filenames should be decoded @@ -1349,7 +1612,7 @@ def fake_download(_rom_id, _filename, dest, _progress_callback=None): plugin._download_service._download_queue[55] = {"rom_id": 55, "status": "downloading", "progress": 0} with patch.object(plugin._romm_api, "download_rom_content", side_effect=fake_download): - await plugin._download_service._do_download(55, rom_detail, target_path, "psx") + await plugin._download_service._do_download(55, rom_detail, target_path, "psx", "FF7.zip") extract_dir = roms_dir / "FF7" # Normal filenames should be unchanged diff --git a/tests/services/test_rom_removal.py b/tests/services/test_rom_removal.py index 117691c..1d56714 100644 --- a/tests/services/test_rom_removal.py +++ b/tests/services/test_rom_removal.py @@ -256,6 +256,28 @@ async def test_path_traversal_rejected(self, service, state, tmp_path): assert evil.exists() assert "99" not in state["installed_roms"] + @pytest.mark.asyncio + async def test_removes_nested_single_file_entry(self, service, state, tmp_path): + """Nested-single-file entries (#226) store the resolved filename in file_path with no rom_dir.""" + service._get_roms_path = lambda: str(tmp_path / "retrodeck" / "roms") + rom = tmp_path / "retrodeck" / "roms" / "dc" / "Resident Evil.chd" + rom.parent.mkdir(parents=True) + rom.write_bytes(b"\x00" * 100) + + state["installed_roms"]["42"] = { + "rom_id": 42, + "file_name": "Resident Evil.chd", + "file_path": str(rom), + "system": "dc", + } + + result = await service.remove_rom(42) + assert result["success"] is True + assert not rom.exists() + # Parent system dir should still exist + assert (tmp_path / "retrodeck" / "roms" / "dc").exists() + assert "42" not in state["installed_roms"] + class TestUninstallAllRoms: @pytest.mark.asyncio