diff --git a/lib/shinkai/application.ex b/lib/shinkai/application.ex index 803f151..27b43c2 100644 --- a/lib/shinkai/application.ex +++ b/lib/shinkai/application.ex @@ -16,7 +16,8 @@ defmodule Shinkai.Application do {Sources.PublishManager, []}, {Registry, name: Sink.Registry, keys: :duplicate}, {Registry, name: Source.Registry, keys: :unique}, - {Task, fn -> Sources.start_all() end} + {Task, fn -> Sources.start_all() end}, + {RTSP.Server, handler: Sources.RTSP.Handler, port: 8554} ] children = diff --git a/lib/shinkai/sink/hls.ex b/lib/shinkai/sink/hls.ex index 0661846..9a44803 100644 --- a/lib/shinkai/sink/hls.ex +++ b/lib/shinkai/sink/hls.ex @@ -42,10 +42,13 @@ defmodule Shinkai.Sink.Hls do File.rm_rf!(hls_config[:storage_dir]) :ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, tracks_topic(id)) + :ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, packets_topic(id)) :ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, state_topic(id)) {:ok, _} = RequestHolder.start_link(:"request_holder_#{id}") + Process.flag(:trap_exit, true) + {:ok, %{ writer: Writer.new!(hls_config), @@ -71,6 +74,8 @@ defmodule Shinkai.Sink.Hls do ) end + Logger.info("[#{state.source_id}] [hls] start muxing") + audio_track = Enum.find(hls_tracks, fn t -> t.type == :audio end) video_track = Enum.find(hls_tracks, fn t -> t.type == :video end) @@ -89,7 +94,10 @@ defmodule Shinkai.Sink.Hls do end buffer? = length(hls_tracks) > 1 and Enum.any?(hls_tracks, &(&1.type == :video)) - :ok = PubSub.subscribe(Shinkai.PubSub, packets_topic(state.source_id)) + + if hls_tracks == [] do + :ok = PubSub.unsubscribe(Shinkai.PubSub, packets_topic(state.source_id)) + end {:noreply, %{ @@ -100,30 +108,45 @@ defmodule Shinkai.Sink.Hls do }} end - def handle_info({:packet, packets}, state) when is_list(packets) do - {:noreply, Enum.reduce(packets, state, &do_handle_packet/2)} - end - @impl true - def handle_info({:packet, packet}, state) do + def handle_info({:packet, %Shinkai.Packet{} = packet}, state) do {:noreply, do_handle_packet(packet, state)} end + def handle_info({:packet, packets}, state) do + {:noreply, Enum.reduce(packets, state, &do_handle_packet/2)} + end + @impl true def handle_info(:disconnected, state) do :ok = Writer.close(state.writer) - :ok = PubSub.unsubscribe(Shinkai.PubSub, packets_topic(state.source_id)) :ok = PubSub.local_broadcast(Shinkai.PubSub, sink_topic(state.source_id), {:hls, :done}) {:noreply, %{state | writer: Writer.new!(state.config), last_sample: %{}, packets: [], buffer?: false}} end - defp do_handle_packet(%{track_id: id}, state) when not is_map_key(state.tracks, id) do - state + defp do_handle_packet(packet, %{buffer?: false} = state) + when is_map_key(state.tracks, packet.track_id) do + case Map.fetch(state.last_sample, packet.track_id) do + :error -> + last_samples = Map.put(state.last_sample, packet.track_id, packet_to_sample(packet)) + %{state | last_sample: last_samples} + + {:ok, last_sample} -> + variant_name = state.tracks[packet.track_id].type |> to_string() + sample = packet_to_sample(packet) + last_sample = %{last_sample | duration: sample.dts - last_sample.dts} + + %{ + state + | writer: Writer.write_sample(state.writer, variant_name, last_sample), + last_sample: Map.put(state.last_sample, packet.track_id, sample) + } + end end - defp do_handle_packet(packet, %{buffer?: true} = state) do + defp do_handle_packet(packet, state) when is_map_key(state.tracks, packet.track_id) do # buffer until we get a video packet # and then drop all packets with dts < dts of that video packet track = state.tracks[packet.track_id] @@ -143,24 +166,7 @@ defmodule Shinkai.Sink.Hls do end end - defp do_handle_packet(packet, state) do - case Map.fetch(state.last_sample, packet.track_id) do - :error -> - last_samples = Map.put(state.last_sample, packet.track_id, packet_to_sample(packet)) - %{state | last_sample: last_samples} - - {:ok, last_sample} -> - variant_name = state.tracks[packet.track_id].type |> to_string() - sample = packet_to_sample(packet) - last_sample = %{last_sample | duration: sample.dts - last_sample.dts} - - %{ - state - | writer: Writer.write_sample(state.writer, variant_name, last_sample), - last_sample: Map.put(state.last_sample, packet.track_id, sample) - } - end - end + defp do_handle_packet(_packet, state), do: state defp reject?(packet, state, max_dts) do track = state.tracks[packet.track_id] diff --git a/lib/shinkai/sink/rtmp.ex b/lib/shinkai/sink/rtmp.ex index b3a87dd..d56a9ed 100644 --- a/lib/shinkai/sink/rtmp.ex +++ b/lib/shinkai/sink/rtmp.ex @@ -56,7 +56,7 @@ defmodule Shinkai.Sink.RTMP do if unsupported_tracks != [] do Logger.warning( - "[#{state.source_id}] Ignore unsupported tracks: #{Enum.map_join(unsupported_tracks, ", ", & &1.codec)}" + "[#{state.source_id}] rtmp sink: ignore unsupported tracks: #{Enum.map_join(unsupported_tracks, ", ", & &1.codec)}" ) end @@ -79,14 +79,13 @@ defmodule Shinkai.Sink.RTMP do def handle_info({:packet, packets}, state) do Registry.dispatch(Sink.Registry, {:rtmp, state.source_id}, fn entries -> packets = List.wrap(packets) - track = state.tracks[hd(packets).track_id] - tags = Enum.map(packets, &packet_to_tag(track, &1)) - for {pid, _} <- entries, {timestamp, data} <- tags do - case track.type do - :video -> ClientSession.send_video_data(pid, timestamp, data) - :audio -> ClientSession.send_audio_data(pid, timestamp, data) - end + case state.tracks[hd(packets).track_id] do + nil -> + :ok + + track -> + dispatch_packets(entries, packets, track) end end) @@ -106,6 +105,18 @@ defmodule Shinkai.Sink.RTMP do {:noreply, state} end + defp dispatch_packets(entries, packets, track) do + tags = Enum.map(packets, &packet_to_tag(track, &1)) + + for {pid, _} <- entries, {timestamp, data} <- tags do + # credo:disable-for-next-line + case track.type do + :video -> ClientSession.send_video_data(pid, timestamp, data) + :audio -> ClientSession.send_audio_data(pid, timestamp, data) + end + end + end + defp packet_to_tag(track, packet) do dts = div(packet.dts * @timescale, track.timescale) cts = div((packet.pts - packet.dts) * @timescale, track.timescale) diff --git a/lib/shinkai/sources.ex b/lib/shinkai/sources.ex index 6afb719..6f0aae6 100644 --- a/lib/shinkai/sources.ex +++ b/lib/shinkai/sources.ex @@ -26,7 +26,7 @@ defmodule Shinkai.Sources do [] -> if source.type == :publish do :ok = PublishManager.monitor(source, self()) - :ets.insert(:sources, {source.id, source}) + :ets.insert(:sources, {source.id, %{source | status: :streaming}}) end DynamicSupervisor.start_child( diff --git a/lib/shinkai/sources/rtsp.ex b/lib/shinkai/sources/rtsp.ex index 453cb79..c5ed39a 100644 --- a/lib/shinkai/sources/rtsp.ex +++ b/lib/shinkai/sources/rtsp.ex @@ -5,10 +5,8 @@ defmodule Shinkai.Sources.RTSP do require Logger - import Shinkai.Utils - - alias MediaCodecs.MPEG4 - alias Shinkai.{Sources, Track} + alias Shinkai.Sources + alias Shinkai.Sources.RTSP.MediaProcessor @timeout 6_000 @reconnect_timeout 5_000 @@ -33,7 +31,7 @@ defmodule Shinkai.Sources.RTSP do id: source.id, rtsp_pid: pid, tracks: %{}, - packets_topic: packets_topic(source.id) + media_processor: nil } {:ok, state, {:continue, :connect}} @@ -46,21 +44,15 @@ defmodule Shinkai.Sources.RTSP do def handle_info(:reconnect, state), do: do_connect(state) def handle_info({:rtsp, _pid, {id, sample_or_samples}}, state) do - :ok = - Phoenix.PubSub.broadcast( - Shinkai.PubSub, - state.packets_topic, - {:packet, to_packets(sample_or_samples, state.tracks[id].id)} - ) - - {:noreply, state} + media_processor = MediaProcessor.handle_sample(id, sample_or_samples, state.media_processor) + {:noreply, %{state | media_processor: media_processor}} end @impl true def handle_info({:rtsp, pid, :session_closed}, %{rtsp_pid: pid} = state) do Logger.error("[#{state.id}] rtsp client disconnected") - Phoenix.PubSub.broadcast!(Shinkai.PubSub, state_topic(state.id), :disconnected) - Sources.update_source_status(state.id, :failed) + Phoenix.PubSub.broadcast!(Shinkai.PubSub, Shinkai.Utils.state_topic(state.id), :disconnected) + update_status(state, :failed) Process.send_after(self(), :reconnect, @reconnect_timeout) {:noreply, state} end @@ -73,21 +65,10 @@ defmodule Shinkai.Sources.RTSP do defp do_connect(state) do with {:ok, tracks} <- RTSP.connect(state.rtsp_pid, @timeout), - tracks <- build_tracks(tracks), :ok <- RTSP.play(state.rtsp_pid) do - codecs = tracks |> Map.values() |> Enum.map_join(", ", & &1.codec) - Logger.info("[#{state.id}] start reading from #{map_size(tracks)} tracks (#{codecs})") - update_status(state, :streaming) - - :ok = - Phoenix.PubSub.broadcast( - Shinkai.PubSub, - tracks_topic(state.id), - {:tracks, Map.values(tracks)} - ) - - {:noreply, %{state | tracks: tracks}} + media_processor = MediaProcessor.new(state.id, tracks) + {:noreply, %{state | media_processor: media_processor}} else {:error, reason} -> Logger.error("[#{state.id}] rtsp connection failed: #{inspect(reason)}") @@ -97,49 +78,5 @@ defmodule Shinkai.Sources.RTSP do end end - defp build_tracks(tracks) do - tracks - |> Enum.with_index(1) - |> Map.new(fn {track, id} -> - codec = codec(String.downcase(track.rtpmap.encoding)) - - {track.control_path, - Track.new( - id: id, - type: track.type, - codec: codec, - timescale: track.rtpmap.clock_rate, - priv_data: priv_data(codec, track.fmtp) - )} - end) - end - - defp codec("mpeg4-generic"), do: :aac - defp codec(other), do: String.to_atom(other) - - defp priv_data(:aac, fmtp), do: MPEG4.AudioSpecificConfig.parse(fmtp.config) - defp priv_data(:h264, %{sprop_parameter_sets: nil}), do: nil - defp priv_data(:h264, %{sprop_parameter_sets: pps}), do: {pps.sps, [pps.pps]} - defp priv_data(:h265, %{sprop_vps: nil}), do: nil - defp priv_data(:h265, fmtp), do: {hd(fmtp.sprop_vps), hd(fmtp.sprop_sps), fmtp.sprop_pps} - defp priv_data(_codec, _fmtp), do: nil - - defp to_packets(samples, track_id) when is_list(samples) do - Enum.map(samples, &packet_from_sample(track_id, &1)) - end - - defp to_packets(sample, track_id), do: packet_from_sample(track_id, sample) - - defp packet_from_sample(track_id, {payload, pts, sync?, _timestamp}) do - Shinkai.Packet.new(payload, - track_id: track_id, - dts: pts, - pts: pts, - sync?: sync? - ) - end - - defp update_status(state, status) do - Sources.update_source_status(state.id, status) - end + defp update_status(state, status), do: Sources.update_source_status(state.id, status) end diff --git a/lib/shinkai/sources/rtsp/handler.ex b/lib/shinkai/sources/rtsp/handler.ex new file mode 100644 index 0000000..3bf576b --- /dev/null +++ b/lib/shinkai/sources/rtsp/handler.ex @@ -0,0 +1,38 @@ +defmodule Shinkai.Sources.RTSP.Handler do + @moduledoc false + + use RTSP.Server.ClientHandler + + require Logger + + alias Shinkai.Sources + alias Shinkai.Sources.RTSP.MediaProcessor + + @impl true + def init(_options) do + nil + end + + @impl true + def handle_record(path, tracks, _state) do + with {:ok, source_id} <- source_id(path), + source <- %Sources.Source{id: source_id, type: :publish}, + {:ok, _pid} <- Sources.start(source) do + Logger.info("[RTSP] is publishing to: #{path}") + {:ok, MediaProcessor.new(source_id, tracks)} + end + end + + @impl true + def handle_media(control_path, sample, state) do + MediaProcessor.handle_sample(control_path, sample, state) + end + + @impl true + def handle_closed_connection(state) do + MediaProcessor.close(state) + end + + defp source_id("/"), do: {:error, :missing_path} + defp source_id(<<"/", path::binary>>), do: {:ok, String.replace(path, "/", "-")} +end diff --git a/lib/shinkai/sources/rtsp/media_processor.ex b/lib/shinkai/sources/rtsp/media_processor.ex new file mode 100644 index 0000000..ec39ebb --- /dev/null +++ b/lib/shinkai/sources/rtsp/media_processor.ex @@ -0,0 +1,165 @@ +defmodule Shinkai.Sources.RTSP.MediaProcessor do + @moduledoc false + + require Logger + + alias MediaCodecs.AV1 + alias Shinkai.{Sources, Track} + + @max_buffer 1000 + + @type t :: %{ + source_id: Sources.Source.id(), + tracks: %{String.t() => Track.t()}, + buffer?: boolean(), + packets: [Shinkai.Packet.t()], + buffered_packets: non_neg_integer() + } + + @spec new(Sources.Source.id(), [map()]) :: t() + def new(source_id, tracks) do + tracks = + tracks + |> Enum.with_index(1) + |> Map.new(fn {track, id} -> {track.control_path, Track.from_rtsp_track(id, track)} end) + + ready? = all_tracks_initialized?(Map.values(tracks)) + + Logger.info( + "[#{source_id}] reading #{map_size(tracks)} track(s) (#{Enum.map_join(Map.values(tracks), ", ", & &1.codec)})" + ) + + if ready? do + Phoenix.PubSub.broadcast!( + Shinkai.PubSub, + Shinkai.Utils.tracks_topic(source_id), + {:tracks, Map.values(tracks)} + ) + end + + %{ + source_id: source_id, + tracks: tracks, + buffer?: not ready?, + packets: [], + packets_topic: Shinkai.Utils.packets_topic(source_id), + buffered_packets: 0 + } + end + + @spec handle_sample(String.t(), tuple(), t()) :: t() + def handle_sample(id, sample, %{buffer?: false} = state) do + track = state.tracks[id] + packet = to_packet(track, sample) + Phoenix.PubSub.broadcast!(Shinkai.PubSub, state.packets_topic, {:packet, packet}) + state + end + + def handle_sample(id, sample, state) do + track = state.tracks[id] + + case maybe_init_track(track, sample) do + {:ok, updated_track} -> + tracks = Map.put(state.tracks, id, updated_track) + + state = %{ + state + | tracks: tracks, + packets: [to_packet(updated_track, sample) | state.packets], + buffered_packets: state.buffered_packets + 1 + } + + cond do + all_tracks_initialized?(Map.values(tracks)) -> unbuffer(state) + state.buffered_packets > @max_buffer -> %{state | packets: [], buffered_packets: 0} + true -> state + end + + :discard -> + state + end + end + + @spec close(t()) :: :ok + def close(state) do + Phoenix.PubSub.broadcast!( + Shinkai.PubSub, + Shinkai.Utils.state_topic(state.source_id), + :disconnected + ) + end + + defp all_tracks_initialized?(tracks) do + Enum.all?(tracks, fn + %{type: :video, priv_data: nil} -> false + _track -> true + end) + end + + defp maybe_init_track(%{type: :video, priv_data: nil}, {_payload, _pts, false, _timestamp}) do + :discard + end + + defp maybe_init_track( + %{type: :video, priv_data: nil} = track, + {payload, _pts, true, _timestamp} + ) do + with {:ok, priv_data} <- look_for_parameter_sets(track.codec, payload) do + {:ok, %{track | priv_data: priv_data}} + end + end + + defp maybe_init_track(track, _sample), do: {:ok, track} + + defp look_for_parameter_sets(:h264, payload) do + {{sps, pps}, _rest} = MediaCodecs.H264.pop_parameter_sets(payload) + if sps != [] and pps != [], do: {:ok, {List.first(sps), pps}}, else: :discard + end + + defp look_for_parameter_sets(:h265, payload) do + {{vps, sps, pps}, _rest} = MediaCodecs.H265.pop_parameter_sets(payload) + + if vps != [] and sps != [] and pps != [], + do: {:ok, {List.first(vps), List.first(sps), pps}}, + else: :discard + end + + defp look_for_parameter_sets(:av1, payload) do + case Enum.find(payload, &(AV1.OBU.type(&1) == :sequence_header)) do + nil -> :discard + seq_header_obu -> {:ok, seq_header_obu} + end + end + + defp unbuffer(state) do + Phoenix.PubSub.broadcast!( + Shinkai.PubSub, + Shinkai.Utils.tracks_topic(state.source_id), + {:tracks, Map.values(state.tracks)} + ) + + state.packets + |> Enum.reverse() + |> List.flatten() + |> Enum.each(&Phoenix.PubSub.broadcast!(Shinkai.PubSub, state.packets_topic, {:packet, &1})) + + %{state | buffer?: false, packets: []} + end + + defp to_packet(track, {payload, pts, sync?, _timestamp}) do + payload = + case track.codec do + :av1 -> Enum.map(payload, &AV1.OBU.set_size_flag/1) + _ -> payload + end + + Shinkai.Packet.new(payload, + track_id: track.id, + dts: pts, + pts: pts, + sync?: sync? + ) + end + + defp to_packet(track, samples), do: Enum.map(samples, &to_packet(track, &1)) +end diff --git a/lib/shinkai/track.ex b/lib/shinkai/track.ex index 83232ca..d330e40 100644 --- a/lib/shinkai/track.ex +++ b/lib/shinkai/track.ex @@ -28,6 +28,19 @@ defmodule Shinkai.Track do struct(__MODULE__, opts) end + @doc false + def from_rtsp_track(id, track) do + codec = rtpmap_codec(String.downcase(track.rtpmap.encoding)) + + %__MODULE__{ + id: id, + type: track.type, + codec: codec, + timescale: track.rtpmap.clock_rate, + priv_data: fmtp_priv_data(codec, track.fmtp) + } + end + @doc false @spec to_hls_track(t()) :: HLX.Track.t() def to_hls_track(track) do @@ -80,9 +93,18 @@ defmodule Shinkai.Track do end defp rtmp_init_data(:av1, config_obu) do - <<_header::binary-size(8), dcr::binary>> = - Box.Av1c.new(config_obu) |> Box.serialize() |> IO.iodata_to_binary() - + <<_header::binary-size(8), dcr::binary>> = config_obu |> Box.Av1c.new() |> Box.serialize() dcr end + + defp rtpmap_codec("mpeg4-generic"), do: :aac + defp rtpmap_codec(other), do: String.to_atom(other) + + defp fmtp_priv_data(:aac, %{config: nil}), do: raise("Missing AAC config in FMTP") + defp fmtp_priv_data(:aac, fmtp), do: MPEG4.AudioSpecificConfig.parse(fmtp.config) + defp fmtp_priv_data(:h264, %{sprop_parameter_sets: nil}), do: nil + defp fmtp_priv_data(:h264, %{sprop_parameter_sets: pps}), do: {pps.sps, [pps.pps]} + defp fmtp_priv_data(:h265, %{sprop_vps: nil}), do: nil + defp fmtp_priv_data(:h265, fmtp), do: {hd(fmtp.sprop_vps), hd(fmtp.sprop_sps), fmtp.sprop_pps} + defp fmtp_priv_data(_codec, _fmtp), do: nil end diff --git a/mix.exs b/mix.exs index 040dbcd..8ebb00f 100644 --- a/mix.exs +++ b/mix.exs @@ -32,7 +32,7 @@ defmodule Shinkai.MixProject do defp deps do [ {:phoenix_pubsub, "~> 2.2"}, - {:rtsp, "~> 0.7.0"}, + {:rtsp, "~> 0.8.0"}, {:hlx, "~> 0.5.0"}, {:ex_rtmp, "~> 0.4.1"}, {:yaml_elixir, "~> 2.12"}, diff --git a/mix.lock b/mix.lock index 144f74c..3e1d075 100644 --- a/mix.lock +++ b/mix.lock @@ -1,15 +1,15 @@ %{ - "bandit": {:hex, :bandit, "1.10.1", "6b1f8609d947ae2a74da5bba8aee938c94348634e54e5625eef622ca0bbbb062", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b4c35f273030e44268ace53bf3d5991dfc385c77374244e2f960876547671aa"}, + "bandit": {:hex, :bandit, "1.10.2", "d15ea32eb853b5b42b965b24221eb045462b2ba9aff9a0bda71157c06338cbff", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "27b2a61b647914b1726c2ced3601473be5f7aa6bb468564a688646a689b3ee45"}, "bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "coerce": {:hex, :coerce, "1.0.2", "5ef791040c92baaa5dd344887563faaeac6e6742573a167493294f8af3672bbe", [:mix], [], "hexpm", "0b3451c729571234fdac478636c298e71d1f2ce1243abed5fa43fa3181b980eb"}, "credo": {:hex, :credo, "1.7.15", "283da72eeb2fd3ccf7248f4941a0527efb97afa224bcdef30b4b580bc8258e1c", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "291e8645ea3fea7481829f1e1eb0881b8395db212821338e577a90bf225c5607"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "ex_doc": {:hex, :ex_doc, "0.39.3", "519c6bc7e84a2918b737aec7ef48b96aa4698342927d080437f61395d361dcee", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "0590955cf7ad3b625780ee1c1ea627c28a78948c6c0a9b0322bd976a079996e1"}, + "ex_doc": {:hex, :ex_doc, "0.40.0", "2635974389b80fd3ca61b0f993d459dad05b4a8f9b069dcfbbc5f6a8a6aef60e", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "c040735250e2752b6e1102eeb4aa3f1dca74c316db873ae09f955d42136e7e5b"}, "ex_flv": {:hex, :ex_flv, "0.4.0", "9e43c833b5cbe3c6e21bb2651ae7650f3ec939eac8079f34efed8f813bf9133d", [:mix], [], "hexpm", "484f6990791e0c8862a88e4150f004deb067c7342e40b779c82d5c9a9c057969"}, "ex_m3u8": {:hex, :ex_m3u8, "0.15.4", "66f6ec7e4fb7372c48032db1c2d4a3e6c2bbbde2d1d9a1098986e3caa0ab7a55", [:mix], [{:nimble_parsec, "~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "ec03aa516919e0c8ec202da55f609b763bd7960195a3388900090fcad270c873"}, - "ex_mp4": {:hex, :ex_mp4, "0.14.1", "6152b9b981d5d6d5069d8dccd3d60f4f4be135f758f9b4bae039018119fdf089", [:mix], [{:media_codecs, "~> 0.10.0", [hex: :media_codecs, repo: "hexpm", optional: true]}, {:ratio, "~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:table_rex, "~> 4.0", [hex: :table_rex, repo: "hexpm", optional: true]}], "hexpm", "f9f9630406371d19691673acea83f378b96e5c1961eb28436aa5d8ded148188c"}, + "ex_mp4": {:hex, :ex_mp4, "0.14.2", "c362b27c50fa8d5a16e4f5652963fcc47d5a61215eb729a0d6f8ec521575ed6d", [:mix], [{:media_codecs, "~> 0.10.0", [hex: :media_codecs, repo: "hexpm", optional: true]}, {:ratio, "~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:table_rex, "~> 4.0", [hex: :table_rex, repo: "hexpm", optional: true]}], "hexpm", "3712c62a93ddde83419bb22e382a145c6527c8b002d8a22348202828022e1041"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.1", "e0f0c0baf329de92059e2afdc34d66d61f8b983f0801daa10f1a712360919e45", [:mix], [], "hexpm", "83ab3dffcffc6149eb404f1e1b1b62f755efd54c818e27c2c88e6ba3341c5d41"}, "ex_rtmp": {:hex, :ex_rtmp, "0.4.1", "1be8a1f75f2940d59ae07939218d1cdddac85de118370f0f001f816b8bac4576", [:mix], [{:ex_flv, "~> 0.4.0", [hex: :ex_flv, repo: "hexpm", optional: false]}], "hexpm", "58e1f993c575b6604e8256ed89887fcb7f7b80d0627e0bd71e9eea98bbe79766"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, @@ -25,7 +25,7 @@ "membrane_rtsp": {:hex, :membrane_rtsp, "0.11.0", "887b1c0cd4f40f6ce93880bfa1a1e8c9e250aabb24810a8fe2a7556bb54c29c4", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.17.0 or ~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.4.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "69252d77ad3df48e6cb21fc16b0c5730607709714ad7849b7635813f9741ee2f"}, "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mockery": {:hex, :mockery, "2.5.0", "a87acd74fd733aa3b9cb5663d6f690178b056608f2652f18e4ec423ddd5496ed", [:mix], [], "hexpm", "52492b2eba61055df1c626e894663b624b5e6fdfaaaba1d9a8596236fbf4da69"}, - "mpeg_ts": {:hex, :mpeg_ts, "3.3.8", "3ef910d840cc2c7cb8998c5982b53d166ed207473c918cec3d4988ebf14fdd00", [:mix], [], "hexpm", "9e3ecbadcad9bfcae3c7be53e78b620068f361daac9114115c9490c79050aeab"}, + "mpeg_ts": {:hex, :mpeg_ts, "3.3.11", "77d69c5599fcd6eadef926b03cf6fe990dd76301ec41ce77de71bc84ad53412c", [:mix], [], "hexpm", "e1554e7b2ffe5692effca19173200fdee0959bd40d201ec920a950054c27cb76"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, @@ -34,7 +34,7 @@ "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, - "rtsp": {:hex, :rtsp, "0.7.0", "b98706e0d89dc555bee1b33266a1bf4a5c593c3b39c2be1e9781025f6305d691", [:mix], [{:ex_mp4, "~> 0.14.0", [hex: :ex_mp4, repo: "hexpm", optional: true]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:media_codecs, "~> 0.10.0", [hex: :media_codecs, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.11.0", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}], "hexpm", "fa8f011c8eb702481ada4432d2d62065943f3a93c242d728d7bf7c680297a0a1"}, + "rtsp": {:hex, :rtsp, "0.8.1", "4bffebfcb0e1354283567178c040bbf40a85c4fbbde6d23addbbc7672cb3c700", [:mix], [{:ex_mp4, "~> 0.14.0", [hex: :ex_mp4, repo: "hexpm", optional: true]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:media_codecs, "~> 0.10.0", [hex: :media_codecs, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.11.0", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}], "hexpm", "b4af3c30b8f79dd642940452c6ad6727bfd1df492e5ddbc1eb705f25df6f4053"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"}, "typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"}, diff --git a/test/fixtures/big_buck_av1_opus.mp4 b/test/fixtures/big_buck_av1_opus.mp4 new file mode 100644 index 0000000..a27cf83 Binary files /dev/null and b/test/fixtures/big_buck_av1_opus.mp4 differ diff --git a/test/shinkai/pipeline_test.exs b/test/shinkai/pipeline_test.exs index 0a5c5e0..51adb95 100644 --- a/test/shinkai/pipeline_test.exs +++ b/test/shinkai/pipeline_test.exs @@ -3,114 +3,125 @@ defmodule Shinkai.PipelineTest do alias ExM3U8.Tags alias RTSP.FileServer + alias Shinkai.RTSP.Publisher alias Shinkai.Sources.Source alias Shinkai.Utils @moduletag :tmp_dir - setup do - {:ok, server} = - FileServer.start_link( - port: 0, - files: [%{path: "/test", location: "test/fixtures/big_buck_avc_aac.mp4"}] - ) + @fixtures [ + "test/fixtures/big_buck_av1_opus.mp4", + "test/fixtures/big_buck_avc_aac.mp4" + ] - %{rtsp_server: server} - end + for fixture <- @fixtures do + describe "hls sink: #{fixture}" do + test "Stream from rtsp" do + rtsp_server = start_rtsp_server(rate_control: false) - describe "hls sink" do - test "Stream from rtsp", %{tmp_dir: _dir} do - rtsp_server = start_rtsp_server(rate_control: false) - source = %Source{id: UUID.uuid4(), type: :rtsp, uri: rtsp_uri(rtsp_server)} - Phoenix.PubSub.subscribe(Shinkai.PubSub, Utils.sink_topic(source.id)) + source = %Source{ + id: UUID.uuid4(), + type: :rtsp, + uri: rtsp_uri(rtsp_server, unquote(fixture)) + } - _pid = start_supervised!({Shinkai.Pipeline, source}) + Phoenix.PubSub.subscribe(Shinkai.PubSub, Utils.sink_topic(source.id)) - assert_receive {:hls, :done}, 5_000 + _pid = start_supervised!({Shinkai.Pipeline, source}) - hls_path = Path.join(Shinkai.Config.get_config(:hls)[:storage_dir], source.id) - assert_hls(hls_path) + assert_receive {:hls, :done}, 5_000 - File.rm_rf!(hls_path) - end + hls_path = Path.join(Shinkai.Config.get_config(:hls)[:storage_dir], source.id) + audio? = not String.contains?(unquote(fixture), "opus") + assert_hls(hls_path, audio?) - test "Stream from rtmp" do - {:ok, rtmp_server} = - ExRTMP.Server.start( - port: 0, - handler: Shinkai.RTMP.Server.Handler, - handler_options: [fixture: "test/fixtures/big_buck_avc_aac.mp4"] - ) + on_exit(fn -> File.rm_rf!(hls_path) end) + end - source = %Source{id: UUID.uuid4(), type: :rtmp, uri: rtmp_uri(rtmp_server, "live/test")} - Phoenix.PubSub.subscribe(Shinkai.PubSub, Utils.sink_topic(source.id)) + test "Stream from rtsp publish", %{tmp_dir: _dir} do + id = UUID.uuid4() + Phoenix.PubSub.subscribe(Shinkai.PubSub, Utils.sink_topic(id)) - _pid = start_supervised!({Shinkai.Pipeline, source}) + Publisher.new("rtsp://localhost:8554/#{id}", unquote(fixture)) + |> Publisher.publish() - assert_receive {:hls, :done}, 5_000 + assert_receive {:hls, :done}, 5_000 - hls_path = Path.join(Shinkai.Config.get_config(:hls)[:storage_dir], source.id) - assert_hls(hls_path) + hls_path = Path.join(Shinkai.Config.get_config(:hls)[:storage_dir], id) + audio? = not String.contains?(unquote(fixture), "opus") + assert_hls(hls_path, audio?) - ExRTMP.Server.stop(rtmp_server) - File.rm_rf!(hls_path) - end - end + File.rm_rf!(hls_path) + end - describe "rtmp sink" do - test "Stream from rtsp", %{tmp_dir: _dir} do - server = start_rtsp_server() - source = %Source{id: UUID.uuid4(), type: :rtsp, uri: rtsp_uri(server)} + test "Stream from rtmp" do + {:ok, rtmp_server} = + ExRTMP.Server.start( + port: 0, + handler: Shinkai.RTMP.Server.Handler, + handler_options: [fixture: unquote(fixture)] + ) - {:ok, rtmp_server} = ExRTMP.Server.start(port: 0, handler: Shinkai.Sources.RTMP.Handler) - start_source(source) + source = %Source{id: UUID.uuid4(), type: :rtmp, uri: rtmp_uri(rtmp_server, "live/test")} + Phoenix.PubSub.subscribe(Shinkai.PubSub, Utils.sink_topic(source.id)) - # Wait for the RTMP sink to receive tracks - Process.sleep(150) + _pid = start_supervised!({Shinkai.Pipeline, source}) - {:ok, pid} = ExRTMP.Client.start_link(uri: rtmp_uri(rtmp_server), stream_key: source.id) - assert :ok = ExRTMP.Client.connect(pid) - assert :ok = ExRTMP.Client.play(pid) + assert_receive {:hls, :done}, 5_000 - assert_rtmp_receive(pid) + hls_path = Path.join(Shinkai.Config.get_config(:hls)[:storage_dir], source.id) + audio? = not String.contains?(unquote(fixture), "opus") + assert_hls(hls_path, audio?) - ExRTMP.Server.stop(rtmp_server) + ExRTMP.Server.stop(rtmp_server) + File.rm_rf!(hls_path) + end end + end - test "Stream from rtmp" do - {:ok, rtmp_server} = - ExRTMP.Server.start( - port: 0, - handler: Shinkai.RTMP.Server.Handler, - handler_options: [fixture: "test/fixtures/big_buck_avc_aac.mp4"] - ) + for fixture <- @fixtures do + describe "rtmp sink: #{fixture}" do + test "Stream from rtsp" do + server = start_rtsp_server() + source = %Source{id: UUID.uuid4(), type: :rtsp, uri: rtsp_uri(server, unquote(fixture))} - id = UUID.uuid4() + {:ok, rtmp_server} = ExRTMP.Server.start(port: 0, handler: Shinkai.Sources.RTMP.Handler) + start_source(source) - source = %Source{id: "live-#{id}", type: :rtmp, uri: rtmp_uri(rtmp_server, "live/#{id}")} + # Wait for the RTMP sink to receive tracks + Process.sleep(150) - _pid = start_supervised!({Shinkai.Pipeline, source}) + {:ok, pid} = ExRTMP.Client.start_link(uri: rtmp_uri(rtmp_server), stream_key: source.id) + assert :ok = ExRTMP.Client.connect(pid) + assert :ok = ExRTMP.Client.play(pid) - {:ok, pid} = ExRTMP.Client.start_link(uri: rtmp_uri(rtmp_server, "live"), stream_key: id) - assert :ok = ExRTMP.Client.connect(pid) - assert :ok = ExRTMP.Client.play(pid) + assert_rtmp_receive(pid, unquote(fixture)) - assert_rtmp_receive(pid) + ExRTMP.Server.stop(rtmp_server) + end - ExRTMP.Server.stop(rtmp_server) - end + test "Stream from rtmp" do + {:ok, rtmp_server} = + ExRTMP.Server.start( + port: 0, + handler: Shinkai.RTMP.Server.Handler, + handler_options: [fixture: unquote(fixture)] + ) - defp assert_rtmp_receive(pid) do - assert_receive {:video, ^pid, {:codec, :h264, _dcr}}, 1000 - assert_receive {:audio, ^pid, {:codec, :aac, _}}, 1000 + id = UUID.uuid4() - for _i <- 1..20 do - assert_receive {:video, ^pid, {:sample, payload, _dts, _pts, keyframe?}}, 1000 - assert_receive {:audio, ^pid, {:sample, data, _dts}}, 1000 + source = %Source{id: "live-#{id}", type: :rtmp, uri: rtmp_uri(rtmp_server, "live/#{id}")} + start_source(source) - assert is_list(payload) - assert is_binary(data) - assert is_boolean(keyframe?) + # _pid = start_supervised!({Shinkai.Pipeline, source}) + + {:ok, pid} = ExRTMP.Client.start_link(uri: rtmp_uri(rtmp_server, "live"), stream_key: id) + assert :ok = ExRTMP.Client.connect(pid) + assert :ok = ExRTMP.Client.play(pid) + + assert_rtmp_receive(pid, unquote(fixture)) + + ExRTMP.Server.stop(rtmp_server) end end end @@ -120,10 +131,15 @@ defmodule Shinkai.PipelineTest do _pid = start_supervised!({Shinkai.Pipeline, source}) end - defp assert_hls(hls_path) do + defp assert_hls(hls_path, audio?, video? \\ true) do assert File.exists?(Path.join(hls_path, "master.m3u8")) assert File.exists?(Path.join(hls_path, "video.m3u8")) - assert File.exists?(Path.join(hls_path, "audio.m3u8")) + + if audio? do + assert File.exists?(Path.join(hls_path, "audio.m3u8")) + else + refute File.exists?(Path.join(hls_path, "audio.m3u8")) + end assert {:ok, multivariabt_playlist} = hls_path @@ -137,18 +153,58 @@ defmodule Shinkai.PipelineTest do items: items } = multivariabt_playlist - assert length(items) == 2 + assert length(items) == Enum.count([audio?, video?], & &1) + + if audio? do + assert %{type: :audio, group_id: "audio"} = + Enum.find(items, &is_struct(&1, ExM3U8.Tags.Media)) + end - assert %{type: :audio, group_id: "audio"} = - Enum.find(items, &is_struct(&1, ExM3U8.Tags.Media)) + # codesc "avc1.42C00C,mp4a.40.2" + if audio? do + assert %{audio: "audio", codecs: _codecs, resolution: {240, 136}} = + Enum.find(items, &is_struct(&1, ExM3U8.Tags.Stream)) + else + assert %{audio: nil, codecs: _codecs, resolution: {240, 136}} = + Enum.find(items, &is_struct(&1, ExM3U8.Tags.Stream)) + end - assert %{audio: "audio", codecs: "avc1.42C00C,mp4a.40.2", resolution: {240, 136}} = - Enum.find(items, &is_struct(&1, ExM3U8.Tags.Stream)) + if audio? do + assert_media_playlist(hls_path, "audio", 3, 5) + end - assert_media_playlist(hls_path, "audio", 3, 5) assert_media_playlist(hls_path, "video", 2, 5) end + defp assert_rtmp_receive(pid, fixture) do + {video_codec, audio_codec} = + case fixture do + "test/fixtures/big_buck_avc_aac.mp4" -> {:h264, :aac} + _ -> {:av1, nil} + end + + if video_codec do + assert_receive {:video, ^pid, {:codec, ^video_codec, _dcr}}, 1000 + end + + if audio_codec do + assert_receive {:audio, ^pid, {:codec, ^audio_codec, _}}, 1000 + end + + for _i <- 1..20 do + if video_codec do + assert_receive {:video, ^pid, {:sample, payload, _dts, _pts, keyframe?}}, 1000 + assert is_list(payload) or is_binary(payload) + assert is_boolean(keyframe?) + end + + if audio_codec do + assert_receive {:audio, ^pid, {:sample, data, _dts}}, 1000 + assert is_binary(data) + end + end + end + defp rtmp_uri(server, path \\ "") do {:ok, port} = ExRTMP.Server.port(server) "rtmp://127.0.0.1:#{port}/#{path}" @@ -182,20 +238,21 @@ defmodule Shinkai.PipelineTest do end defp start_rtsp_server(other_options \\ []) do - default_options = - [ - port: 0, - files: [%{path: "/test", location: "test/fixtures/big_buck_avc_aac.mp4"}], - rate_control: true - ] - |> Keyword.merge(other_options) - + files = + @fixtures + |> Enum.with_index(1) + |> Enum.map(fn {fixture, idx} -> + %{path: "/test#{idx}", location: fixture} + end) + + default_options = Keyword.merge([port: 0, files: files, rate_control: true], other_options) {:ok, pid} = FileServer.start_link(default_options) pid end - defp rtsp_uri(server) do + defp rtsp_uri(server, fixture) do + idx = Enum.find_index(@fixtures, &(&1 == fixture)) + 1 {:ok, port} = FileServer.port_number(server) - "rtsp://127.0.0.1:#{port}/test" + "rtsp://127.0.0.1:#{port}/test#{idx}" end end diff --git a/test/shinkai/sources/rtsp_test.exs b/test/shinkai/sources/rtsp_test.exs index c17b13c..d363616 100644 --- a/test/shinkai/sources/rtsp_test.exs +++ b/test/shinkai/sources/rtsp_test.exs @@ -31,7 +31,7 @@ defmodule Shinkai.Sources.RTSPTest do id: 1, type: :video, codec: :h264, - timescale: 15_360, + timescale: 90_000, priv_data: {<<103, 66, 192, 12, 217, 3, 196, 254, 95, 252, 2, 32, 2, 28, 64, 0, 0, 3, 0, 64, 0, 0, 15, 3, 197, 10, 146>>, [<<104, 203, 131, 203, 32>>]} diff --git a/test/support/rtmp/mp4_to_flv.ex b/test/support/rtmp/mp4_to_flv.ex index e10dec7..b5bdb90 100644 --- a/test/support/rtmp/mp4_to_flv.ex +++ b/test/support/rtmp/mp4_to_flv.ex @@ -25,7 +25,7 @@ defmodule Shinkai.RTMP.Server.Mp4ToFlv do |> Stream.map(&Reader.read_sample(reader, &1)) |> Enum.each(fn sample -> if sample.track_id == video_track.id do - {dts, tag} = video_sample_tag(sample, video_track.timescale) + {dts, tag} = video_sample_tag(sample, video_track) ClientSession.send_video_data(rtmp_client, dts, tag) else {dts, tag} = audio_sample_tag(sample, audio_track.timescale) @@ -45,6 +45,17 @@ defmodule Shinkai.RTMP.Server.Mp4ToFlv do |> Tag.Serializer.serialize() end + defp flv_init_tag(%{media: :av1} = track) do + av1c = ExMP4.Box.serialize(track.priv_data) + + Tag.Serializer.serialize(%Tag.ExVideoData{ + codec_id: :av1, + packet_type: :sequence_start, + frame_type: :keyframe, + data: binary_part(av1c, 8, byte_size(av1c) - 8) + }) + end + defp flv_init_tag(%{media: :aac} = track) do [descriptor] = MediaCodecs.MPEG4.parse_descriptors(track.priv_data.es_descriptor) @@ -54,9 +65,34 @@ defmodule Shinkai.RTMP.Server.Mp4ToFlv do |> Tag.Serializer.serialize() end - defp video_sample_tag(sample, timescale) do - dts = ExMP4.Helper.timescalify(sample.dts, timescale, :millisecond) - pts = ExMP4.Helper.timescalify(sample.pts, timescale, :millisecond) + defp flv_init_tag(%{media: :opus} = track) do + dops = ExMP4.Box.serialize(track.priv_data) + + Tag.Serializer.serialize(%Tag.ExAudioData{ + codec_id: :opus, + packet_type: :sequence_start, + data: binary_part(dops, 8, byte_size(dops) - 8) + }) + end + + defp video_sample_tag(sample, %{media: codec} = track) when codec in [:h265, :av1] do + dts = ExMP4.Helper.timescalify(sample.dts, track.timescale, :millisecond) + pts = ExMP4.Helper.timescalify(sample.pts, track.timescale, :millisecond) + + sample = + Tag.Serializer.serialize(%Tag.ExVideoData{ + codec_id: codec, + composition_time_offset: pts - dts, + packet_type: :coded_frames, + frame_type: if(sample.sync?, do: :keyframe, else: :interframe) + }) + + {dts, sample} + end + + defp video_sample_tag(sample, track) do + dts = ExMP4.Helper.timescalify(sample.dts, track.timescale, :millisecond) + pts = ExMP4.Helper.timescalify(sample.pts, track.timescale, :millisecond) sample = sample.payload diff --git a/test/support/rtsp/publisher.ex b/test/support/rtsp/publisher.ex new file mode 100644 index 0000000..8402fd4 --- /dev/null +++ b/test/support/rtsp/publisher.ex @@ -0,0 +1,173 @@ +defmodule Shinkai.RTSP.Publisher do + @moduledoc false + + alias ExMP4.Reader + alias Membrane.RTSP, as: MRTSP + alias RTSP.RTP.Encoder + + @spec new(String.t(), String.t()) :: map() + def new(rtsp_url, file) do + {:ok, rtsp} = Membrane.RTSP.start_link(rtsp_url) + reader = Reader.new!(file) + %{rtsp: rtsp, reader: reader} + end + + def publish(state) do + tracks = Reader.tracks(state.reader) + announce_tracks(state.rtsp, tracks) + sockets = setup(state.rtsp, tracks) + {:ok, %{status: 200}} = MRTSP.record(state.rtsp) + + ctx = + Map.new(tracks, fn track -> + {payloader, payloader_state} = init_payloader(track) + + {track.id, + %{ + track: track, + payloader: payloader, + payloader_state: payloader_state, + rtp_socket: sockets[track.id] + }} + end) + + ctx = + state.reader + |> Reader.stream() + |> Stream.map(&Reader.read_sample(state.reader, &1)) + |> Enum.reduce(ctx, fn sample, ctx -> + c = ctx[sample.track_id] + + {rtp_packets, new_payloader_state} = + c.payloader.handle_sample( + payload(c.track, sample.payload), + sample.pts, + c.payloader_state + ) + + Enum.each(rtp_packets, &:gen_udp.send(c.rtp_socket, ExRTP.Packet.encode(&1))) + Map.put(ctx, sample.track_id, %{c | payloader_state: new_payloader_state}) + end) + + Enum.each(ctx, fn {_track_id, c} -> + if function_exported?(c.payloader, :flush, 1) do + c.payloader_state + |> c.payloader.flush() + |> Enum.each(&:gen_udp.send(c.rtp_socket, ExRTP.Packet.encode(&1))) + end + + :gen_udp.close(c.rtp_socket) + end) + + Membrane.RTSP.close(state.rtsp) + end + + defp announce_tracks(rtsp, tracks) do + sdp = %ExSDP{ + origin: %ExSDP.Origin{session_id: 0, session_version: 0, address: {127, 0, 0, 1}}, + media: Enum.map(tracks, &sdp_media/1) + } + + {:ok, %{status: 200}} = + Membrane.RTSP.announce(rtsp, [{"content-type", "application/sdp"}], to_string(sdp)) + end + + defp setup(rtsp, tracks) do + Map.new(tracks, fn track -> + {:ok, rtp_socket} = :gen_udp.open(0, [:binary, active: false]) + {:ok, port} = :inet.port(rtp_socket) + + {:ok, %{status: 200} = resp} = + MRTSP.setup(rtsp, "track=#{track.id}", [ + {"Transport", "RTP/AVP;unicast;client_port=#{port}-#{port + 1};mode=record"} + ]) + + {:ok, transport} = MRTSP.Response.get_header(resp, "Transport") + + [_, rtp_port, _] = Regex.run(~r/server_port=(\d+)-(\d+)/, transport) + :gen_udp.connect(rtp_socket, {127, 0, 0, 1}, String.to_integer(rtp_port)) + + {track.id, rtp_socket} + end) + end + + defp sdp_media(%ExMP4.Track{} = track) do + pt = 95 + track.id + + %ExSDP.Media{ + type: track.type, + protocol: "RTP/AVP", + fmt: [pt], + port: 0, + attributes: [ + {"control", "track=#{track.id}"}, + fmtp(track, pt), + %ExSDP.Attribute.RTPMapping{ + payload_type: pt, + clock_rate: track.timescale, + encoding: encoding(track.media), + params: if(track.type == :audio, do: "2") + } + ] + } + end + + defp encoding(:aac), do: "MPEG4-GENERIC" + defp encoding(codec), do: String.upcase(to_string(codec)) + + defp fmtp(%{media: :h264} = track, pt) do + sps = List.first(track.priv_data.sps) + pps = List.first(track.priv_data.pps) + + %ExSDP.Attribute.FMTP{ + pt: pt, + packetization_mode: 1, + sprop_parameter_sets: %{sps: sps, pps: pps} + } + end + + defp fmtp(%{media: :h265} = track, pt) do + %ExSDP.Attribute.FMTP{ + pt: pt, + sprop_vps: track.priv_data.vps, + sprop_sps: track.priv_data.sps, + sprop_pps: track.priv_data.pps + } + end + + defp fmtp(%{media: :av1} = track, pt) do + %ExSDP.Attribute.FMTP{pt: pt, profile_id: track.priv_data.seq_profile} + end + + defp fmtp(%{media: :aac} = track, pt) do + [descriptor] = MediaCodecs.MPEG4.parse_descriptors(track.priv_data.es_descriptor) + asc = descriptor.dec_config_descr.decoder_specific_info + + "fmtp:#{pt} mode=AAC-hbr; sizeLength=13; indexLength=3; indexDeltaLength=3; constantDuration=1024; config=#{Base.encode16(asc, case: :upper)}" + end + + defp fmtp(%{media: :opus}, pt) do + %ExSDP.Attribute.FMTP{pt: pt, stereo: true} + end + + defp init_payloader(%{media: :h264, id: id}), + do: {Encoder.H264, Encoder.H264.init(payload_type: 95 + id)} + + defp init_payloader(%{media: :h265, id: id}), + do: {Encoder.H265, Encoder.H265.init(payload_type: 95 + id)} + + defp init_payloader(%{media: :av1, id: id}), + do: {Encoder.AV1, Encoder.AV1.init(payload_type: 95 + id)} + + defp init_payloader(%{media: :opus, id: id}), + do: {Encoder.Opus, Encoder.Opus.init(payload_type: 95 + id)} + + defp init_payloader(%{media: :aac, id: id}), + do: {Encoder.MPEG4Audio, Encoder.MPEG4Audio.init(payload_type: 95 + id, mode: :hbr)} + + defp payload(%{media: media}, payload) when media in [:h264, :h265] do + for <>, do: nalu + end + + defp payload(_track, payload), do: payload +end