Problem
Two competing class interfaces exist for live pipelines:
- Pipeline ABC: initialize, put_video_frame, get_processed_video_frame, update_params (used by ComfyUI, StreamDiffusion, Noop)
- @pipeline decorator: transform, on_ready, on_update, on_stop (duck-typed by the decorator)
The decorator bridges the gap by generating a wrapper class. This means 3 ways to make a pipeline with 2 different class contracts.
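To make the bridging concrete, here is a minimal sketch of what a generated wrapper could look like. It is not the actual decorator output: the class name LegacyBridge, the synchronous method signatures, and the single output queue are all assumptions chosen for illustration.

```python
import queue


class LegacyBridge:
    """Hypothetical wrapper: adapts a transform-style object to the
    old put/get contract. Signatures are simplified and synchronous."""

    def __init__(self, inner):
        self._inner = inner
        self._out = queue.Queue()  # processed frames wait here

    def _call_optional(self, name, **params):
        # Duck typing: call the hook only if the wrapped object defines it.
        hook = getattr(self._inner, name, None)
        if callable(hook):
            hook(**params)

    def initialize(self, **params):
        self._call_optional("on_ready", **params)

    def put_video_frame(self, frame):
        # The old contract pushes frames in; the new one transforms them.
        self._out.put(self._inner.transform(frame))

    def get_processed_video_frame(self):
        return self._out.get()

    def update_params(self, **params):
        self._call_optional("on_update", **params)

    def stop(self):
        self._call_optional("on_stop")
```

The wrapper exists only to translate one contract into the other, which is the layer the proposal removes.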
Proposed solution
Collapse to one class interface as suggested by @victorges in PR #900.
- Rewrite Pipeline ABC to use the simple interface (transform, on_ready, on_update, on_stop, prepare_models). Only transform is abstract; the rest have sane defaults.
- Move queue/lock management from the decorator's generated class into the framework's process loop (process.py). The process loop calls transform() directly and handles output routing.
- Simplify @pipeline decorator to registration only: no more generated bridge class.
- Migrate ComfyUI, StreamDiffusion, Noop to the new interface.
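The shape of the proposed interface can be sketched as follows. The method names come from the list above; the signatures, the ProcessLoop class, and the single-worker thread pool are assumptions for illustration and do not reflect the prototype branch.

```python
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class Pipeline(ABC):
    """Sketch of the unified interface: only transform is abstract."""

    @abstractmethod
    def transform(self, frame):
        """Process one frame and return the result."""

    # Optional hooks with no-op defaults (signatures are assumed).
    def prepare_models(self):
        pass

    def on_ready(self, **params):
        pass

    def on_update(self, **params):
        pass

    def on_stop(self):
        pass


class ProcessLoop:
    """Hypothetical framework-side owner of the lock and thread pool."""

    def __init__(self, pipeline):
        self._pipeline = pipeline
        self._lock = threading.Lock()
        self._pool = ThreadPoolExecutor(max_workers=1)

    def _locked_transform(self, frame):
        # The framework lock keeps transform and on_update from overlapping.
        with self._lock:
            return self._pipeline.transform(frame)

    def process(self, frame):
        # Run transform off the caller's thread and wait for the result.
        return self._pool.submit(self._locked_transform, frame).result()

    def update(self, **params):
        with self._lock:
            self._pipeline.on_update(**params)

    def stop(self):
        with self._lock:
            self._pipeline.on_stop()
        self._pool.shutdown()
```

With this split, a pipeline author implements transform and overrides hooks only when needed, while locking stays in one place in the framework.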
What breaks
- All existing Pipeline subclasses: ComfyUI, StreamDiffusion, Noop, and any external pipelines subclassing Pipeline directly. The old methods (initialize, put_video_frame, get_processed_video_frame, update_params, stop) are removed and replaced with the new interface.
- ComfyUI async/sync boundaries: the current ComfyUI pipeline uses async methods (client.set_prompts, client.get_video_output). With the new transform model (called synchronously by the framework via a thread pool), these need sync equivalents or must be awaited another way from the worker thread.
- StreamDiffusion internal locking: it currently manages its own _pipeline_lock to protect against concurrent transform + reload. The framework's lock replaces this, but the semantics need careful verification (e.g. _reload_pipeline sets self.pipe = None outside the lock, then reloads).
- The _output_loop in process.py is removed: output routing merges into _input_loop. This changes the concurrency model from "input and output loops run independently" to "transform is synchronous per-frame". For ComfyUI, where there's latency between input and output, this means frames are processed sequentially rather than pipelined.
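One possible way to handle the async/sync boundary is to keep a dedicated event loop on a background thread and submit coroutines to it from the synchronous transform. The sketch below uses only the standard library; FakeAsyncClient is a stand-in, not the real ComfyUI client, and SyncBridge and AsyncBackedPipeline are hypothetical names.

```python
import asyncio
import threading


class FakeAsyncClient:
    """Stand-in for an async client; not the real ComfyUI client API."""

    async def process(self, frame):
        await asyncio.sleep(0)  # simulate an awaitable round trip
        return frame + 1


class SyncBridge:
    """Runs an event loop on a background thread for sync callers."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, daemon=True
        )
        self._thread.start()

    def call(self, coro, timeout=5.0):
        # Schedule the coroutine on the loop thread and block for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout=timeout)

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


class AsyncBackedPipeline:
    """Hypothetical pipeline whose sync transform wraps an async client."""

    def __init__(self):
        self._client = FakeAsyncClient()
        self._bridge = SyncBridge()

    def transform(self, frame):
        return self._bridge.call(self._client.process(frame))

    def on_stop(self):
        self._bridge.close()
```

Because each transform call blocks until its coroutine finishes, this approach preserves the sequential per-frame behavior described above; it does not restore pipelining between input and output.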
Files affected
- runner/src/runner/live/pipelines/interface.py: rewrite ABC
- runner/src/runner/live/pipelines/create.py: simplify decorator
- runner/src/runner/live/process/process.py: move queue/lock here
- runner/src/runner/live/pipelines/noop.py: migrate
- live/comfyui/pipeline/pipeline.py: migrate
- live/streamdiffusion/pipeline/pipeline.py: migrate
Prototype
Branch feat/unified-pipeline-interface has a working prototype. Needs thorough testing — especially ComfyUI and StreamDiffusion where async/sync boundaries and locking semantics changed.