Skip to content

Commit dbdcad8

Browse files
committed
fix: 🐛 "Future attached to a different loop" Fehler bei async_list_backups behoben
Response-Body-Streams von aiobotocore sind an den Worker-Thread-Event-Loop gebunden und können nicht im Home Assistant Event-Loop gelesen werden. - get_object_body(): Liest Body vollständig im Worker-Thread (Metadaten) - get_object_stream(): Streamt Body über thread-sichere Queue (Backups) - Behebt auch latenten Fehler in async_download_backup() Fixes #3
1 parent 78c278a commit dbdcad8

4 files changed

Lines changed: 65 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,18 @@ Alle wichtigen Änderungen an diesem Projekt werden in dieser Datei dokumentiert
55
Das Format basiert auf [Keep a Changelog](https://keepachangelog.com/de/1.0.0/),
66
und dieses Projekt folgt [Semantic Versioning](https://semver.org/lang/de/).
77

8+
## [0.1.6] - 2026-03-03
9+
10+
### 🐛 Fixed
11+
12+
- **"Future attached to a different loop" Fehler behoben** ([#3](https://github.com/bauer-group/IP-HomeassistantS3CompatibleBackup/issues/3))
13+
- `async_list_backups()` schlug fehl, weil der aiohttp-Response-Body-Stream an den Worker-Thread-Event-Loop gebunden war, aber im Home Assistant Event-Loop gelesen wurde
14+
- Neue Methode `get_object_body()` liest den gesamten Body im Worker-Thread (für kleine Objekte wie Metadaten-JSON)
15+
- Neue Methode `get_object_stream()` streamt Body-Chunks über eine thread-sichere Queue zwischen Worker- und Main-Event-Loop (für große Backup-Dateien)
16+
- Behebt auch einen latenten Fehler in `async_download_backup()`, der beim Wiederherstellen von Backups aufgetreten wäre
17+
18+
---
19+
820
## [0.1.5] - 2026-01-13
921

1022
### 🐛 Fixed

custom_components/bauergroup_s3compatiblebackup/__init__.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import asyncio
66
import logging
7+
from collections.abc import AsyncIterator
8+
from queue import Queue
79
from threading import Thread
810
from typing import Any, cast
911

@@ -140,6 +142,52 @@ async def get_object(self, **kwargs: Any) -> dict[str, Any]:
140142
"""Get an object from a bucket."""
141143
return await self._dispatch(self._client.get_object(**kwargs))
142144

145+
async def get_object_body(self, **kwargs: Any) -> bytes:
    """Fetch an object and return its complete body as bytes.

    Both the GET request and the body read are executed inside the
    worker thread's event loop, where the response stream was created.
    This avoids the 'Future attached to a different loop' error that
    occurs when a body stream is read from a foreign event loop.
    Intended for small objects such as metadata JSON files.
    """

    async def _fetch_full_payload() -> bytes:
        result = await self._client.get_object(**kwargs)
        payload = result["Body"]
        return await payload.read()

    return await self._dispatch(_fetch_full_payload())
158+
159+
async def get_object_stream(self, **kwargs: Any) -> AsyncIterator[bytes]:
    """Get an object and stream its body from the worker thread.

    The response body is read inside the worker thread's event loop
    (the loop it is bound to) and handed to the caller's loop through a
    thread-safe queue, so large objects (e.g. backup archives) can be
    streamed without 'Future attached to a different loop' errors.

    Yields:
        Chunks of the object body as bytes.

    Raises:
        RuntimeError: If the worker event loop has not been started.
        BaseException: Any error raised while downloading is re-raised
            to the consumer after the producer finishes.
    """
    data_queue: Queue[bytes | None] = Queue()
    error_holder: list[BaseException | None] = [None]

    async def _stream_body() -> None:
        # Runs on the worker loop: the body stream must only be read there.
        try:
            response = await self._client.get_object(**kwargs)
            async for chunk in response["Body"].iter_chunks():
                data_queue.put(chunk)
        except BaseException as exc:  # re-raised on the consumer side
            error_holder[0] = exc
        finally:
            data_queue.put(None)  # Sentinel to signal completion

    if self._loop is None:
        raise RuntimeError("Worker loop not started")
    # Keep the future: without it an abandoned consumer could never stop
    # the producer, which would keep downloading the whole object into
    # the unbounded queue.
    producer = asyncio.run_coroutine_threadsafe(_stream_body(), self._loop)

    loop = asyncio.get_running_loop()
    try:
        while True:
            # Queue.get blocks; run it in an executor to keep this loop free.
            chunk = await loop.run_in_executor(None, data_queue.get)
            if chunk is None:
                if error_holder[0] is not None:
                    raise error_holder[0]
                break
            yield chunk
    finally:
        # If the consumer exits early (break / aclose / GC), cancel the
        # producer task so the download stops. After a normal completion
        # the future is already done and cancel() is a no-op.
        producer.cancel()
190+
143191
async def put_object(self, **kwargs: Any) -> dict[str, Any]:
    """Put an object into a bucket."""
    upload = self._client.put_object(**kwargs)
    return await self._dispatch(upload)

custom_components/bauergroup_s3compatiblebackup/backup.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,9 @@ async def async_download_backup(
148148
backup = await self._find_backup_by_id(backup_id)
149149
tar_filename, _ = suggested_filenames(backup)
150150

151-
response = await self._client.get_object(
151+
return self._client.get_object_stream(
152152
Bucket=self._bucket, Key=self._get_key(tar_filename)
153153
)
154-
return response["Body"].iter_chunks()
155154

156155
async def async_upload_backup(
157156
self,
@@ -363,10 +362,11 @@ async def _list_backups(self) -> dict[str, AgentBackup]:
363362
for metadata_file in metadata_files:
364363
try:
365364
# Download and parse metadata file
366-
metadata_response = await self._client.get_object(
365+
# Use get_object_body to read entirely in the worker thread,
366+
# avoiding 'Future attached to a different loop' errors
367+
metadata_content = await self._client.get_object_body(
367368
Bucket=self._bucket, Key=metadata_file["Key"]
368369
)
369-
metadata_content = await metadata_response["Body"].read()
370370
metadata_json = json.loads(metadata_content)
371371
except (BotoCoreError, json.JSONDecodeError) as err:
372372
_LOGGER.warning(

custom_components/bauergroup_s3compatiblebackup/manifest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@
1616
"requirements": [
1717
"aiobotocore>=2.6.0,<3.0.0"
1818
],
19-
"version": "0.1.5"
19+
"version": "0.1.6"
2020
}

0 commit comments

Comments
 (0)