diff --git a/config/test.exs b/config/test.exs index 772812f..7c18137 100644 --- a/config/test.exs +++ b/config/test.exs @@ -6,4 +6,6 @@ config :shinkai, :server, enabled: false config :shinkai, :rtmp, port: 0 +config :shinkai, :rtsp, enabled: false + config :shinkai, :hls, storage_dir: "tmp" diff --git a/lib/shinkai.ex b/lib/shinkai.ex index f1d2742..b3ddf16 100644 --- a/lib/shinkai.ex +++ b/lib/shinkai.ex @@ -15,10 +15,7 @@ defmodule Shinkai do To configure the http server responsible for serving HLS streams. - * `enabled` - Enable or disable the HTTP server. - * `port` - Port number for the HTTP server. - * `certfile` - Path to the SSL certificate file (optional). - * `keyfile` - Path to the SSL key file (optional). + #{NimbleOptions.docs(Shinkai.Config.server_schema())} ```elixir config :shinkai, :server, @@ -40,12 +37,7 @@ defmodule Shinkai do To configure HLS streaming options. - * `storage_dir` - Directory to store HLS segments. - * `max_segments` - Maximum number of segments to keep. - * `segment_duration` - Segment duration in milliseconds. - * `part_duration` - Part duration in milliseconds. - * `segment_type` - Type of segments to generate, either `fmp4`, - `mpeg_ts`, or `low_latency`. + #{NimbleOptions.docs(Shinkai.Config.hls_schema())} ```elixir config :shinkai, :hls, @@ -66,8 +58,8 @@ defmodule Shinkai do ### RTMP To configure the RTMP server. - * `enabled` - Enable or disable the RTMP server. - * `port` - Port number for the RTMP server. + + #{NimbleOptions.docs(Shinkai.Config.rtmp_schema())} ```elixir config :shinkai, :rtmp, @@ -81,6 +73,24 @@ defmodule Shinkai do port: 1935 # Port number for the RTMP server (default: 1935) ``` + ### RTSP + + To configure the RTSP server. + + #{NimbleOptions.docs(Shinkai.Config.rtsp_schema())} + + ```elixir + config :shinkai, :rtsp, + enabled: true, + port: 8554 + ``` + + ```yaml + rtsp: + enabled: true # Enable or disable the RTSP server (default: true) + port: 8554 # Port number for the RTSP server (default: 8554) + ``` + ### Paths To configure media source paths. Each source should have a unique alphanumeric ID. diff --git a/lib/shinkai/application.ex b/lib/shinkai/application.ex index 27b43c2..0d4b0cf 100644 --- a/lib/shinkai/application.ex +++ b/lib/shinkai/application.ex @@ -16,8 +16,7 @@ defmodule Shinkai.Application do {Sources.PublishManager, []}, {Registry, name: Sink.Registry, keys: :duplicate}, {Registry, name: Source.Registry, keys: :unique}, - {Task, fn -> Sources.start_all() end}, - {RTSP.Server, handler: Sources.RTSP.Handler, port: 8554} + {Task, fn -> Sources.start_all() end} ] children = @@ -27,6 +26,13 @@ defmodule Shinkai.Application do children end + children = + if config[:rtsp][:enabled] do + children ++ [{RTSP.Server, handler: Sources.RTSP.Handler, port: config[:rtsp][:port]}] + else + children + end + children = if Code.ensure_loaded?(Bandit) and config[:server][:enabled] do children ++ [{Bandit, configure_bandit(config[:server])}] diff --git a/lib/shinkai/config.ex b/lib/shinkai/config.ex index a147e19..2416cee 100644 --- a/lib/shinkai/config.ex +++ b/lib/shinkai/config.ex @@ -3,28 +3,107 @@ defmodule Shinkai.Config do use GenServer - @top_level_keys [:rtmp, :server, :hls] + @top_level_keys [:rtmp, :server, :hls, :rtsp] - @default_config [ - rtmp: [ - enabled: true, - port: 1935 + @rtmp_schema [ + enabled: [ + type: :boolean, + default: true, + doc: "Enable or disable rtmp" ], - server: [ - enabled: true, - port: 8888, - certfile: nil, - keyfile: nil + port: [ + type: {:in, 0..(2 ** 16 - 1)}, + default: 1935, + doc: "RTMP listening port", + type_doc: "`t::socket.port_number/0`", + type_spec: quote(do: :socket.port_number()) + ] + ] + + @rtsp_schema [ + enabled: [ + type: :boolean, + default: true, + doc: "Enable or disable rtsp" + ], + port: [ + type: {:in, 0..(2 ** 16 - 1)}, + default: 8554, + doc: "RTSP listening port", + type_doc: "`t::socket.port_number/0`", + type_spec: quote(do: :socket.port_number()) + ] + ] + + @server_schema [ + enabled: [ + type: :boolean, + default: true, + doc: "Enable or disable http(s) server" + ], + port: [ + type: {:in, 0..(2 ** 16 - 1)}, + default: 8888, + doc: "http port", + type_doc: "`t::socket.port_number/0`", + type_spec: quote(do: :socket.port_number()) + ], + certfile: [ + type: {:or, [:string, nil]}, + default: nil, + doc: "https certificate" + ], + keyfile: [ + type: {:or, [:string, nil]}, + default: nil, + doc: "https private key certificate" + ] + ] + + @hls_schema [ + storage_dir: [ + type: :string, + default: "/tmp/shinkai/hls", + doc: "Directory to store HLS segments" + ], + max_segments: [ + type: :non_neg_integer, + default: 7, + doc: "Max segments to keep in live playlists" + ], + segment_duration: [ + type: :non_neg_integer, + default: 2000, + doc: "Segment duration in milliseconds" ], - hls: [ - storage_dir: "/tmp/shinkai/hls", - max_segments: 7, - segment_duration: 2_000, - part_duration: 500, - segment_type: :fmp4 + part_duration: [ + type: :non_neg_integer, + default: 300, + doc: "Part duration in milliseconds for low-latency HLS" + ], + segment_type: [ + type: {:custom, __MODULE__, :validate_hls_segment_type, []}, + default: :fmp4, + doc: "Type of segments to generate, either `:fmp4`, `:mpeg_ts` or `:low_latency`" ] ] + @doc false + @spec server_schema() :: keyword() + def server_schema, do: @server_schema + + @doc false + @spec rtmp_schema() :: keyword() + def rtmp_schema, do: @rtmp_schema + + @doc false + @spec hls_schema() :: keyword() + def hls_schema, do: @hls_schema + + @doc false + @spec rtsp_schema() :: keyword() + def rtsp_schema, do: @rtsp_schema + def start_link(config) do GenServer.start_link(__MODULE__, config, name: __MODULE__) end @@ -48,9 +127,14 @@ defmodule Shinkai.Config do app_configs = Enum.map(@top_level_keys, &{&1, Application.get_env(:shinkai, &1, [])}) - Enum.map(@default_config, fn {key, config} -> + app_configs = + @top_level_keys + |> Enum.map(&{&1, []}) + |> Keyword.merge(app_configs) + |> parse_and_validate() + + Enum.map(app_configs, fn {key, config} -> config - |> Keyword.merge(app_configs[key]) |> Keyword.merge(user_config[key] || []) |> then(&{key, &1}) end) @@ -95,129 +179,41 @@ defmodule Shinkai.Config do defp parse_and_validate([], acc), do: acc - defp parse_and_validate([{:hls, hls_config} | rest], acc) do - hls_config = parse_and_validate_hls(hls_config) - parse_and_validate(rest, [{:hls, hls_config} | acc]) - end - - defp parse_and_validate([{:server, server_config} | rest], acc) do - server_config = parse_and_validate_server(server_config) - parse_and_validate(rest, [{:server, server_config} | acc]) - end - - defp parse_and_validate([{:rtmp, rtmp_config} | rest], acc) do - rtmp_config = parse_and_validate_rtmp(rtmp_config) - parse_and_validate(rest, [{:rtmp, rtmp_config} | acc]) - end - - defp parse_and_validate_hls(config, acc \\ []) - - defp parse_and_validate_hls(nil, _acc), do: [] - defp parse_and_validate_hls([], acc), do: acc - - defp parse_and_validate_hls(config, acc) when is_map(config) do - parse_and_validate_hls(Map.to_list(config), acc) - end - - defp parse_and_validate_hls([{:segment_type, value} | rest], acc) - when value in [:fmp4, :mpeg_ts, :low_latency] do - parse_and_validate_hls(rest, [{:segment_type, value} | acc]) - end - - defp parse_and_validate_hls([{"segment_type", value} | rest], acc) - when value in ["fmp4", "mpeg_ts", "low_latency"] do - parse_and_validate_hls(rest, [{:segment_type, String.to_atom(value)} | acc]) - end - - defp parse_and_validate_hls([{key, value} | rest], acc) - when key in ["segment_duration", :segment_duration] and is_integer(value) and value >= 1000 do - parse_and_validate_hls(rest, [{:segment_duration, value} | acc]) - end - - defp parse_and_validate_hls([{key, value} | rest], acc) - when key in ["max_segments", :max_segments] and is_integer(value) and value > 3 do - parse_and_validate_hls(rest, [{:max_segments, value} | acc]) - end - - defp parse_and_validate_hls([{key, value} | rest], acc) - when key in ["part_duration", :part_duration] and is_integer(value) and value >= 100 and - value < 1000 do - parse_and_validate_hls(rest, [{:part_duration, value} | acc]) - end - - defp parse_and_validate_hls([{key, value} | rest], acc) - when key in ["storage_dir", :storage_dir] do - parse_and_validate_hls(rest, [{:storage_dir, value} | acc]) - end - - defp parse_and_validate_hls([{key, value} | _rest], _acc) do - raise ArgumentError, """ - Invalid HLS configuration key or value detected. - Key: #{inspect(key)}, Value: #{inspect(value)}. - """ - end + defp parse_and_validate([{key, config} | rest], acc) do + config = + case key do + :hls -> do_parse_and_validate(config, @hls_schema) + :server -> do_parse_and_validate(config, @server_schema) + :rtmp -> do_parse_and_validate(config, @rtmp_schema) + :rtsp -> do_parse_and_validate(config, @rtsp_schema) + end - defp parse_and_validate_hls(config, _acc) do - raise ArgumentError, """ - Invalid HLS configuration format detected. - Config: #{inspect(config)}. - """ + parse_and_validate(rest, [{key, config} | acc]) end - # HTTP server - defp parse_and_validate_server(config, acc \\ []) - defp parse_and_validate_server(nil, _acc), do: [] - defp parse_and_validate_server([], acc), do: acc + defp do_parse_and_validate(config, schema) do + config = config || [] - defp parse_and_validate_server(config, acc) when is_map(config) do - parse_and_validate_server(Map.to_list(config), acc) - end - - defp parse_and_validate_server([{key, value} | rest], acc) - when key in ["enabled", :enabled] and is_boolean(value) do - parse_and_validate_server(rest, [{:enabled, value} | acc]) - end - - defp parse_and_validate_server([{key, value} | rest], acc) - when key in [:port, "port"] and is_integer(value) and value > 0 and value < 65_536 do - parse_and_validate_server(rest, [{:port, value} | acc]) - end - - defp parse_and_validate_server([{key, value} | rest], acc) - when key in ["certfile", "keyfile", :certfile, :keyfile] do - parse_and_validate_server(rest, [{String.to_atom(key), value} | acc]) - end + cond do + Keyword.keyword?(config) -> + NimbleOptions.validate!(config, schema) - defp parse_and_validate_server([{key, value} | _rest], _acc) do - raise ArgumentError, """ - Invalid Server configuration key or value detected. - Key: #{inspect(key)}, Value: #{inspect(value)}. - """ - end + is_map(config) -> + config + |> Keyword.new(fn {k, v} -> {String.to_existing_atom(k), v} end) + |> NimbleOptions.validate!(schema) - # RTMP - defp parse_and_validate_rtmp(config, acc \\ []) - defp parse_and_validate_rtmp(nil, _acc), do: [] - defp parse_and_validate_rtmp([], acc), do: acc - - defp parse_and_validate_rtmp(config, acc) when is_map(config) do - parse_and_validate_rtmp(Map.to_list(config), acc) - end - - defp parse_and_validate_rtmp([{key, value} | rest], acc) - when key in ["enabled", :enabled] and is_boolean(value) do - parse_and_validate_rtmp(rest, [{:enabled, value} | acc]) - end - - defp parse_and_validate_rtmp([{key, value} | rest], acc) - when key in [:port, "port"] and is_integer(value) and value > 0 and value < 65_536 do - parse_and_validate_rtmp(rest, [{:port, value} | acc]) + true -> + raise ArgumentError, "Expected a map or keyword list received: #{inspect(config)}" + end end - defp parse_and_validate_rtmp([{key, value} | _rest], _acc) do - raise ArgumentError, """ - Invalid RTMP configuration key or value detected. - Key: #{inspect(key)}, Value: #{inspect(value)}. - """ + @doc false + def validate_hls_segment_type(value) do + cond do + value in [:mpeg_ts, :fmp4, :low_latency] -> {:ok, value} + value in ["mpeg_ts", "fmp4", "low_latency"] -> {:ok, String.to_atom(value)} + true -> {:error, value} + end end end diff --git a/test/shinkai/config_test.exs b/test/shinkai/config_test.exs index c79b702..f3a95da 100644 --- a/test/shinkai/config_test.exs +++ b/test/shinkai/config_test.exs @@ -15,12 +15,12 @@ defmodule Shinkai.ConfigTest do config = Config.validate(user_config) - assert Keyword.keys(config) == [:rtmp, :server, :hls] + assert Keyword.keys(config) |> Enum.sort() == [:hls, :rtmp, :rtsp, :server] assert %{ segment_type: :low_latency, segment_duration: 2000, - part_duration: 500, + part_duration: 300, max_segments: 10, storage_dir: "/var/shinkai/hls" } == Map.new(config[:hls]) @@ -36,13 +36,13 @@ defmodule Shinkai.ConfigTest do test "raise on invalid values" do user_config = %{"hls" => %{"segment_type" => "unknown_type"}} - assert_raise ArgumentError, ~r/Invalid HLS configuration/, fn -> + assert_raise NimbleOptions.ValidationError, ~r/invalid value for :segment_type/, fn -> Config.validate(user_config) end user_config = %{"hls" => %{"unknown_key" => 1}} - assert_raise ArgumentError, ~r/Invalid HLS configuration/, fn -> + assert_raise ArgumentError, ~r/not an already existing atom/, fn -> Config.validate(user_config) end @@ -54,7 +54,7 @@ defmodule Shinkai.ConfigTest do user_config = %{"hls" => "not_a_map"} - assert_raise ArgumentError, ~r/Invalid HLS configuration format/, fn -> + assert_raise ArgumentError, ~r/Expected a map or keyword list received/, fn -> Config.validate(user_config) end end diff --git a/test/shinkai/pipeline_test.exs b/test/shinkai/pipeline_test.exs index 51adb95..5ea7611 100644 --- a/test/shinkai/pipeline_test.exs +++ b/test/shinkai/pipeline_test.exs @@ -14,6 +14,11 @@ defmodule Shinkai.PipelineTest do "test/fixtures/big_buck_avc_aac.mp4" ] + setup do + {:ok, rtsp_server} = RTSP.Server.start_link(port: 0, handler: Shinkai.Sources.RTSP.Handler) + {:ok, rtsp_server: rtsp_server} + end + for fixture <- @fixtures do describe "hls sink: #{fixture}" do test "Stream from rtsp" do @@ -38,11 +43,14 @@ defmodule Shinkai.PipelineTest do on_exit(fn -> File.rm_rf!(hls_path) end) end - test "Stream from rtsp publish", %{tmp_dir: _dir} do + test "Stream from rtsp publish", %{rtsp_server: server} do id = UUID.uuid4() Phoenix.PubSub.subscribe(Shinkai.PubSub, Utils.sink_topic(id)) - Publisher.new("rtsp://localhost:8554/#{id}", unquote(fixture)) + {:ok, port} = RTSP.Server.port_number(server) + + "rtsp://localhost:#{port}/#{id}" + |> Publisher.new(unquote(fixture)) |> Publisher.publish() assert_receive {:hls, :done}, 5_000