Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/usage/runs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,46 @@ with dn.run("risky-experiment"):
dn.log_metric("error", 1.0)
```

## Task Hierarchy and Relationships

Every run maintains a list of its top-level tasks (tasks that are called directly within the run context, not as children of other tasks):

```python
with dn.run("data-processing") as run:
# These tasks are added to the run's task list
cleaning_result = await clean_data.run(raw_data)
analysis_result = await analyze_data.run(cleaned_data)

# Access all top-level tasks in the run
print(f"Run has {len(run.tasks)} top-level tasks")

for task in run.tasks:
print(f"Task: {task}")
print(f"Run: {task.run}")
print(f"Child tasks: {len(task.tasks)}")
```

You can recursively gather all tasks within a run using `.all_tasks` and perform analysis on them:

```python
with dn.run("comprehensive-analysis") as run:
await complex_workflow(data)

all_tasks = run.all_tasks
print(f"Run '{run.run_id}' executed {len(all_tasks)} total tasks")

# Analyze execution times, success rates, etc.
successful_tasks = [t for t in all_tasks if not t.failed]
success_rate = len(successful_tasks) / len(all_tasks) if all_tasks else 0.0
print(f"Success rate: {success_rate * 100:.1f}%")
dn.log_metric("success_rate", success_rate)
```

## Best Practices

1. **Use meaningful names**: Give your runs descriptive names that indicate their purpose.
2. **Use parameters**: Parameters are a great way to filter and compare runs later, so use them frequently.
3. **Create separate runs for separate experiments**: Don't try to jam multiple experiments into a single run—you can create multiple runs inside your code.
4. **Use projects for organization**: Group related runs into projects.
5. **Create comparison runs**: When testing different approaches, ensure parameters and metrics are consistent to enable meaningful comparison.
6. **Leverage task hierarchy**: Organize complex workflows using hierarchical tasks within runs to maintain clear execution structure.
44 changes: 43 additions & 1 deletion docs/usage/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,53 @@ When this task logs a metric named `token_count`, that metric is:
1. Stored with the task span as `token_count`
2. Mirrored at the run level with the prefix `tokenize.token_count`

## Task Hierarchy and Relationships

Every task maintains relationships with its parent task (if any) and subtasks (if any). These relationships are automatically established when tasks are called within other tasks:

```python
import dreadnode as dn

@dn.task()
async def process(data: str) -> str:
return f"processed: {data}"

@dn.task()
async def finalize(data: str) -> str:
return f"finalized: {data}"

@dn.task()
async def parent_task(data: str) -> str:
processed = await process(data)
finalized = await finalize(processed)
return finalized

with dn.run("workflow-example"):
parent = await parent_task.run("input_data")

print(len(parent.tasks)) # 2

# Iterate through child tasks
for task in parent.tasks:
print(f"{task!r}")

# TaskSpan(name='process', label='process', run_id='...', parent_task='...', ...)
# TaskSpan(name='finalize', label='finalize', run_id='...', parent_task='...', ...)
```

The available hierarchy properties include:

- **`task_span.tasks`**: List of child `TaskSpan` objects
- **`task_span.all_tasks`**: Flat list of all tasks under this task, including subtasks
- **`task_span.parent_task`**: Reference to parent `TaskSpan` (or `None` for top-level tasks)
- **`task_span.run`**: Reference to the `RunSpan` containing this task

## Best Practices

1. **Keep tasks focused**: Each task should do one thing well, making it easier to trace and debug.
2. **Use meaningful names**: Task names appear in the UI, so make them human-readable.
3. **Log relevant data**: Be intentional about what you log as inputs, outputs, and metrics.
4. **Handle errors appropriately**: Use `try_run()` and similar methods to handle task failures gracefully.
5. **Use tasks to structure your code**: Tasks help create natural boundaries in your application.
6. **Combine with [Rigging tools](/open-source/rigging/topics/tools)**: Tasks work seamlessly with Rigging tools for LLM agents.
6. **Leverage task hierarchy**: Use parent-child relationships to organize complex workflows and enable detailed analysis.
7. **Combine with [Rigging tools](/open-source/rigging/topics/tools)**: Tasks work seamlessly with Rigging tools for LLM agents.
6 changes: 6 additions & 0 deletions dreadnode/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dreadnode import convert, data_types
from dreadnode.data_types import Audio, Image, Object3D, Table, Video
from dreadnode.main import DEFAULT_INSTANCE, Dreadnode
from dreadnode.metric import Metric, MetricDict, Scorer
Expand Down Expand Up @@ -33,6 +34,7 @@
__version__ = VERSION

__all__ = [
"DEFAULT_INSTANCE",
"Audio",
"Dreadnode",
"Image",
Expand All @@ -51,6 +53,10 @@
"__version__",
"api",
"configure",
"continue_run",
"convert",
"data_types",
"get_run_context",
"link_objects",
"log_artifact",
"log_input",
Expand Down
42 changes: 42 additions & 0 deletions dreadnode/convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import typing as t

if t.TYPE_CHECKING:
import networkx as nx # type: ignore [import-untyped]

from dreadnode.tracing.span import RunSpan


def run_span_to_graph(run: "RunSpan") -> "nx.DiGraph":
try:
import networkx as nx
except ImportError as e:
raise RuntimeError("The `networkx` package is required for graph conversion") from e

graph = nx.DiGraph()

graph.add_node(
run.run_id,
name=run.name,
label=run.label,
start_time=run.start_time,
end_time=run.end_time,
duration=run.duration,
status="failed" if run.failed else "running" if run.is_recording else "completed",
tags=run.tags,
)

for task in run.all_tasks:
graph.add_node(
task.span_id,
name=task.name,
label=task.label,
start_time=task.start_time,
end_time=task.end_time,
duration=task.duration,
status="failed" if task.failed else "running" if task.active else "completed",
tags=task.tags,
)

graph.add_edge(task.parent_task.span_id if task.parent_task else run.run_id, task.span_id)

return graph
Loading