Skip to content

Commit b177cb4

Browse files
authored
Add usage examples (#3)
* Add examples * Document server * Update readme
1 parent 8ea6328 commit b177cb4

5 files changed

Lines changed: 297 additions & 14 deletions

File tree

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# ExRTMP
22

3+
[![Hex.pm](https://img.shields.io/hexpm/v/ex_rtmp.svg)](https://hex.pm/packages/ex_rtmp)
4+
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/ex_rtmp)
5+
36
RTMP server and client library for Elixir.
47

58
## Installation
@@ -13,3 +16,27 @@ def deps do
1316
]
1417
end
1518
```
19+
20+
## Usage
21+
See the [examples](./examples) folder.
22+
23+
### `save_to_flv.exs`
24+
A client publish to an RTMP server and the server stores into flv file.
25+
26+
To publish a stream to the server use `ffmpeg`:
27+
```bash
28+
ffmpeg -re -i input_file.mp4 -c:v copy -c:a copy -f flv rtmp://localhost:1935/live/test
29+
```
30+
31+
### `send_mp4.exs`
32+
The server stream an mp4 file to connected clients. The mp4 file must have AAC audio and H264/AVC video.
33+
34+
To start the server:
35+
```bash
36+
elixir examples/send_mp4.exs "input_file.mp4"
37+
```
38+
39+
and to play the stream, you can use `vlc` or `ffplay`:
40+
```bash
41+
ffplay -i rtmp://localhost:1935/live/test
42+
```

examples/save_to_flv.exs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
Mix.install([:ex_rtmp, :ex_flv])
2+
3+
defmodule Handler do
4+
use ExRTMP.Server.Handler
5+
6+
@impl true
7+
def init(_options), do: %{app: nil, file: nil}
8+
9+
@impl true
10+
def handle_connect(%{properties: props}, state) do
11+
app = Map.fetch!(props, "app")
12+
{:ok, %{state | app: app}}
13+
end
14+
15+
@impl true
16+
def handle_publish(_stream_id, stream_key, state) do
17+
file = File.open!("output_#{state.app}_#{stream_key}.flv", [:write, :binary, :raw])
18+
IO.binwrite(file, [ExFLV.Header.serialize(ExFLV.Header.new(1, true, true)), <<0::32>>])
19+
{:ok, %{state | file: file}}
20+
end
21+
22+
@impl true
23+
def handle_audio_data(_stream_id, timestamp, payload, state) do
24+
tag = %ExFLV.Tag{type: :audio, timestamp: timestamp, data: iodata}
25+
write_tag(tag, state)
26+
end
27+
28+
@impl true
29+
def handle_video_data(_stream_id, timestamp, iodata, state) do
30+
tag = %ExFLV.Tag{type: :video, timestamp: timestamp, data: iodata}
31+
write_tag(tag, state)
32+
end
33+
34+
@impl true
35+
def handle_delete_stream(_stream_id, state) do
36+
File.close(state.file)
37+
end
38+
39+
defp write_tag(tag, state) do
40+
data = ExFLV.Tag.serialize(tag)
41+
:ok = IO.binwrite(state.file, [data, <<IO.iodata_length(data)::32>>])
42+
state
43+
end
44+
end
45+
46+
{:ok, pid} = ExRTMP.Server.start_link(port: 1935, handler: Handler)
47+
Process.monitor(pid)
48+
49+
receive do
50+
{:DOWN, _ref, :process, ^pid, _reason} ->
51+
:ok
52+
end

examples/send_mp4.exs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
Mix.install([:ex_rtmp, :ex_flv, :ex_mp4, :media_codecs])
2+
3+
defmodule Publisher do
4+
use GenServer
5+
6+
import ExMP4.Helper, only: [timescalify: 3]
7+
8+
alias ExFLV.Tag.{AACAudioData, AVCVideoPacket, AudioData, VideoData}
9+
alias ExRTMP.Server.ClientSession
10+
alias MediaCodecs.MPEG4
11+
12+
def start_link(opts) do
13+
GenServer.start_link(__MODULE__, opts)
14+
end
15+
16+
@impl true
17+
def init(opts) do
18+
reader = ExMP4.Reader.new!(opts[:file])
19+
video_track = ExMP4.Reader.track(reader, :video)
20+
audio_track = ExMP4.Reader.track(reader, :audio)
21+
22+
video_reducer = &Enumerable.reduce(video_track, &1, fn elem, _acc -> {:suspend, elem} end)
23+
audio_reducer = &Enumerable.reduce(audio_track, &1, fn elem, _acc -> {:suspend, elem} end)
24+
25+
{:ok,
26+
%{
27+
reader: reader,
28+
rtmp_sender: opts[:rtmp_sender],
29+
video_track: video_track,
30+
video_reducer: video_reducer,
31+
audio_track: audio_track,
32+
audio_reducer: audio_reducer,
33+
stream_id: opts[:stream_id],
34+
}, {:continue, :send_init_data}}
35+
end
36+
37+
@impl true
38+
def handle_continue(:send_init_data, state) do
39+
ClientSession.send_metadata(state.rtmp_sender, state.stream_id, %{
40+
"width" => state.video_track.width,
41+
"height" => state.video_track.height,
42+
"videocodecid" => 7,
43+
"audiocodecid" => 10,
44+
"audiosamplerate" => state.audio_track.timescale,
45+
"stereo" => true,
46+
"filesize" => 0
47+
})
48+
49+
Process.send_after(self(), :send_video, 0)
50+
Process.send_after(self(), :send_audio, 0)
51+
52+
<<_header::binary-size(8), dcr::binary>> = ExMP4.Box.serialize(state.video_track.priv_data)
53+
[descriptor | _rest] = MPEG4.parse_descriptors(state.audio_track.priv_data.es_descriptor)
54+
asc = descriptor.dec_config_descr.decoder_specific_info
55+
56+
dcr
57+
|> AVCVideoPacket.new(:sequence_header, 0)
58+
|> VideoData.new(:avc, :keyframe)
59+
|> ExFLV.Tag.Serializer.serialize()
60+
|> then(&ClientSession.send_video_data(state.rtmp_sender, state.stream_id, 0, &1))
61+
62+
asc
63+
|> AACAudioData.new(:sequence_header)
64+
|> AudioData.new(:aac, 3, 1, :stereo)
65+
|> ExFLV.Tag.Serializer.serialize()
66+
|> then(&ClientSession.send_audio_data(state.rtmp_sender, state.stream_id, 0, &1))
67+
68+
{:noreply, state}
69+
end
70+
71+
@impl true
72+
def handle_info(:send_video, state) do
73+
case state.video_reducer.({:cont, nil}) do
74+
{:suspended, metadata, video_reducer} ->
75+
frame_type = if(metadata.sync?, do: :keyframe, else: :interframe)
76+
77+
dur = timescalify(metadata.duration, state.video_track.timescale, :millisecond)
78+
Process.send_after(self(), :send_video, dur)
79+
80+
sample = ExMP4.Reader.read_sample(state.reader, metadata)
81+
timestamp = timescalify(sample.dts, state.video_track.timescale, :millisecond)
82+
83+
data =
84+
sample.payload
85+
|> AVCVideoPacket.new(:nalu, sample.pts - sample.dts)
86+
|> VideoData.new(:avc, frame_type)
87+
|> ExFLV.Tag.Serializer.serialize()
88+
89+
ClientSession.send_video_data(state.rtmp_sender, state.stream_id, timestamp, data)
90+
91+
{:noreply, %{state | video_reducer: video_reducer}}
92+
93+
:done ->
94+
{:noreply, state}
95+
end
96+
end
97+
98+
@impl true
99+
def handle_info(:send_audio, state) do
100+
case state.audio_reducer.({:cont, nil}) do
101+
{:suspended, metadata, audio_reducer} ->
102+
dur = timescalify(metadata.duration, state.audio_track.timescale, :millisecond)
103+
Process.send_after(self(), :send_audio, dur)
104+
105+
sample = ExMP4.Reader.read_sample(state.reader, metadata)
106+
timestamp = timescalify(sample.dts, state.audio_track.timescale, :millisecond)
107+
108+
data =
109+
sample.payload
110+
|> AACAudioData.new(:raw)
111+
|> AudioData.new(:aac, 3, 1, :stereo)
112+
|> ExFLV.Tag.Serializer.serialize()
113+
114+
ClientSession.send_audio_data(state.rtmp_sender, state.stream_id, timestamp, data)
115+
116+
{:noreply, %{state | audio_reducer: audio_reducer}}
117+
118+
:done ->
119+
{:noreply, state}
120+
end
121+
end
122+
end
123+
124+
defmodule Handler do
125+
use ExRTMP.Server.Handler
126+
127+
@impl true
128+
def init(file) do
129+
%{publisher: nil, file: file}
130+
end
131+
132+
@impl true
133+
def handle_play(stream_id, _play_command, state) do
134+
{:ok, pid} = Publisher.start_link(file: state.file, rtmp_sender: self(), stream_id: stream_id)
135+
{:ok, %{state | publisher: pid}}
136+
end
137+
138+
@impl true
139+
def handle_delete_stream(_stream_id, state) do
140+
GenServer.stop(state.publisher)
141+
%{state | publisher: nil}
142+
end
143+
end
144+
145+
{:ok, pid} =
146+
ExRTMP.Server.start(
147+
handler: Handler,
148+
handler_options: List.first(System.argv())
149+
)
150+
151+
Process.monitor(pid)
152+
153+
IO.puts("Server started...")
154+
155+
receive do
156+
{:EXIT, _ref, ^pid, :process, _reason} -> :ok
157+
end

lib/ex_rtmp/message.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ defmodule ExRTMP.Message do
190190
payload =
191191
case ExRTMP.AMF0.parse(IO.iodata_to_binary(payload)) do
192192
["@setDataFrame", "onMetaData", metadata] ->
193-
IO.inspect(metadata)
194193
%Metadata{data: Map.new(metadata)}
195194

196195
["onMetaData", metadata] ->

lib/ex_rtmp/server.ex

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,6 @@ defmodule ExRTMP.Server do
44
55
The server listens for incoming RTMP client connections and spawns a new
66
`ExRTMP.Server.ClientSession` process for each connected client.
7-
8-
## Options
9-
10-
* `:handler` - The module that will handle the RTMP commands and messages.
11-
This module must implement the `ExRTMP.Server.Handler` behaviour. This
12-
option is required.
13-
14-
* `:handler_options` - A keyword list of options that will be passed to the
15-
handler module when it is started. This option is optional.
167
"""
178

189
use GenServer
@@ -21,20 +12,63 @@ defmodule ExRTMP.Server do
2112

2213
alias ExRTMP.Server.ClientSession
2314

15+
@type start_options :: [
16+
{:port, :inet.port_number()},
17+
{:handler, module()},
18+
{:handler_options, any()}
19+
]
20+
2421
@default_port 1935
2522

23+
@doc """
24+
Starts a new RTMP server.
25+
26+
Check `start_link/1` for available options.
27+
"""
28+
@spec start(start_options()) :: GenServer.on_start()
2629
def start(opts) do
2730
GenServer.start(__MODULE__, opts, name: opts[:name])
2831
end
2932

33+
@doc """
34+
Starts and link a new RTMP server.
35+
36+
## Options
37+
* `port` - The port number where the server should listen. Defaults to: `1935`.
38+
39+
* `handler` - The module that will handle the RTMP commands and messages.
40+
This module must implement the `ExRTMP.Server.Handler` behaviour. This
41+
option is required.
42+
43+
* `handler_options` - A keyword list of options that will be passed to the
44+
handler module when it is started. This option is optional.
45+
"""
46+
@spec start_link(start_options()) :: GenServer.on_start()
3047
def start_link(opts) do
3148
GenServer.start_link(__MODULE__, opts, name: opts[:name])
3249
end
3350

51+
@doc """
52+
Gets the port number of the server.
53+
"""
54+
@spec port(GenServer.name()) :: {:ok, port} | {:error, any()}
55+
def port(server) do
56+
GenServer.call(server, :get_port)
57+
end
58+
59+
@doc """
60+
Stops the server.
61+
"""
62+
@spec stop(GenServer.name()) :: :ok
63+
def stop(server) do
64+
GenServer.call(server, :stop)
65+
end
66+
3467
@impl true
3568
def init(opts) do
36-
{:ok, server_socket} =
37-
:gen_tcp.listen(@default_port, [:binary, packet: :raw, active: false, reuseaddr: true])
69+
port = Keyword.get(opts, :port, @default_port)
70+
71+
{:ok, server_socket} = :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true])
3872

3973
state = %{
4074
socket: server_socket,
@@ -43,9 +77,20 @@ defmodule ExRTMP.Server do
4377
handler_options: opts[:handler_options]
4478
}
4579

46-
pid = spawn_link(fn -> accept_client_connection(state) end)
80+
listener = spawn_link(fn -> accept_client_connection(state) end)
81+
82+
{:ok, %{socket: server_socket, listener: listener}}
83+
end
84+
85+
@impl true
86+
def handle_call(:get_port, _from, state) do
87+
{:reply, :inet.port(state.socket), state}
88+
end
4789

48-
{:ok, %{socket: server_socket, pid: pid}}
90+
@impl true
91+
def handle_call(:stop, _from, state) do
92+
Process.exit(state.listener, :normal)
93+
{:stop, :normal, :ok, state}
4994
end
5095

5196
@impl true
@@ -81,6 +126,9 @@ defmodule ExRTMP.Server do
81126
send(state.pid, {:new_client, pid})
82127
accept_client_connection(state)
83128

129+
{:error, :closed} ->
130+
:ok
131+
84132
{:error, reason} ->
85133
Logger.error("Failed to accept client connection: #{inspect(reason)}")
86134
end

0 commit comments

Comments
 (0)