Datasets: First-Class Directory Outputs #184

@daniel-thom

Description

Created in collaboration with Opus 4.6 via Claude Code.

Problem Statement

Torc's current FileModel represents individual files as job inputs/outputs for dependency
tracking. However, many workflows produce directory-based outputs such as:

  • Hive-partitioned Parquet datasets (thousands of files)
  • Sharded model checkpoints
  • Multi-file archives or bundles

These have different semantics from individual files:

  1. Multiple jobs contribute to one dataset - Parameterized jobs write files to the same
    directory structure, often embedding job IDs in filenames for uniqueness
  2. Completion is aggregate - The dataset isn't "complete" until all contributing jobs finish
  3. File counts are unpredictable - Due to compaction, dynamic partitioning, etc.
  4. Integrity verification is different - Hashing thousands of files individually is impractical;
    a manifest-based approach is preferred

Long-Term Vision

DatasetModel

A new database table to represent directory-based outputs:

CREATE TABLE datasets (
    id INTEGER PRIMARY KEY,
    workflow_id INTEGER NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    path TEXT NOT NULL,
    description TEXT,

    -- Computed when dataset is finalized:
    file_count INTEGER,
    total_size_bytes INTEGER,
    manifest_hash TEXT,
    hash_mode TEXT,  -- 'manifest', 'content', or 'none'
    finalized_at REAL,

    UNIQUE(workflow_id, name)
);

-- Track which jobs contribute to which datasets
CREATE TABLE job_dataset_outputs (
    job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
    dataset_id INTEGER NOT NULL REFERENCES datasets(id) ON DELETE CASCADE,
    PRIMARY KEY (job_id, dataset_id)
);

CREATE TABLE job_dataset_inputs (
    job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
    dataset_id INTEGER NOT NULL REFERENCES datasets(id) ON DELETE CASCADE,
    PRIMARY KEY (job_id, dataset_id)
);
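A corresponding client-side model might mirror the table directly. This is a hypothetical sketch (field and type choices follow the SQL columns above; `Option<_>` marks columns that are NULL until finalization):

```rust
// Hypothetical Rust model mirroring the proposed `datasets` table.
#[derive(Debug, Clone, PartialEq)]
pub struct DatasetModel {
    pub id: i64,
    pub workflow_id: i64,
    pub name: String,
    pub path: String,
    pub description: Option<String>,
    // Computed when the dataset is finalized:
    pub file_count: Option<i64>,
    pub total_size_bytes: Option<i64>,
    pub manifest_hash: Option<String>,
    pub hash_mode: Option<String>, // "manifest", "content", or "none"
    pub finalized_at: Option<f64>,
}

impl DatasetModel {
    /// A dataset is complete once its summary fields have been computed.
    pub fn is_finalized(&self) -> bool {
        self.finalized_at.is_some()
    }
}

fn main() {
    let ds = DatasetModel {
        id: 1,
        workflow_id: 123,
        name: "training_output".into(),
        path: "output/training.parquet/".into(),
        description: Some("Hive-partitioned training results".into()),
        file_count: None,
        total_size_bytes: None,
        manifest_hash: None,
        hash_mode: None,
        finalized_at: None,
    };
    assert!(!ds.is_finalized());
}
```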

Workflow Specification Syntax

Datasets would be defined similarly to files, with analogous reference syntax:

name: distributed_training
enable_ro_crate: true
ro_crate_hash_mode: manifest  # default for datasets

datasets:
  - name: training_output
    path: output/training.parquet/
    description: "Hive-partitioned training results"

  - name: checkpoints
    path: output/checkpoints/

files:
  - name: config
    path: input/config.json
    st_mtime: 1709567890

jobs:
  # Parameterized jobs write to the dataset
  - name: train_chunk_{i}
    command: >
      python train.py
        --config ${files.input.config}
        --chunk {i}
        --output ${datasets.output.training_output}/chunk_{i}/
    parameters:
      i: "0:99"

  # Aggregation job depends on all chunks completing
  - name: aggregate_results
    command: >
      python aggregate.py
        --input ${datasets.input.training_output}
        --output ${files.output.summary}
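As a sketch of what the client-side handling of these references in workflow_spec.rs might involve, the `${datasets.*}` tokens can be extracted with a small scanner (the function name and return shape here are illustrative, not the actual API):

```rust
// Hedged sketch: extract dataset references of the form
// ${datasets.output.NAME} / ${datasets.input.NAME} from a command string.
// Returns (direction, name) pairs; a real implementation would likely
// share code with the existing ${files.*} reference handling.
fn extract_dataset_refs(command: &str) -> Vec<(String, String)> {
    let mut refs = Vec::new();
    let mut rest = command;
    while let Some(start) = rest.find("${datasets.") {
        let after = &rest[start + "${datasets.".len()..];
        if let Some(end) = after.find('}') {
            // e.g. "output.training_output"
            let inner = &after[..end];
            if let Some((direction, name)) = inner.split_once('.') {
                refs.push((direction.to_string(), name.to_string()));
            }
            rest = &after[end + 1..];
        } else {
            break;
        }
    }
    refs
}

fn main() {
    let cmd = "python train.py --output ${datasets.output.training_output}/chunk_0/";
    assert_eq!(
        extract_dataset_refs(cmd),
        vec![("output".to_string(), "training_output".to_string())]
    );
}
```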

Dependency Semantics

Dataset dependencies follow the same pattern as file dependencies:

  • ${datasets.output.X} - Job writes to dataset X (recorded in job_dataset_outputs)
  • ${datasets.input.X} - Job reads from dataset X (recorded in job_dataset_inputs)

Dependency resolution:

  • A job using ${datasets.input.X} depends on all jobs that use ${datasets.output.X}
  • This naturally handles fan-in from parameterized jobs

Example:

train_chunk_0  ─┐
train_chunk_1  ─┼─→ training_output ─→ aggregate_results
train_chunk_2  ─┤
...            ─┘
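The fan-in above can be sketched as a simple inversion of the output table: collect the writers of each dataset, then give every reader the union of the writers of the datasets it consumes. This is an illustrative in-memory version, not the server's actual resolution code:

```rust
use std::collections::{HashMap, HashSet};

// Hedged sketch of the fan-in rule: a job that reads dataset X depends on
// every job that writes to X. `outputs` maps job -> datasets written,
// `inputs` maps job -> datasets read (keyed by name for illustration).
fn resolve_dataset_deps(
    outputs: &HashMap<&str, Vec<&str>>,
    inputs: &HashMap<&str, Vec<&str>>,
) -> HashMap<String, HashSet<String>> {
    // Invert outputs: dataset -> contributing jobs.
    let mut writers: HashMap<&str, HashSet<&str>> = HashMap::new();
    for (job, datasets) in outputs {
        for ds in datasets {
            writers.entry(ds).or_default().insert(job);
        }
    }
    // For each reader, collect all writers of the datasets it consumes.
    let mut deps: HashMap<String, HashSet<String>> = HashMap::new();
    for (job, datasets) in inputs {
        let entry = deps.entry(job.to_string()).or_default();
        for ds in datasets {
            if let Some(ws) = writers.get(ds) {
                entry.extend(ws.iter().map(|w| w.to_string()));
            }
        }
    }
    deps
}

fn main() {
    let mut outputs = HashMap::new();
    outputs.insert("train_chunk_0", vec!["training_output"]);
    outputs.insert("train_chunk_1", vec!["training_output"]);
    outputs.insert("train_chunk_2", vec!["training_output"]);
    let mut inputs = HashMap::new();
    inputs.insert("aggregate_results", vec!["training_output"]);

    let deps = resolve_dataset_deps(&outputs, &inputs);
    // aggregate_results depends on all three chunk jobs.
    assert_eq!(deps["aggregate_results"].len(), 3);
}
```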

Trait Abstraction

To avoid code duplication, files and datasets should share a common interface:

/// Common behavior for workflow artifacts (files and datasets)
pub trait WorkflowArtifact {
    fn id(&self) -> i64;
    fn workflow_id(&self) -> i64;
    fn name(&self) -> &str;
    fn path(&self) -> &str;

    /// Check if the artifact exists on the filesystem
    fn exists(&self) -> bool;

    /// Get the artifact type for RO-Crate
    fn ro_crate_type(&self) -> &'static str;

    /// Build RO-Crate metadata for this artifact
    fn build_ro_crate_metadata(&self, hash_mode: HashMode) -> serde_json::Value;
}

impl WorkflowArtifact for FileModel { /* ... */ }
impl WorkflowArtifact for DatasetModel { /* ... */ }
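To make the difference between the two impls concrete, here is a minimal, self-contained variant of the trait (the RO-Crate methods are omitted because they depend on `serde_json`; the structs are toy stand-ins). The key divergence: a file must exist as a regular file, while a dataset must exist as a directory.

```rust
use std::path::Path;

// Reduced sketch of the shared interface; real impls would also provide
// the RO-Crate metadata methods shown in the proposal.
trait WorkflowArtifact {
    fn path(&self) -> &str;
    fn exists(&self) -> bool;
    fn ro_crate_type(&self) -> &'static str;
}

struct FileModel { path: String }
struct DatasetModel { path: String }

impl WorkflowArtifact for FileModel {
    fn path(&self) -> &str { &self.path }
    // A file artifact must be a regular file on disk.
    fn exists(&self) -> bool { Path::new(&self.path).is_file() }
    fn ro_crate_type(&self) -> &'static str { "File" }
}

impl WorkflowArtifact for DatasetModel {
    fn path(&self) -> &str { &self.path }
    // A dataset artifact must be a directory.
    fn exists(&self) -> bool { Path::new(&self.path).is_dir() }
    fn ro_crate_type(&self) -> &'static str { "Dataset" }
}

fn main() {
    // The trait lets export code treat both artifact kinds uniformly.
    let artifacts: Vec<Box<dyn WorkflowArtifact>> = vec![
        Box::new(FileModel { path: "input/config.json".into() }),
        Box::new(DatasetModel { path: "output/training.parquet/".into() }),
    ];
    for a in &artifacts {
        println!("{} -> {}", a.path(), a.ro_crate_type());
    }
}
```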

Implementation Scope

This is a significant change touching:

Server:

  • New datasets table and API endpoints
  • Dependency resolution in unblock_jobs_waiting_for must handle dataset dependencies
  • Job initialization must resolve dataset references

Client:

  • workflow_spec.rs - Parse datasets section, handle ${datasets.*} references
  • workflow_manager.rs - Create datasets, track contributions, finalize on completion
  • job_runner.rs - No changes needed (jobs just write to paths)
  • RO-Crate export - Include Dataset entities

Migration:

  • Database migration for new tables
  • Existing workflows unaffected (no datasets defined)

Short-Term Solution

Until the full implementation is complete, a CLI command provides manual dataset support:

torc ro-crate add-dataset \
  --workflow-id 123 \
  --name training_output \
  --path output/training.parquet/ \
  --hash-mode manifest

This allows users to:

  1. Run their workflow (with manual dependency management)
  2. After completion, add Dataset entities to the RO-Crate metadata

See the torc ro-crate add-dataset command for details.

Hash Modes

Three modes for dataset integrity verification:

manifest (recommended for large datasets)

Hash a sorted list of (relative_path, size, mtime) tuples:

file1.parquet|1048576|1709567890.123
file2.parquet|2097152|1709567891.456
...

  • Fast: Only reads filesystem metadata, not file contents
  • Detects: File additions, deletions, size changes, modification time changes
  • Does not detect: Content changes without size/mtime change (rare)
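The manifest scheme can be sketched as follows. This assumes a cryptographic hash (e.g. SHA-256 via the `sha2` crate) in the real implementation; std's `DefaultHasher` stands in here only to keep the example dependency-free. Entries are sorted so the result is independent of directory traversal order:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hedged sketch of manifest-mode hashing: hash a sorted list of
// (relative_path, size, mtime) lines. A real implementation would walk
// the dataset directory and use SHA-256 instead of DefaultHasher.
fn manifest_hash(entries: &[(&str, u64, f64)]) -> u64 {
    let mut lines: Vec<String> = entries
        .iter()
        .map(|(path, size, mtime)| format!("{path}|{size}|{mtime:.3}"))
        .collect();
    lines.sort();
    let mut hasher = DefaultHasher::new();
    for line in &lines {
        line.hash(&mut hasher);
    }
    hasher.finish()
}

fn main() {
    let a = manifest_hash(&[
        ("file1.parquet", 1_048_576, 1709567890.123),
        ("file2.parquet", 2_097_152, 1709567891.456),
    ]);
    // Same entries in a different order produce the same hash...
    let b = manifest_hash(&[
        ("file2.parquet", 2_097_152, 1709567891.456),
        ("file1.parquet", 1_048_576, 1709567890.123),
    ]);
    assert_eq!(a, b);
    // ...while a size change is detected.
    let c = manifest_hash(&[
        ("file1.parquet", 1_048_577, 1709567890.123),
        ("file2.parquet", 2_097_152, 1709567891.456),
    ]);
    assert_ne!(a, c);
}
```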

content

Compute SHA256 of all file contents (Merkle tree style):

  • Thorough: Detects any content change
  • Slow: Must read all file contents
  • Use for: Small datasets where integrity is critical

none

No hash computed. Only record file count and total size.

  • Fastest: Just stat() calls
  • Use for: Ephemeral datasets, very large datasets where hashing is impractical

RO-Crate Representation

Datasets are represented as schema:Dataset entities:

{
  "@id": "output/training.parquet/",
  "@type": "Dataset",
  "name": "training_output",
  "description": "Hive-partitioned training results",
  "contentSize": 15032385536,
  "fileCount": 2847,
  "sha256": "a1b2c3...",
  "hashMode": "manifest",
  "encodingFormat": "application/vnd.apache.parquet",
  "wasGeneratedBy": [
    {"@id": "#job-1-attempt-1"},
    {"@id": "#job-2-attempt-1"},
    ...
  ]
}

Open Questions

  1. Partial dataset reads - Can a job depend on a subset of a dataset? (Probably not needed
    initially)

  2. Dataset versioning - If a workflow is restarted and jobs re-run, how do we handle the
    dataset? This could be deferred to the user application.

  3. External datasets - Can a dataset be an input that exists before the workflow runs? (Yes,
    similar to input files with st_mtime set)

  4. Nested datasets - Can datasets contain subdatasets? (Probably not needed)
