diff --git a/docs/intro.mdx b/docs/intro.mdx index e33d02c5..8f869b16 100644 --- a/docs/intro.mdx +++ b/docs/intro.mdx @@ -237,7 +237,7 @@ async def execute_command(command: str) -> str: stdout, stderr = await process.communicate() # Log additional information - dn.log_param("exit_code", process.returncode) + dn.log_output("exit_code", process.returncode) result = stdout.decode() if process.returncode == 0 else stderr.decode() return result # Automatically logged as output diff --git a/docs/sdk/main.mdx b/docs/sdk/main.mdx index d382da95..463cd670 100644 --- a/docs/sdk/main.mdx +++ b/docs/sdk/main.mdx @@ -1280,12 +1280,7 @@ def log_outputs( ### log\_param ```python -log_param( - key: str, - value: JsonValue, - *, - to: ToObject = "task-or-run", -) -> None +log_param(key: str, value: JsonValue) -> None ``` Log a single parameter to the current task or run. @@ -1309,13 +1304,6 @@ with dreadnode.run("my_run"): * **`value`** (`JsonValue`) –The value of the parameter. -* **`to`** - (`ToObject`, default: - `'task-or-run'` - ) - –The target object to log the parameter to. Can be "task-or-run" or "run". - Defaults to "task-or-run". If "task-or-run", the parameter will be logged - to the current task or run, whichever is the nearest ancestor. ```python @@ -1324,8 +1312,6 @@ def log_param( self, key: str, value: JsonValue, - *, - to: ToObject = "task-or-run", ) -> None: """ Log a single parameter to the current task or run. @@ -1343,11 +1329,8 @@ def log_param( Args: key: The name of the parameter. value: The value of the parameter. - to: The target object to log the parameter to. Can be "task-or-run" or "run". - Defaults to "task-or-run". If "task-or-run", the parameter will be logged - to the current task or run, whichever is the nearest ancestor. """ - self.log_params(to=to, **{key: value}) + self.log_params(**{key: value}) ``` @@ -1356,9 +1339,7 @@ def log_param( ### log\_params ```python -log_params( - to: ToObject = "run", **params: JsonValue -) -> None +log_params(**params: JsonValue) -> None ``` Log multiple parameters to the current task or run. @@ -1379,13 +1360,6 @@ with dreadnode.run("my_run"): **Parameters:** -* **`to`** - (`ToObject`, default: - `'run'` - ) - –The target object to log the parameters to. Can be "task-or-run" or "run". - Defaults to "task-or-run". If "task-or-run", the parameters will be logged - to the current task or run, whichever is the nearest ancestor. * **`**params`** (`JsonValue`, default: `{}` @@ -1395,7 +1369,7 @@ with dreadnode.run("my_run"): ```python @handle_internal_errors() -def log_params(self, to: ToObject = "run", **params: JsonValue) -> None: +def log_params(self, **params: JsonValue) -> None: """ Log multiple parameters to the current task or run. @@ -1413,19 +1387,11 @@ def log_params(self, to: ToObject = "run", **params: JsonValue) -> None: ~~~ Args: - to: The target object to log the parameters to. Can be "task-or-run" or "run". - Defaults to "task-or-run". If "task-or-run", the parameters will be logged - to the current task or run, whichever is the nearest ancestor. **params: The parameters to log. Each parameter is a key-value pair. """ - task = current_task_span.get() - run = current_run_span.get() - - target = (task or run) if to == "task-or-run" else run - if target is None: - raise RuntimeError("log_params() must be called within a run") - - target.log_params(**params) + if (run := current_run_span.get()) is None: + raise RuntimeError("Parameters must be logged within a run") + run.log_params(**params) ``` @@ -1543,7 +1509,7 @@ with dreadnode.run("my_run"): (`bool`, default: `True` ) - –Whether to automatically log task inputs, outputs, and execution metrics if unspecified. + –Whether to automatically log task inputs, outputs, and execution metrics if otherwise unspecified. * **`**attributes`** (`Any`, default: `{}` @@ -1592,7 +1558,7 @@ def run( project: The project name to associate the run with. If not provided, the project passed to `configure()` will be used, or the run will be associated with a default project. - autolog: Whether to automatically log task inputs, outputs, and execution metrics if unspecified. + autolog: Whether to automatically log task inputs, outputs, and execution metrics if otherwise unspecified. **attributes: Additional attributes to attach to the run span. Returns: @@ -1919,11 +1885,11 @@ task( scorers: None = None, name: str | None = None, label: str | None = None, - log_params: Sequence[str] | bool = False, log_inputs: Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: Sequence[str] | None = None, **attributes: Any, ) -> TaskDecorator @@ -1935,11 +1901,11 @@ task( scorers: Sequence[Scorer[R] | ScorerCallable[R]], name: str | None = None, label: str | None = None, - log_params: Sequence[str] | bool = False, log_inputs: Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: Sequence[str] | None = None, **attributes: Any, ) -> ScoredTaskDecorator[R] @@ -1952,11 +1918,11 @@ task( | None = None, name: str | None = None, label: str | None = None, - log_params: Sequence[str] | bool = False, log_inputs: Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: Sequence[str] | None = None, **attributes: Any, ) -> TaskDecorator @@ -1992,21 +1958,21 @@ await my_task(2) `None` ) –The label of the task - useful for filtering in the UI. -* **`log_params`** - (`Sequence[str] | bool`, default: - `False` - ) - –Whether to log all, or specific, incoming arguments to the function as parameters. * **`log_inputs`** (`Sequence[str] | bool | Inherited`, default: `INHERITED` ) - –Whether to log all, or specific, incoming arguments to the function as inputs. + –Log all, or specific, incoming arguments to the function as inputs. * **`log_output`** (`bool | Inherited`, default: `INHERITED` ) - –Whether to log the result of the function as an output. + –Log the result of the function as an output. +* **`log_execution_metrics`** + (`bool`, default: + `False` + ) + –Log execution metrics for the task, such as success rate and run count. * **`tags`** (`Sequence[str] | None`, default: `None` @@ -2031,9 +1997,9 @@ def task( scorers: t.Sequence[Scorer[t.Any] | ScorerCallable[t.Any]] | None = None, name: str | None = None, label: str | None = None, - log_params: t.Sequence[str] | bool = False, log_inputs: t.Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: t.Sequence[str] | None = None, **attributes: t.Any, ) -> TaskDecorator: @@ -2054,9 +2020,9 @@ def task( of the task and will be passed the task's output. name: The name of the task. label: The label of the task - useful for filtering in the UI. - log_params: Whether to log all, or specific, incoming arguments to the function as parameters. - log_inputs: Whether to log all, or specific, incoming arguments to the function as inputs. - log_output: Whether to log the result of the function as an output. + log_inputs: Log all, or specific, incoming arguments to the function as inputs. + log_output: Log the result of the function as an output. + log_execution_metrics: Log execution metrics for the task, such as success rate and run count. tags: A list of tags to attach to the task span. **attributes: A dictionary of attributes to attach to the task span. @@ -2109,9 +2075,9 @@ def task( for scorer in scorers or [] ], tags=list(tags or []), - log_params=log_params, log_inputs=log_inputs, log_output=log_output, + log_execution_metrics=log_execution_metrics, label=_label, ) @@ -2128,7 +2094,6 @@ task_span( name: str, *, label: str | None = None, - params: AnyDict | None = None, tags: Sequence[str] | None = None, **attributes: Any, ) -> TaskSpan[t.Any] @@ -2150,7 +2115,6 @@ async with dreadnode.task_span("my_task") as task: Args: name: The name of the task. label: The label of the task - useful for filtering in the UI. -params: A dictionary of parameters to attach to the task span. tags: A list of tags to attach to the task span. \*\*attributes: A dictionary of attributes to attach to the task span. @@ -2166,7 +2130,6 @@ def task_span( name: str, *, label: str | None = None, - params: AnyDict | None = None, tags: t.Sequence[str] | None = None, **attributes: t.Any, ) -> TaskSpan[t.Any]: @@ -2185,7 +2148,6 @@ def task_span( Args: name: The name of the task. label: The label of the task - useful for filtering in the UI. - params: A dictionary of parameters to attach to the task span. tags: A list of tags to attach to the task span. **attributes: A dictionary of attributes to attach to the task span. @@ -2200,7 +2162,6 @@ def task_span( name=name, label=label, attributes=attributes, - params=params, tags=tags, run_id=run.run_id, tracer=self._get_tracer(), diff --git a/docs/sdk/serialization.mdx b/docs/sdk/serialization.mdx index 1ddc47f9..19aba1f2 100644 --- a/docs/sdk/serialization.mdx +++ b/docs/sdk/serialization.mdx @@ -6,6 +6,53 @@ title: dreadnode.serialization ::: dreadnode.serialization */} +seems\_useful\_to\_serialize +---------------------------- + +```python +seems_useful_to_serialize(obj: Any) -> bool +``` + +Checks if the object is likely useful to serialize by attempting to +serialize it and checking if the resulting schema indicates a known type. + +**Parameters:** + +* **`obj`** + (`Any`) + –The Python object to check. + +**Returns:** + +* **`bool`** ( `bool` + ) –True if the object is likely useful to serialize, False otherwise. + + +```python +def seems_useful_to_serialize(obj: t.Any) -> bool: + """ + Checks if the object is likely useful to serialize by attempting to + serialize it and checking if the resulting schema indicates a known type. + + Args: + obj: The Python object to check. + + Returns: + bool: True if the object is likely useful to serialize, False otherwise. + """ + if obj is None: + return False + + with contextlib.suppress(Exception): + _, schema = _serialize(obj) + return schema.get("x-python-datatype") != "unknown" + + return False +``` + + + + serialize --------- diff --git a/docs/sdk/task.mdx b/docs/sdk/task.mdx index 6b63cd29..b5610e2f 100644 --- a/docs/sdk/task.mdx +++ b/docs/sdk/task.mdx @@ -18,11 +18,11 @@ Task( func: Callable[P, R], scorers: list[Scorer[R]], tags: list[str], - log_params: Sequence[str] | bool = False, log_inputs: Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, ) ``` @@ -54,29 +54,29 @@ label: str The label of the task - used to group associated metrics and data together. -### log\_inputs +### log\_execution\_metrics ```python -log_inputs: Sequence[str] | bool | Inherited = INHERITED +log_execution_metrics: bool = False ``` -Whether to log all, or specific, incoming arguments to the function as inputs. +Track execution metrics such as success rate and run count. -### log\_output +### log\_inputs ```python -log_output: bool | Inherited = INHERITED +log_inputs: Sequence[str] | bool | Inherited = INHERITED ``` -Whether to automatically log the result of the function as an output. +Log all, or specific, incoming arguments to the function as inputs. -### log\_params +### log\_output ```python -log_params: Sequence[str] | bool = False +log_output: bool | Inherited = INHERITED ``` -Whether to log all, or specific, incoming arguments to the function as parameters. +Log the result of the function as an output. ### name @@ -132,7 +132,6 @@ def clone(self) -> "Task[P, R]": func=self.func, scorers=[scorer.clone() for scorer in self.scorers], tags=self.tags.copy(), - log_params=self.log_params, log_inputs=self.log_inputs, log_output=self.log_output, ) @@ -296,13 +295,6 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: bound_args = self._bind_args(*args, **kwargs) - params_to_log = ( - bound_args - if self.log_params is True - else {k: v for k, v in bound_args.items() if k in self.log_params} - if self.log_params is not False - else {} - ) inputs_to_log = ( bound_args if log_inputs is True @@ -311,23 +303,24 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: else {} ) + # If log_inputs is inherited, filter out items that don't seem useful + # to serialize like `None` or repr fallbacks. + if isinstance(self.log_inputs, Inherited): + inputs_to_log = {k: v for k, v in inputs_to_log.items() if seems_useful_to_serialize(v)} + with TaskSpan[R]( name=self.name, label=self.label, attributes=self.attributes, - params=params_to_log, tags=self.tags, run_id=run.run_id, tracer=self.tracer, ) as span: - if run.autolog: + if self.log_execution_metrics: span.run.log_metric( "count", 1, prefix=f"{self.label}.exec", mode="count", attributes={"auto": True} ) - for name, value in params_to_log.items(): - span.log_param(name, value) - input_object_hashes: list[str] = [ span.log_input(name, value, label=f"{self.label}.input.{name}", auto=True) for name, value in inputs_to_log.items() @@ -338,7 +331,7 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: if inspect.isawaitable(output): output = await output except Exception: - if run.autolog: + if self.log_execution_metrics: span.run.log_metric( "success_rate", 0, @@ -348,7 +341,7 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: ) raise - if run.autolog: + if self.log_execution_metrics: span.run.log_metric( "success_rate", 1, @@ -358,7 +351,9 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: ) span.output = output - if log_output: + if log_output and ( + not isinstance(self.log_inputs, Inherited) or seems_useful_to_serialize(output) + ): output_object_hash = span.log_output( "output", output, label=f"{self.label}.output", auto=True ) @@ -735,9 +730,9 @@ with_( name: str | None = None, tags: Sequence[str] | None = None, label: str | None = None, - log_params: Sequence[str] | bool | None = None, log_inputs: Sequence[str] | bool | None = None, log_output: bool | None = None, + log_execution_metrics: bool | None = None, append: bool = False, **attributes: Any, ) -> Task[P, R] @@ -767,21 +762,21 @@ Clone a task and modify its attributes. `None` ) –The new label for the task. -* **`log_params`** - (`Sequence[str] | bool | None`, default: - `None` - ) - –Whether to log all, or specific, incoming arguments to the function as parameters. * **`log_inputs`** (`Sequence[str] | bool | None`, default: `None` ) - –Whether to log all, or specific, incoming arguments to the function as inputs. + –Log all, or specific, incoming arguments to the function as inputs. * **`log_output`** (`bool | None`, default: `None` ) - –Whether to automatically log the result of the function as an output. + –Log the result of the function as an output. +* **`log_execution_metrics`** + (`bool | None`, default: + `None` + ) + –Log execution metrics such as success rate and run count. * **`append`** (`bool`, default: `False` @@ -807,9 +802,9 @@ def with_( name: str | None = None, tags: t.Sequence[str] | None = None, label: str | None = None, - log_params: t.Sequence[str] | bool | None = None, log_inputs: t.Sequence[str] | bool | None = None, log_output: bool | None = None, + log_execution_metrics: bool | None = None, append: bool = False, **attributes: t.Any, ) -> "Task[P, R]": @@ -821,9 +816,9 @@ def with_( name: The new name for the task. tags: A list of new tags to set or append to the task. label: The new label for the task. - log_params: Whether to log all, or specific, incoming arguments to the function as parameters. - log_inputs: Whether to log all, or specific, incoming arguments to the function as inputs. - log_output: Whether to automatically log the result of the function as an output. + log_inputs: Log all, or specific, incoming arguments to the function as inputs. + log_output: Log the result of the function as an output. + log_execution_metrics: Log execution metrics such as success rate and run count. append: If True, appends the new scorers and tags to the existing ones. If False, replaces them. **attributes: Additional attributes to set or update in the task. @@ -833,9 +828,13 @@ def with_( task = self.clone() task.name = name or task.name task.label = label or task.label - task.log_params = log_params if log_params is not None else task.log_params task.log_inputs = log_inputs if log_inputs is not None else task.log_inputs task.log_output = log_output if log_output is not None else task.log_output + task.log_execution_metrics = ( + log_execution_metrics + if log_execution_metrics is not None + else task.log_execution_metrics + ) new_scorers = [Scorer.from_callable(self.tracer, scorer) for scorer in (scorers or [])] new_tags = list(tags or []) diff --git a/docs/usage/data-tracking.mdx b/docs/usage/data-tracking.mdx index 4eae174f..489ac3a7 100644 --- a/docs/usage/data-tracking.mdx +++ b/docs/usage/data-tracking.mdx @@ -15,7 +15,7 @@ Parameters are lightweight key-value pairs typically used for configuration valu ```python import dreadnode as dn -@dreadnode.task(log_params=["learning_rate", "batch_size"]) +@dreadnode.task(log_inputs=["learning_rate", "batch_size"]) async def train_model(learning_rate: float, batch_size: int) -> None: # ... @@ -45,6 +45,10 @@ Parameters are stored efficiently, making it easy to filter and compare runs qui Parameters do not store multiple values over time. If you need to track changes to a parameter over the lifetime of a run, consider using the parameter inside a task and call it multiple times. + +In previous versions, parameters could be associated with both runs and tasks. This has been simplified to only allow parameters at the run level. For task-specific configurations, we recommend using task inputs. + + ## Inputs and Outputs Often times the interesting data for runs is not the execution graph, but the data that flows through the system. For rich data that you have available during execution, arguments to tasks, or results of calling functions, Strikes provides input and output storage: diff --git a/docs/usage/tasks.mdx b/docs/usage/tasks.mdx index f9d7384c..fc38c2ca 100644 --- a/docs/usage/tasks.mdx +++ b/docs/usage/tasks.mdx @@ -73,9 +73,9 @@ import dreadnode as dn @dn.task( name="File Analysis", # Human-readable name (default: function name) label="file_analysis", # Machine-readable label for grouping (default: function name) - log_params=False, # Do not log any arguments as parameters log_inputs=["path"], # Log specific arguments as inputs (True for all, False for none) log_output=True, # Log the return value as an output + log_execution_metrics=False, # Log success rate and execution count as metrics tags=["security", "static"], # Tags to categorize this task later scorers=[score_vuln] # Functions to score the output ) @@ -132,9 +132,6 @@ Within tasks, you can explicitly log data using several methods: ```python @dn.task() async def process_document(doc_id: str) -> dict: - # Log parameters (key-value pairs for configuration) - dn.log_param("batch_size", 32) - # Log input objects (structured data used by the task) document = fetch_document(doc_id) dn.log_input("document", document) diff --git a/dreadnode/main.py b/dreadnode/main.py index 8575f508..a31dc88b 100644 --- a/dreadnode/main.py +++ b/dreadnode/main.py @@ -524,9 +524,9 @@ def task( scorers: None = None, name: str | None = None, label: str | None = None, - log_params: t.Sequence[str] | bool = False, log_inputs: t.Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: t.Sequence[str] | None = None, **attributes: t.Any, ) -> TaskDecorator: ... @@ -538,9 +538,9 @@ def task( scorers: t.Sequence[Scorer[R] | ScorerCallable[R]], name: str | None = None, label: str | None = None, - log_params: t.Sequence[str] | bool = False, log_inputs: t.Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: t.Sequence[str] | None = None, **attributes: t.Any, ) -> ScoredTaskDecorator[R]: ... @@ -551,9 +551,9 @@ def task( scorers: t.Sequence[Scorer[t.Any] | ScorerCallable[t.Any]] | None = None, name: str | None = None, label: str | None = None, - log_params: t.Sequence[str] | bool = False, log_inputs: t.Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, + log_execution_metrics: bool = False, tags: t.Sequence[str] | None = None, **attributes: t.Any, ) -> TaskDecorator: @@ -574,9 +574,9 @@ async def my_task(x: int) -> int: of the task and will be passed the task's output. name: The name of the task. label: The label of the task - useful for filtering in the UI. - log_params: Whether to log all, or specific, incoming arguments to the function as parameters. - log_inputs: Whether to log all, or specific, incoming arguments to the function as inputs. - log_output: Whether to log the result of the function as an output. + log_inputs: Log all, or specific, incoming arguments to the function as inputs. + log_output: Log the result of the function as an output. + log_execution_metrics: Log execution metrics for the task, such as success rate and run count. tags: A list of tags to attach to the task span. **attributes: A dictionary of attributes to attach to the task span. @@ -629,9 +629,9 @@ def make_task( for scorer in scorers or [] ], tags=list(tags or []), - log_params=log_params, log_inputs=log_inputs, log_output=log_output, + log_execution_metrics=log_execution_metrics, label=_label, ) @@ -642,7 +642,6 @@ def task_span( name: str, *, label: str | None = None, - params: AnyDict | None = None, tags: t.Sequence[str] | None = None, **attributes: t.Any, ) -> TaskSpan[t.Any]: @@ -661,7 +660,6 @@ def task_span( Args: name: The name of the task. label: The label of the task - useful for filtering in the UI. - params: A dictionary of parameters to attach to the task span. tags: A list of tags to attach to the task span. **attributes: A dictionary of attributes to attach to the task span. @@ -676,7 +674,6 @@ def task_span( name=name, label=label, attributes=attributes, - params=params, tags=tags, run_id=run.run_id, tracer=self._get_tracer(), @@ -761,7 +758,7 @@ def run( project: The project name to associate the run with. If not provided, the project passed to `configure()` will be used, or the run will be associated with a default project. - autolog: Whether to automatically log task inputs, outputs, and execution metrics if unspecified. + autolog: Whether to automatically log task inputs, outputs, and execution metrics if otherwise unspecified. **attributes: Additional attributes to attach to the run span. Returns: @@ -885,8 +882,6 @@ def log_param( self, key: str, value: JsonValue, - *, - to: ToObject = "task-or-run", ) -> None: """ Log a single parameter to the current task or run. @@ -904,14 +899,11 @@ def log_param( Args: key: The name of the parameter. value: The value of the parameter. - to: The target object to log the parameter to. Can be "task-or-run" or "run". - Defaults to "task-or-run". If "task-or-run", the parameter will be logged - to the current task or run, whichever is the nearest ancestor. """ - self.log_params(to=to, **{key: value}) + self.log_params(**{key: value}) @handle_internal_errors() - def log_params(self, to: ToObject = "run", **params: JsonValue) -> None: + def log_params(self, **params: JsonValue) -> None: """ Log multiple parameters to the current task or run. @@ -929,19 +921,11 @@ def log_params(self, to: ToObject = "run", **params: JsonValue) -> None: ``` Args: - to: The target object to log the parameters to. Can be "task-or-run" or "run". - Defaults to "task-or-run". If "task-or-run", the parameters will be logged - to the current task or run, whichever is the nearest ancestor. **params: The parameters to log. Each parameter is a key-value pair. """ - task = current_task_span.get() - run = current_run_span.get() - - target = (task or run) if to == "task-or-run" else run - if target is None: - raise RuntimeError("log_params() must be called within a run") - - target.log_params(**params) + if (run := current_run_span.get()) is None: + raise RuntimeError("Parameters must be logged within a run") + run.log_params(**params) @t.overload def log_metric( diff --git a/dreadnode/serialization.py b/dreadnode/serialization.py index 2b15a9f3..fcafc7fa 100644 --- a/dreadnode/serialization.py +++ b/dreadnode/serialization.py @@ -585,6 +585,27 @@ def _serialize(obj: t.Any, seen: set[int] | None = None) -> tuple[JsonValue, Jso } +def seems_useful_to_serialize(obj: t.Any) -> bool: + """ + Checks if the object is likely useful to serialize by attempting to + serialize it and checking if the resulting schema indicates a known type. + + Args: + obj: The Python object to check. + + Returns: + bool: True if the object is likely useful to serialize, False otherwise. + """ + if obj is None: + return False + + with contextlib.suppress(Exception): + _, schema = _serialize(obj) + return schema.get("x-python-datatype") != "unknown" + + return False + + @dataclasses.dataclass class Serialized: data: JsonValue | None diff --git a/dreadnode/task.py b/dreadnode/task.py index 8e23c908..7c381e3e 100644 --- a/dreadnode/task.py +++ b/dreadnode/task.py @@ -8,6 +8,7 @@ from opentelemetry.trace import Tracer from dreadnode.metric import Scorer, ScorerCallable +from dreadnode.serialization import seems_useful_to_serialize from dreadnode.tracing.span import Span, TaskSpan, current_run_span from dreadnode.types import INHERITED, Inherited @@ -113,12 +114,12 @@ class Task(t.Generic[P, R]): tags: list[str] "A list of tags to attach to the task span." - log_params: t.Sequence[str] | bool = False - "Whether to log all, or specific, incoming arguments to the function as parameters." log_inputs: t.Sequence[str] | bool | Inherited = INHERITED - "Whether to log all, or specific, incoming arguments to the function as inputs." + "Log all, or specific, incoming arguments to the function as inputs." log_output: bool | Inherited = INHERITED - "Whether to automatically log the result of the function as an output." + "Log the result of the function as an output." + log_execution_metrics: bool = False + "Track execution metrics such as success rate and run count." def __post_init__(self) -> None: self.__signature__ = getattr( @@ -143,7 +144,6 @@ def __get__(self, obj: t.Any, objtype: t.Any) -> "Task[P, R]": func=bound_func, scorers=[scorer.clone() for scorer in self.scorers], tags=self.tags.copy(), - log_params=self.log_params, log_inputs=self.log_inputs, log_output=self.log_output, ) @@ -169,7 +169,6 @@ def clone(self) -> "Task[P, R]": func=self.func, scorers=[scorer.clone() for scorer in self.scorers], tags=self.tags.copy(), - log_params=self.log_params, log_inputs=self.log_inputs, log_output=self.log_output, ) @@ -181,9 +180,9 @@ def with_( name: str | None = None, tags: t.Sequence[str] | None = None, label: str | None = None, - log_params: t.Sequence[str] | bool | None = None, log_inputs: t.Sequence[str] | bool | None = None, log_output: bool | None = None, + log_execution_metrics: bool | None = None, append: bool = False, **attributes: t.Any, ) -> "Task[P, R]": @@ -195,9 +194,9 @@ def with_( name: The new name for the task. tags: A list of new tags to set or append to the task. label: The new label for the task. - log_params: Whether to log all, or specific, incoming arguments to the function as parameters. - log_inputs: Whether to log all, or specific, incoming arguments to the function as inputs. - log_output: Whether to automatically log the result of the function as an output. + log_inputs: Log all, or specific, incoming arguments to the function as inputs. + log_output: Log the result of the function as an output. + log_execution_metrics: Log execution metrics such as success rate and run count. append: If True, appends the new scorers and tags to the existing ones. If False, replaces them. **attributes: Additional attributes to set or update in the task. @@ -207,9 +206,13 @@ def with_( task = self.clone() task.name = name or task.name task.label = label or task.label - task.log_params = log_params if log_params is not None else task.log_params task.log_inputs = log_inputs if log_inputs is not None else task.log_inputs task.log_output = log_output if log_output is not None else task.log_output + task.log_execution_metrics = ( + log_execution_metrics + if log_execution_metrics is not None + else task.log_execution_metrics + ) new_scorers = [Scorer.from_callable(self.tracer, scorer) for scorer in (scorers or [])] new_tags = list(tags or []) @@ -245,13 +248,6 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: bound_args = self._bind_args(*args, **kwargs) - params_to_log = ( - bound_args - if self.log_params is True - else {k: v for k, v in bound_args.items() if k in self.log_params} - if self.log_params is not False - else {} - ) inputs_to_log = ( bound_args if log_inputs is True @@ -260,23 +256,24 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: else {} ) + # If log_inputs is inherited, filter out items that don't seem useful + # to serialize like `None` or repr fallbacks. + if isinstance(self.log_inputs, Inherited): + inputs_to_log = {k: v for k, v in inputs_to_log.items() if seems_useful_to_serialize(v)} + with TaskSpan[R]( name=self.name, label=self.label, attributes=self.attributes, - params=params_to_log, tags=self.tags, run_id=run.run_id, tracer=self.tracer, ) as span: - if run.autolog: + if self.log_execution_metrics: span.run.log_metric( "count", 1, prefix=f"{self.label}.exec", mode="count", attributes={"auto": True} ) - for name, value in params_to_log.items(): - span.log_param(name, value) - input_object_hashes: list[str] = [ span.log_input(name, value, label=f"{self.label}.input.{name}", auto=True) for name, value in inputs_to_log.items() @@ -287,7 +284,7 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: if inspect.isawaitable(output): output = await output except Exception: - if run.autolog: + if self.log_execution_metrics: span.run.log_metric( "success_rate", 0, @@ -297,7 +294,7 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: ) raise - if run.autolog: + if self.log_execution_metrics: span.run.log_metric( "success_rate", 1, @@ -307,7 +304,9 @@ async def run(self, *args: P.args, **kwargs: P.kwargs) -> TaskSpan[R]: ) span.output = output - if log_output: + if log_output and ( + not isinstance(self.log_inputs, Inherited) or seems_useful_to_serialize(output) + ): output_object_hash = span.log_output( "output", output, label=f"{self.label}.output", auto=True ) diff --git a/dreadnode/tracing/span.py b/dreadnode/tracing/span.py index 37770573..665d219c 100644 --- a/dreadnode/tracing/span.py +++ b/dreadnode/tracing/span.py @@ -741,11 +741,9 @@ def __init__( tracer: Tracer, *, label: str | None = None, - params: AnyDict | None = None, metrics: MetricsDict | None = None, tags: t.Sequence[str] | None = None, ) -> None: - self._params = params or {} self._metrics = metrics or {} self._inputs: list[ObjectRef] = [] self._outputs: list[ObjectRef] = [] @@ -756,7 +754,6 @@ def __init__( attributes = { SPAN_ATTRIBUTE_RUN_ID: str(run_id), - SPAN_ATTRIBUTE_PARAMS: self._params, SPAN_ATTRIBUTE_INPUTS: self._inputs, SPAN_ATTRIBUTE_METRICS: self._metrics, SPAN_ATTRIBUTE_OUTPUTS: self._outputs, @@ -782,7 +779,6 @@ def __exit__( exc_value: BaseException | None, traceback: types.TracebackType | None, ) -> None: - self.set_attribute(SPAN_ATTRIBUTE_PARAMS, self._params) self.set_attribute(SPAN_ATTRIBUTE_INPUTS, self._inputs, schema=False) self.set_attribute(SPAN_ATTRIBUTE_METRICS, self._metrics, schema=False) self.set_attribute(SPAN_ATTRIBUTE_OUTPUTS, self._outputs, schema=False) @@ -835,16 +831,6 @@ def log_output( self._outputs.append(ObjectRef(name, label=label, hash=hash_, attributes=attributes)) return hash_ - @property - def params(self) -> AnyDict: - return self._params - - def log_param(self, key: str, value: t.Any) -> None: - self.log_params(**{key: value}) - - def log_params(self, **params: t.Any) -> None: - self._params.update(params) - @property def inputs(self) -> AnyDict: return {ref.name: self.run.get_object(ref.hash) for ref in self._inputs}