-
Notifications
You must be signed in to change notification settings - Fork 158
feat: add S3 file extension #2241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Tomas2D
wants to merge
1
commit into
main
Choose a base branch
from
feat/2136-s3-file-extension
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5 changes: 5 additions & 0 deletions
5
apps/agentstack-sdk-py/src/agentstack_sdk/a2a/extensions/storage/__init__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| # Copyright 2025 © BeeAI a Series of LF Projects, LLC | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from .s3 import * | ||
| from .s3_credentials import * |
156 changes: 156 additions & 0 deletions
156
apps/agentstack-sdk-py/src/agentstack_sdk/a2a/extensions/storage/s3.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| # Copyright 2025 © BeeAI a Series of LF Projects, LLC | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import httpx | ||
| import pydantic | ||
| from pydantic import BaseModel | ||
|
|
||
| from agentstack_sdk.a2a.extensions.base import ( | ||
| BaseExtensionClient, | ||
| BaseExtensionServer, | ||
| NoParamsBaseExtensionSpec, | ||
| ) | ||
| from agentstack_sdk.a2a.extensions.exceptions import ExtensionError | ||
|
|
||
| __all__ = [ | ||
| "S3_EXTENSION_URI", | ||
| "S3Config", | ||
| "S3ExtensionClient", | ||
| "S3ExtensionMetadata", | ||
| "S3ExtensionServer", | ||
| "S3ExtensionSpec", | ||
| "S3UploadSlot", | ||
| ] | ||
|
|
||
| S3_EXTENSION_URI = "https://a2a-extensions.agentstack.beeai.dev/storage/s3/v1" | ||
|
|
||
|
|
||
| class S3UploadSlot(BaseModel): | ||
| """Pre-signed URLs for one upload slot.""" | ||
|
|
||
| upload_url: str | ||
| """Short-lived presigned PUT URL.""" | ||
|
|
||
| download_url: str | ||
| """Short-lived presigned GET URL for the same key.""" | ||
|
|
||
|
|
||
| class S3ExtensionMetadata(BaseModel): | ||
| """Passed in message.metadata[S3_EXTENSION_URI]""" | ||
|
|
||
| upload_slots: dict[str, S3UploadSlot] = {} | ||
| """Keyed by slot name (e.g. "result").""" | ||
|
|
||
|
|
||
| class S3ExtensionSpec(NoParamsBaseExtensionSpec): | ||
| URI = S3_EXTENSION_URI | ||
|
|
||
|
|
||
| class S3ExtensionServer(BaseExtensionServer[S3ExtensionSpec, S3ExtensionMetadata]): | ||
| """ | ||
| Provides file upload/download to agents via pre-signed URLs. | ||
| No S3 credentials required — URLs are injected by the client/platform. | ||
| """ | ||
|
|
||
| async def upload(self, slot: str, content: bytes, content_type: str = "application/octet-stream") -> str: | ||
| """Upload content to the named slot's presigned PUT URL. Returns the presigned download URL.""" | ||
| if not self.data: | ||
| raise ExtensionError(self.spec, "S3 extension metadata was not provided") | ||
| if slot not in self.data.upload_slots: | ||
| raise ExtensionError(self.spec, f"Upload slot '{slot}' not found in S3 extension metadata") | ||
|
|
||
| upload_slot = self.data.upload_slots[slot] | ||
| async with httpx.AsyncClient() as http: | ||
| resp = await http.put( | ||
| upload_slot.upload_url, | ||
| content=content, | ||
| headers={"Content-Type": content_type}, | ||
| ) | ||
| resp.raise_for_status() | ||
| return upload_slot.download_url | ||
|
|
||
| async def download(self, url: str) -> bytes: | ||
| """Download content from a presigned GET URL.""" | ||
| async with httpx.AsyncClient() as http: | ||
| resp = await http.get(url) | ||
| resp.raise_for_status() | ||
| return resp.content | ||
|
|
||
|
|
||
| class S3Config(pydantic.BaseModel): | ||
| """S3-compatible storage connection configuration.""" | ||
|
|
||
| endpoint_url: str | ||
| bucket: str | ||
| access_key: pydantic.SecretStr | ||
| secret_key: pydantic.SecretStr | ||
| region: str = "us-east-1" | ||
| use_ssl: bool = False | ||
|
|
||
|
|
||
| class S3ExtensionClient(BaseExtensionClient[S3ExtensionSpec, None]): | ||
| """ | ||
| Generates pre-signed upload/download URL pairs, scoped to context + user. | ||
| Used by the client/platform — requires S3 credentials. | ||
| """ | ||
|
|
||
| def __init__(self, config: S3Config, spec: S3ExtensionSpec | None = None) -> None: | ||
| super().__init__(spec or S3ExtensionSpec()) | ||
| self._config = config | ||
|
|
||
| async def create_upload_slot( | ||
| self, | ||
| *, | ||
| slot: str, # kept for naming clarity at call site, not used in URL generation | ||
| context_id: str, | ||
| user_id: str, | ||
| filename: str, | ||
| ttl: int = 3600, | ||
| ) -> S3UploadSlot: | ||
| """Generate a pair of presigned URLs for a specific upload slot, scoped to context + user.""" | ||
| try: | ||
| import aioboto3 | ||
| except ImportError as e: | ||
| raise ImportError( | ||
| "aioboto3 is required for S3ExtensionClient. Install it with: pip install 'agentstack-sdk[s3]'" | ||
| ) from e | ||
|
|
||
| from botocore.config import Config | ||
| from botocore.exceptions import ClientError | ||
|
|
||
| key = f"{context_id}/{user_id}/{filename}" | ||
| session = aioboto3.Session() | ||
| async with session.client( | ||
| "s3", | ||
| endpoint_url=self._config.endpoint_url, | ||
| aws_access_key_id=self._config.access_key.get_secret_value(), | ||
| aws_secret_access_key=self._config.secret_key.get_secret_value(), | ||
| region_name=self._config.region, | ||
| use_ssl=self._config.use_ssl, | ||
| config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), | ||
| ) as s3: | ||
| try: | ||
| await s3.head_bucket(Bucket=self._config.bucket) | ||
| except ClientError as e: | ||
| if e.response["Error"]["Code"] in ("404", "NoSuchBucket"): | ||
| await s3.create_bucket(Bucket=self._config.bucket) | ||
| else: | ||
| raise | ||
|
|
||
| upload_url = await s3.generate_presigned_url( | ||
| "put_object", | ||
| Params={"Bucket": self._config.bucket, "Key": key}, | ||
| ExpiresIn=ttl, | ||
| ) | ||
| download_url = await s3.generate_presigned_url( | ||
| "get_object", | ||
| Params={"Bucket": self._config.bucket, "Key": key}, | ||
| ExpiresIn=ttl, | ||
| ) | ||
| return S3UploadSlot(upload_url=upload_url, download_url=download_url) | ||
|
|
||
| def metadata(self, slots: dict[str, S3UploadSlot]) -> dict: | ||
| """Build message metadata dict containing pre-signed URLs for each slot.""" | ||
| return {S3_EXTENSION_URI: S3ExtensionMetadata(upload_slots=slots).model_dump(mode="json")} | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check for bucket existence is performed on every call to
create_upload_slot. When creating multiple upload slots concurrently (e.g., withasyncio.gather), this can lead to redundanthead_bucketcalls and a potential race condition where multiple coroutines attempt to create the same bucket if it doesn't exist. Whilecreate_bucketis generally idempotent, this approach is inefficient.Consider performing this check only once per
S3ExtensionClientinstance. You could use anasyncio.Lockand a flag to ensure the check is performed only by the first coroutine that needs it.