I've had trouble finding information in the docs on how to gracefully terminate a pipeline after all of its elements have received an EOS. Example:
Mix.install([
  {:membrane_generator_plugin, ">= 0.0.0"},
  {:membrane_core, "~> 1.0"}
])

defmodule MyPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts) do
    spec = [
      child(
        :generator,
        %Membrane.SilenceGenerator{
          stream_format: %Membrane.RawAudio{
            channels: 1,
            sample_rate: 16_000,
            sample_format: :s16le
          },
          duration: Membrane.Time.milliseconds(100)
        }
      )
      |> child(:sink, Membrane.Fake.Sink)
    ]

    {[spec: spec], nil}
  end
end

{:ok, supervisor, pipeline} = Membrane.Pipeline.start_link(MyPipeline, [])
Process.monitor(supervisor)

defmodule Loop do
  def loop() do
    receive do
      {:DOWN, _ref, :process, _pid, _reason} ->
        IO.puts("Pipeline down")

      other ->
        IO.inspect(other, label: "received unknown message")
    end

    loop()
  end
end

Loop.loop()
The solution was to add def handle_element_end_of_stream(:sink, _pad, _ctx, state), do: {[terminate: :normal], state} to the pipeline (in Membrane Core 1.x this callback takes the pad as its second argument). It is triggered when the sink receives an EOS, which in this simple case means the :end_of_stream event has propagated through the entire pipeline. That might not hold for more complex pipeline topologies. I was under the impression that the framework would automatically detect that all elements had received EOS and terminate on its own, so a section in the guides that walks through adding termination mechanisms would provide DX value.
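For a topology with more than one sink, one possible approach is to keep the set of sinks that have not finished yet in the pipeline state and request termination only when that set becomes empty. The sketch below is plain Elixir with no Membrane dependency, so it can be tried in iex. The module name EosTracking and the child names :file_sink, :stats_sink and :filter are hypothetical. The function mirrors the four-argument shape (child, pad, context, state) that handle_element_end_of_stream has in Membrane Core 1.x as far as I understand it. In a real pipeline the same body would live in the callback itself, with the pending set created in handle_init.

```elixir
defmodule EosTracking do
  # Hypothetical helper, not part of Membrane: tracks which sinks still
  # have to receive an end-of-stream before the pipeline may terminate.

  # Build the initial state: the set of sink names we are waiting for.
  def init(sink_names), do: MapSet.new(sink_names)

  # Same shape as a pipeline's handle_element_end_of_stream/4 callback
  # (assumed Membrane Core 1.x signature). Returns {actions, state}.
  def handle_element_end_of_stream(child, _pad, _ctx, pending) do
    if MapSet.member?(pending, child) do
      remaining = MapSet.delete(pending, child)

      # Ask for termination only once the last tracked sink has finished.
      actions = if MapSet.size(remaining) == 0, do: [terminate: :normal], else: []
      {actions, remaining}
    else
      # EOS from a child we do not track (e.g. a filter): nothing to do.
      {[], pending}
    end
  end
end

# Usage: two tracked sinks and one untracked filter.
state = EosTracking.init([:file_sink, :stats_sink])
{[], state} = EosTracking.handle_element_end_of_stream(:filter, :input, %{}, state)
{[], state} = EosTracking.handle_element_end_of_stream(:file_sink, :input, %{}, state)
{[terminate: :normal], _state} =
  EosTracking.handle_element_end_of_stream(:stats_sink, :input, %{}, state)
```

The catch-all branch matters: a callback that matches only on the sink's name would have no clause for an EOS reported by any other child that has an input pad.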