Skip to content

Unify pipeline interfaces: migrate to single class form with @pipeline decorator #901

@rickstaa

Description

@rickstaa

Problem

Two competing class interfaces exist for live pipelines:

  • Pipeline ABC: initialize, put_video_frame, get_processed_video_frame, update_params — used by ComfyUI, StreamDiffusion, Noop
  • @pipeline decorator: transform, on_ready, on_update, on_stop — duck-typed by the decorator

The decorator bridges the gap by generating a wrapper class. This means 3 ways to make a pipeline with 2 different class contracts.

Proposed solution

Collapse to one class interface as suggested by @victorges in PR #900.

  1. Rewrite Pipeline ABC to use the simple interface (transform, on_ready, on_update, on_stop, prepare_models). Only transform is abstract; rest have sane defaults.
  2. Move queue/lock management from the decorator's generated class into the framework's process loop (process.py). The process loop calls transform() directly and handles output routing.
  3. Simplify @pipeline decorator to registration only — no more generated bridge class.
  4. Migrate ComfyUI, StreamDiffusion, Noop to the new interface.

What breaks

  • All existing Pipeline subclasses — ComfyUI, StreamDiffusion, Noop, and any external pipelines subclassing Pipeline directly. The old methods (initialize, put_video_frame, get_processed_video_frame, update_params, stop) are removed and replaced with the new interface.
  • ComfyUI async/sync boundaries — the current ComfyUI pipeline uses async methods (client.set_prompts, client.get_video_output). With the new transform model (called synchronously by the framework via thread pool), these need sync equivalents or need to be awaited differently.
  • StreamDiffusion internal locking — currently manages its own _pipeline_lock to protect against concurrent transform + reload. The framework's lock replaces this, but the semantics need careful verification (e.g. _reload_pipeline sets self.pipe = None outside the lock, then reloads).
  • The _output_loop in process.py is removed — output routing merges into _input_loop. This changes the concurrency model from "input and output loops run independently" to "transform is synchronous per-frame". For ComfyUI where there's latency between input and output, this means frames are processed sequentially rather than pipelined.

Files affected

  • runner/src/runner/live/pipelines/interface.py — rewrite ABC
  • runner/src/runner/live/pipelines/create.py — simplify decorator
  • runner/src/runner/live/process/process.py — move queue/lock here
  • runner/src/runner/live/pipelines/noop.py — migrate
  • live/comfyui/pipeline/pipeline.py — migrate
  • live/streamdiffusion/pipeline/pipeline.py — migrate

Prototype

Branch feat/unified-pipeline-interface has a working prototype. Needs thorough testing — especially ComfyUI and StreamDiffusion where async/sync boundaries and locking semantics changed.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions