Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions docs/sdk/api.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -728,26 +728,41 @@ def get_user(self) -> UserResponse:
### get\_user\_data\_credentials

```python
get_user_data_credentials() -> UserDataCredentials
get_user_data_credentials(
duration: int = DEFAULT_FS_CREDENTIAL_DURATION,
) -> UserDataCredentials
```

Retrieves user data credentials for secondary storage access.

**Parameters:**

* **`duration`**
(`int`, default:
`DEFAULT_FS_CREDENTIAL_DURATION`
)
–Credential lifetime in seconds (default: 4 hours)

**Returns:**

* `UserDataCredentials`
–The user data credentials object.

<Accordion title="Source code in dreadnode/api/client.py" icon="code">
```python
def get_user_data_credentials(self) -> UserDataCredentials:
def get_user_data_credentials(
self, duration: int = DEFAULT_FS_CREDENTIAL_DURATION
) -> UserDataCredentials:
"""
Retrieves user data credentials for secondary storage access.

Args:
duration: Credential lifetime in seconds (default: 4 hours)

Returns:
The user data credentials object.
"""
response = self.request("GET", "/user-data/credentials")
response = self._request("GET", "/user-data/credentials", params={"duration": duration})
return UserDataCredentials(**response.json())
```

Expand Down
19 changes: 17 additions & 2 deletions docs/sdk/artifact.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ ArtifactStorage
---------------

```python
ArtifactStorage(file_system: AbstractFileSystem)
ArtifactStorage(
file_system: AbstractFileSystem,
credential_refresher: Callable[[], bool] | None = None,
)
```

Storage for artifacts with efficient handling of large files and directories.
Expand All @@ -260,17 +263,28 @@ Initialize artifact storage with a file system and prefix path.
* **`file_system`**
(`AbstractFileSystem`)
–FSSpec-compatible file system
* **`credential_refresher`**
(`Callable[[], bool] | None`, default:
`None`
)
–Optional function to refresh credentials when it's about to expire

<Accordion title="Source code in dreadnode/artifact/storage.py" icon="code">
```python
def __init__(self, file_system: fsspec.AbstractFileSystem):
def __init__(
self,
file_system: fsspec.AbstractFileSystem,
credential_refresher: t.Callable[[], bool] | None = None,
):
"""
Initialize artifact storage with a file system and prefix path.

Args:
file_system: FSSpec-compatible file system
credential_refresher: Optional function to refresh credentials when it's about to expire
"""
self._file_system = file_system
self._credential_refresher = credential_refresher
```


Expand Down Expand Up @@ -464,6 +478,7 @@ Store a file in the storage system, using multipart upload for large files.

<Accordion title="Source code in dreadnode/artifact/storage.py" icon="code">
```python
@with_credential_refresh
def store_file(self, file_path: Path, target_key: str) -> str:
"""
Store a file in the storage system, using multipart upload for large files.
Expand Down
22 changes: 15 additions & 7 deletions docs/sdk/main.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def __init__(
self._fs_prefix: str = ".dreadnode/storage/"

self._initialized = False

self._credentials: UserDataCredentials | None = None
self._credentials_expiry: datetime | None = None
```


Expand Down Expand Up @@ -380,6 +383,7 @@ def continue_run(self, run_context: RunContext) -> RunSpan:
tracer=self._get_tracer(),
file_system=self._fs,
prefix_path=self._fs_prefix,
credential_refresher=self._refresh_storage_credentials if self._credentials else None,
)
```

Expand Down Expand Up @@ -524,18 +528,21 @@ def initialize(self) -> None:
# )
# )

credentials = self._api.get_user_data_credentials()
resolved_endpoint = resolve_endpoint(credentials.endpoint)
self._credentials = self._api.get_user_data_credentials(
duration=DEFAULT_FS_CREDENTIAL_DURATION
)
self._credentials_expiry = self._credentials.expiration
resolved_endpoint = self._resolve_endpoint(self._credentials.endpoint)
self._fs = S3FileSystem(
key=credentials.access_key_id,
secret=credentials.secret_access_key,
token=credentials.session_token,
key=self._credentials.access_key_id,
secret=self._credentials.secret_access_key,
token=self._credentials.session_token,
client_kwargs={
"endpoint_url": resolved_endpoint,
"region_name": credentials.region,
"region_name": self._credentials.region,
},
)
self._fs_prefix = f"{credentials.bucket}/{credentials.prefix}/"
self._fs_prefix = f"{self._credentials.bucket}/{self._credentials.prefix}/"

self._logfire = logfire.configure(
local=not self.is_default,
Expand Down Expand Up @@ -1723,6 +1730,7 @@ def run(
file_system=self._fs,
prefix_path=self._fs_prefix,
autolog=autolog,
credential_refresher=self._refresh_storage_credentials if self._credentials else None,
)
```

Expand Down
4 changes: 2 additions & 2 deletions docs/sdk/metric.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ Metric
Metric(
value: float,
step: int = 0,
timestamp: datetime = lambda: datetime.now(
timezone.utc
timestamp: datetime = (
lambda: datetime.now(timezone.utc)
)(),
attributes: JsonDict = dict(),
)
Expand Down
25 changes: 20 additions & 5 deletions dreadnode/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
process_run,
process_task,
)
from dreadnode.constants import DEFAULT_MAX_POLL_TIME, DEFAULT_POLL_INTERVAL
from dreadnode.constants import (
DEFAULT_FS_CREDENTIAL_DURATION,
DEFAULT_MAX_POLL_TIME,
DEFAULT_POLL_INTERVAL,
)
from dreadnode.util import logger
from dreadnode.version import VERSION

Expand Down Expand Up @@ -306,7 +310,9 @@ def get_run(self, run: str | ULID) -> Run:
TraceFormat = t.Literal["tree", "flat"]

@t.overload
def get_run_tasks(self, run: str | ULID, *, format: t.Literal["tree"]) -> list[TaskTree]: ...
def get_run_tasks(
self, run: str | ULID, *, format: t.Literal["tree"]
) -> list[TaskTree]: ...

@t.overload
def get_run_tasks(
Expand Down Expand Up @@ -334,7 +340,9 @@ def get_run_tasks(
return tasks if format == "flat" else convert_flat_tasks_to_tree(tasks)

@t.overload
def get_run_trace(self, run: str | ULID, *, format: t.Literal["tree"]) -> list[TraceTree]: ...
def get_run_trace(
self, run: str | ULID, *, format: t.Literal["tree"]
) -> list[TraceTree]: ...

@t.overload
def get_run_trace(
Expand Down Expand Up @@ -517,12 +525,19 @@ def export_timeseries(

# User data access

def get_user_data_credentials(self) -> UserDataCredentials:
def get_user_data_credentials(
self, duration: int = DEFAULT_FS_CREDENTIAL_DURATION
) -> UserDataCredentials:
"""
Retrieves user data credentials for secondary storage access.

Args:
duration: Credential lifetime in seconds (default: 4 hours)

Returns:
The user data credentials object.
"""
response = self.request("GET", "/user-data/credentials")
response = self._request(
"GET", "/user-data/credentials", params={"duration": duration}
)
return UserDataCredentials(**response.json())
16 changes: 15 additions & 1 deletion dreadnode/artifact/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"""

import hashlib
import typing as t
from pathlib import Path

import fsspec # type: ignore[import-untyped]

from dreadnode.storage_utils import with_credential_refresh
from dreadnode.util import logger

CHUNK_SIZE = 8 * 1024 * 1024 # 8MB
Expand All @@ -22,15 +24,27 @@ class ArtifactStorage:
- Batch uploads for directories handled by fsspec
"""

def __init__(self, file_system: fsspec.AbstractFileSystem):
def __init__(
self,
file_system: fsspec.AbstractFileSystem,
credential_refresher: t.Callable[[], bool] | None = None,
):
"""
Initialize artifact storage with a file system and prefix path.

Args:
file_system: FSSpec-compatible file system
credential_refresher: Optional function to refresh credentials when it's about to expire
"""
self._file_system = file_system
self._credential_refresher = credential_refresher

def _refresh_credentials_if_needed(self) -> None:
"""Refresh credentials if refresher is available."""
if self._credential_refresher:
self._credential_refresher()

@with_credential_refresh
def store_file(self, file_path: Path, target_key: str) -> str:
"""
Store a file in the storage system, using multipart upload for large files.
Expand Down
7 changes: 6 additions & 1 deletion dreadnode/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,10 @@
# path to the user configuration file
USER_CONFIG_PATH = pathlib.Path(
# allow overriding the user config file via env variable
os.getenv("DREADNODE_USER_CONFIG_FILE") or pathlib.Path.home() / ".dreadnode" / "config"
os.getenv("DREADNODE_USER_CONFIG_FILE")
or pathlib.Path.home() / ".dreadnode" / "config"
)

# Default values for the file system credential management
DEFAULT_FS_CREDENTIAL_DURATION = 14400 # 4 hours in seconds
FS_CREDENTIAL_REFRESH_BUFFER = 300 # 5 minutes in seconds
Loading
Loading