From a9d33cfe99c9d763f6988a0d66ac5f23db0fa0d7 Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:46:38 +0100 Subject: [PATCH 1/9] TCP ICE first iteration --- .tool-versions | 2 - example/peer.exs | 3 +- lib/ex_ice/candidate.ex | 99 ++++++-- lib/ex_ice/ice_agent.ex | 23 +- lib/ex_ice/priv/app.ex | 21 +- lib/ex_ice/priv/candidate.ex | 114 ++++++--- lib/ex_ice/priv/candidate/host.ex | 3 + lib/ex_ice/priv/candidate/prflx.ex | 3 + lib/ex_ice/priv/candidate/relay.ex | 3 + lib/ex_ice/priv/candidate/srflx.ex | 3 + lib/ex_ice/priv/candidate_base.ex | 21 +- lib/ex_ice/priv/checklist.ex | 2 + lib/ex_ice/priv/gatherer.ex | 56 +++-- lib/ex_ice/priv/ice_agent.ex | 78 ++++-- lib/ex_ice/priv/mdns/resolver.ex | 63 ++--- lib/ex_ice/priv/transport.ex | 28 ++- lib/ex_ice/priv/transport/tcp.ex | 61 +++++ lib/ex_ice/priv/transport/tcp_client.ex | 307 ++++++++++++++++++++++++ lib/ex_ice/priv/transport/udp.ex | 14 +- mix.exs | 9 +- mix.lock | 4 +- test/ice_agent_test.exs | 20 +- test/integration/p2p_test.exs | 76 +++--- test/priv/candidate_pair_test.exs | 3 +- test/priv/candidate_test.exs | 100 ++++++-- test/priv/checklist_test.exs | 3 +- test/priv/gatherer_test.exs | 2 +- test/priv/ice_agent_test.exs | 3 + test/support/transport/mock.ex | 14 +- 29 files changed, 915 insertions(+), 223 deletions(-) delete mode 100644 .tool-versions create mode 100644 lib/ex_ice/priv/transport/tcp.ex create mode 100644 lib/ex_ice/priv/transport/tcp_client.ex diff --git a/.tool-versions b/.tool-versions deleted file mode 100644 index 537f99f..0000000 --- a/.tool-versions +++ /dev/null @@ -1,2 +0,0 @@ -erlang 27.3.1 -elixir 1.18.3-otp-27 diff --git a/example/peer.exs b/example/peer.exs index cf0a3bb..9c4769e 100644 --- a/example/peer.exs +++ b/example/peer.exs @@ -95,7 +95,8 @@ defmodule Peer do role = String.to_atom(role) {:ok, pid} = - ICEAgent.start_link(role, + ICEAgent.start_link( + role: role, ip_filter: fn {_, _, _, _} -> true {_, _, _, _, _, _, _, _} -> false diff --git a/lib/ex_ice/candidate.ex b/lib/ex_ice/candidate.ex index 4113b6a..efd34fa 100644 --- a/lib/ex_ice/candidate.ex +++ b/lib/ex_ice/candidate.ex @@ -3,9 +3,10 @@ defmodule ExICE.Candidate do ICE candidate representation. """ - @type type() :: :host | :srflx | :prflx | :relay + @type type :: :host | :srflx | :prflx | :relay + @type tcp_type :: :active | :passive | :so - @type t() :: %__MODULE__{ + @type t :: %__MODULE__{ id: integer(), type: type(), address: :inet.ip_address() | String.t(), @@ -14,7 +15,8 @@ defmodule ExICE.Candidate do foundation: integer(), port: :inet.port_number(), priority: integer(), - transport: :udp | :tcp + transport: :udp | :tcp, + tcp_type: tcp_type() | nil } @enforce_keys [ @@ -24,7 +26,8 @@ defmodule ExICE.Candidate do :port, :foundation, :priority, - :transport + :transport, + :tcp_type ] defstruct @enforce_keys ++ [:base_address, :base_port] @@ -38,7 +41,8 @@ defmodule ExICE.Candidate do priority: priority, address: address, port: port, - type: type + type: type, + tcp_type: tcp_type } = cand # This is based on RFC 8839 sec. 5.1. @@ -54,31 +58,45 @@ defmodule ExICE.Candidate do transport = transport_to_string(transport) address = address_to_string(address) - - "#{foundation} #{component_id} #{transport} #{priority} #{address} #{port} typ #{type} #{related_addr}" - |> String.trim() + tcp_type = tcp_type_to_string(tcp_type) + + [ + foundation, + component_id, + transport, + priority, + address, + port, + "typ", + type, + related_addr, + tcp_type + ] + |> Enum.reject(&(&1 == "")) + |> Enum.join(" ") end @spec unmarshal(String.t()) :: {:ok, t()} | {:error, term()} def unmarshal(string) do - with [f_str, c_str, tr_str, pr_str, a_str, po_str, "typ", ty_str] <- - String.split(string, " ", parts: 8), + with [f_str, c_str, tr_str, pr_str, a_str, po_str, "typ", ty_str | rest] <- + String.split(string, " "), {foundation, ""} <- Integer.parse(f_str), {_component_id, ""} <- Integer.parse(c_str), {:ok, transport} <- parse_transport(String.downcase(tr_str)), {priority, ""} <- Integer.parse(pr_str), {:ok, address} <- parse_address(a_str), {port, ""} <- Integer.parse(po_str), - {:ok, type} <- parse_type(ty_str) do - {:ok, - new( - type, - address: address, - port: port, - priority: priority, - foundation: foundation, - transport: transport - )} + {:ok, type} <- parse_type(ty_str), + {:ok, extra_config} <- parse_optional_attributes(rest) do + config = [ + address: address, + port: port, + priority: priority, + foundation: foundation, + transport: transport + ] + + {:ok, new(type, config ++ extra_config)} else err when is_list(err) -> {:error, :invalid_candidate} err -> err @@ -89,12 +107,17 @@ defmodule ExICE.Candidate do def family(%__MODULE__{address: {_, _, _, _}}), do: :ipv4 def family(%__MODULE__{address: {_, _, _, _, _, _, _, _}}), do: :ipv6 + @spec tcp_type(t()) :: tcp_type() | nil + def tcp_type(%__MODULE__{tcp_type: tt}), do: tt + @doc false @spec new(type(), Keyword.t()) :: t() def new(type, config) when type in [:host, :srflx, :prflx, :relay] do transport = Keyword.get(config, :transport, :udp) address = Keyword.fetch!(config, :address) + tcp_type = if transport == :tcp, do: Keyword.fetch!(config, :tcp_type) + %__MODULE__{ id: ExICE.Priv.Utils.id(), address: address, @@ -104,7 +127,8 @@ defmodule ExICE.Candidate do port: Keyword.fetch!(config, :port), priority: Keyword.fetch!(config, :priority), transport: transport, - type: type + type: type, + tcp_type: tcp_type } end @@ -112,8 +136,13 @@ defmodule ExICE.Candidate do defp address_to_string(address), do: :inet.ntoa(address) defp transport_to_string(:udp), do: "UDP" + defp transport_to_string(:tcp), do: "TCP" + + defp tcp_type_to_string(nil), do: "" + defp tcp_type_to_string(type), do: "tcptype #{type}" defp parse_transport("udp"), do: {:ok, :udp} + defp parse_transport("tcp"), do: {:ok, :tcp} defp parse_transport(_other), do: {:error, :invalid_transport} defp parse_address(address) do @@ -124,9 +153,29 @@ defmodule ExICE.Candidate do end end - defp parse_type("host" <> _rest), do: {:ok, :host} - defp parse_type("srflx" <> _rest), do: {:ok, :srflx} - defp parse_type("prflx" <> _rest), do: {:ok, :prflx} - defp parse_type("relay" <> _rest), do: {:ok, :relay} + defp parse_type("host"), do: {:ok, :host} + defp parse_type("srflx"), do: {:ok, :srflx} + defp parse_type("prflx"), do: {:ok, :prflx} + defp parse_type("relay"), do: {:ok, :relay} defp parse_type(_other), do: {:error, :invalid_type} + + defp parse_optional_attributes(list, config \\ []) + defp parse_optional_attributes([], config), do: {:ok, config} + + defp parse_optional_attributes(["raddr", _2, _3, _4 | rest], config), + do: parse_optional_attributes(rest, config) + + defp parse_optional_attributes(["tcptype", tcp_type | rest], config) do + case parse_tcp_type(tcp_type) do + {:ok, tcp_type} -> parse_optional_attributes(rest, config ++ [tcp_type: tcp_type]) + err -> err + end + end + + defp parse_optional_attributes(_other, config), do: {:ok, config} + + defp parse_tcp_type("active"), do: {:ok, :active} + defp parse_tcp_type("passive"), do: {:ok, :passive} + defp parse_tcp_type("so"), do: {:ok, :so} + defp parse_tcp_type(_other), do: {:error, :invalid_tcp_type} end diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index b73309d..8b7858d 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -71,7 +71,12 @@ defmodule ExICE.ICEAgent do until role is set, adding remote candidates or gathering local candidates won't possible, and calls to these functions will be ignored. Defaults to `nil`. * `ip_filter` - filter applied when gathering host candidates - * `ports` - ports that will be used when gathering host candidates, otherwise the ports are chosen by the OS + * `transport` - transport protocol to be used. + * `udp` - use UDP only (default). + * `tcp`- use TCP only. Note that relay candidates for TCP transport are not yet supported. + * `ports` - ports that will be used when gathering host candidates, otherwise the ports are chosen by the OS. + Warning: when using `transport: :tcp` and setting this option, make sure that this port range is used only by ICE Agents + from this Erlang VM instance. Refer to the source code (`lib/ex_ice/priv/transport/tcp.ex`) for more info. * `ice_servers` - list of STUN/TURN servers * `ice_transport_policy` - candidate types to be used. * `all` - all ICE candidates will be considered (default). @@ -94,6 +99,7 @@ defmodule ExICE.ICEAgent do @type opts() :: [ role: role() | nil, ip_filter: ip_filter(), + transport: :udp | :tcp, ports: Enumerable.t(non_neg_integer()), ice_servers: [ %{ @@ -325,6 +331,13 @@ defmodule ExICE.ICEAgent do def init(opts) do if Keyword.has_key?(opts, :logger_metadata), do: Logger.metadata(opts[:logger_metadata]) + opts = + if opts[:transport] == :tcp do + opts ++ [transport_module: ExICE.Priv.Transport.TCP] + else + opts ++ [transport_module: ExICE.Priv.Transport.UDP] + end + ice_agent = ExICE.Priv.ICEAgent.new(opts) {:ok, %{ice_agent: ice_agent, pending_eoc: false, pending_remote_cands: MapSet.new()}} end @@ -482,6 +495,14 @@ defmodule ExICE.ICEAgent do {:noreply, %{state | ice_agent: ice_agent}} end + @impl true + def handle_info({:tcp, _socket, _packet}, state) do + # TODO: consider receiving TCP data in the ICE Agent process + # ice_agent = ExICE.Priv.ICEAgent.handle_tcp(state.ice_agent, socket, packet) + # {:noreply, %{state | ice_agent: ice_agent}} + {:noreply, state} + end + @impl true def handle_info({:ex_turn, ref, msg}, state) do ice_agent = ExICE.Priv.ICEAgent.handle_ex_turn_msg(state.ice_agent, ref, msg) diff --git a/lib/ex_ice/priv/app.ex b/lib/ex_ice/priv/app.ex index bb51e79..59a1563 100644 --- a/lib/ex_ice/priv/app.ex +++ b/lib/ex_ice/priv/app.ex @@ -9,16 +9,17 @@ defmodule ExICE.Priv.App do kernel_ver = kernel_version() children = - if kernel_ver >= {9, 1} do - [{ExICE.Priv.MDNS.Resolver, :gen_udp}] - else - Logger.warning(""" - Not starting MDNS resolver as it requires kernel version >= 9.1. - Detected kernel version: #{inspect(kernel_ver)} - """) - - [] - end + [{Registry, keys: :unique, name: ExICE.Priv.Registry}] ++ + if kernel_ver >= {9, 1} do + [{ExICE.Priv.MDNS.Resolver, ExICE.Priv.Transport.UDP}] + else + Logger.warning(""" + Not starting mDNS resolver as it requires kernel version >= 9.1. + Detected kernel version: #{inspect(kernel_ver)} + """) + + [] + end Supervisor.start_link(children, strategy: :one_for_one) end diff --git a/lib/ex_ice/priv/candidate.ex b/lib/ex_ice/priv/candidate.ex index ab053b7..57afca6 100644 --- a/lib/ex_ice/priv/candidate.ex +++ b/lib/ex_ice/priv/candidate.ex @@ -1,9 +1,10 @@ defmodule ExICE.Priv.Candidate do @moduledoc false - @type type() :: :host | :srflx | :prflx | :relay + @type type :: :host | :srflx | :prflx | :relay + @type tcp_type :: :active | :passive | :so | nil - @type t() :: struct() + @type t :: struct() @type config :: [ address: :inet.ip_address() | String.t(), @@ -13,7 +14,8 @@ defmodule ExICE.Priv.Candidate do socket: :inet.socket(), priority: integer(), foundation: integer(), - transport: :udp | :tcp + transport: :udp | :tcp, + tcp_type: tcp_type() ] @callback new(config()) :: t() @@ -22,52 +24,104 @@ defmodule ExICE.Priv.Candidate do @callback family(t()) :: :ipv4 | :ipv6 + @callback tcp_type(t()) :: tcp_type() + @callback to_candidate(t()) :: ExICE.Candidate.t() @callback send_data(t(), :inet.ip_address(), :inet.port_number(), binary()) :: {:ok, t()} | {:error, term(), t()} - @spec priority!(%{:inet.ip_address() => non_neg_integer()}, :inet.ip_address(), type()) :: + @spec priority!( + %{:inet.ip_address() => non_neg_integer()}, + :inet.ip_address(), + type(), + tcp_type() + ) :: non_neg_integer() - def priority!(local_preferences, base_address, type) do - local_preference = Map.fetch!(local_preferences, base_address) - do_priority(local_preference, type) + def priority!(local_preferences, base_address, type, tcp_type) do + other_preference = Map.fetch!(local_preferences, base_address) + do_priority(other_preference, type, tcp_type) end - @spec priority(%{:inet.ip_address() => non_neg_integer()}, :inet.ip_address(), type()) :: + @spec priority( + %{:inet.ip_address() => non_neg_integer()}, + :inet.ip_address(), + type(), + tcp_type() + ) :: {%{:inet.ip_address() => non_neg_integer()}, non_neg_integer()} - def priority(local_preferences, base_address, type) do - local_preference = - Map.get(local_preferences, base_address) || generate_local_preference(local_preferences) + def priority(local_preferences, base_address, type, tcp_type) do + other_preference = + Map.get(local_preferences, base_address) || generate_other_preference(local_preferences) - local_preferences = Map.put(local_preferences, base_address, local_preference) + local_preferences = Map.put(local_preferences, base_address, other_preference) - {local_preferences, do_priority(local_preference, type)} + {local_preferences, do_priority(other_preference, type, tcp_type)} end - defp do_priority(local_preference, type) do - type_preference = - case type do - :host -> 126 - :prflx -> 110 - :srflx -> 100 - :relay -> 0 - end + defp do_priority(other_preference, type, tcp_type) do + type_preference = type_preference(type, tcp_type) + direction_preference = direction_preference(type, tcp_type) + + local_preference = 2 ** 13 * direction_preference + other_preference 2 ** 24 * type_preference + 2 ** 8 * local_preference + 2 ** 0 * (256 - 1) end - defp generate_local_preference(local_preferences, attempts \\ 200) + # TODO: revisit these when implementing UDP+TCP support at the same time + # UDP + defp type_preference(type, nil) do + case type do + :host -> 126 + :prflx -> 110 + :srflx -> 100 + :relay -> 10 + end + end + + # TCP + defp type_preference(type, _tcp_type) do + case type do + :host -> 80 + :prflx -> 70 + # :nat_assisted -> 65 + :srflx -> 60 + # :udp_tunneled -> 45 + :relay -> 0 + end + end + + # UDP + defp direction_preference(_type, nil), do: 7 + + # TCP + defp direction_preference(type, tcp_type) when type in [:host, :udp_tunneled, :relay] do + case tcp_type do + :active -> 6 + :passive -> 4 + :so -> 2 + end + end + + defp direction_preference(_type, tcp_type) do + case tcp_type do + :so -> 6 + :active -> 4 + :passive -> 2 + end + end + + defp generate_other_preference(local_preferences, attempts \\ 200) - defp generate_local_preference(_local_preferences, 0), + defp generate_other_preference(_local_preferences, 0), do: raise("Couldn't generate local preference") - defp generate_local_preference(local_preferences, attempts) do - # this should give us a number from 0 to 2**16-1 - <> = :crypto.strong_rand_bytes(2) + defp generate_other_preference(local_preferences, attempts) do + # 0..8191 + <> = :crypto.strong_rand_bytes(2) - if Map.has_key?(local_preferences, pref) do - generate_local_preference(local_preferences, attempts - 1) + if local_preferences |> Map.values() |> Enum.member?(pref) do + generate_other_preference(local_preferences, attempts - 1) else pref end @@ -76,8 +130,6 @@ defmodule ExICE.Priv.Candidate do @spec foundation(type(), :inet.ip_address() | String.t(), :inet.ip_address() | nil, atom()) :: integer() def foundation(type, ip, stun_turn_ip, transport) do - {type, ip, stun_turn_ip, transport} - |> then(&inspect(&1)) - |> then(&:erlang.crc32(&1)) + {type, ip, stun_turn_ip, transport} |> inspect() |> :erlang.crc32() end end diff --git a/lib/ex_ice/priv/candidate/host.ex b/lib/ex_ice/priv/candidate/host.ex index 606af10..e166ade 100644 --- a/lib/ex_ice/priv/candidate/host.ex +++ b/lib/ex_ice/priv/candidate/host.ex @@ -20,6 +20,9 @@ defmodule ExICE.Priv.Candidate.Host do @impl true def family(cand), do: CandidateBase.family(cand.base) + @impl true + def tcp_type(cand), do: CandidateBase.tcp_type(cand.base) + @impl true def to_candidate(cand), do: CandidateBase.to_candidate(cand.base) diff --git a/lib/ex_ice/priv/candidate/prflx.ex b/lib/ex_ice/priv/candidate/prflx.ex index 34db343..25c8969 100644 --- a/lib/ex_ice/priv/candidate/prflx.ex +++ b/lib/ex_ice/priv/candidate/prflx.ex @@ -20,6 +20,9 @@ defmodule ExICE.Priv.Candidate.Prflx do @impl true def family(cand), do: CandidateBase.family(cand.base) + @impl true + def tcp_type(cand), do: CandidateBase.tcp_type(cand.base) + @impl true def to_candidate(cand), do: CandidateBase.to_candidate(cand.base) diff --git a/lib/ex_ice/priv/candidate/relay.ex b/lib/ex_ice/priv/candidate/relay.ex index aea70f2..2c4be57 100644 --- a/lib/ex_ice/priv/candidate/relay.ex +++ b/lib/ex_ice/priv/candidate/relay.ex @@ -20,6 +20,9 @@ defmodule ExICE.Priv.Candidate.Relay do @impl true def family(cand), do: CandidateBase.family(cand.base) + @impl true + def tcp_type(cand), do: CandidateBase.tcp_type(cand.base) + @impl true def to_candidate(cand), do: CandidateBase.to_candidate(cand.base) diff --git a/lib/ex_ice/priv/candidate/srflx.ex b/lib/ex_ice/priv/candidate/srflx.ex index f32f402..3b61a8c 100644 --- a/lib/ex_ice/priv/candidate/srflx.ex +++ b/lib/ex_ice/priv/candidate/srflx.ex @@ -20,6 +20,9 @@ defmodule ExICE.Priv.Candidate.Srflx do @impl true def family(cand), do: CandidateBase.family(cand.base) + @impl true + def tcp_type(cand), do: CandidateBase.tcp_type(cand.base) + @impl true def to_candidate(cand), do: CandidateBase.to_candidate(cand.base) diff --git a/lib/ex_ice/priv/candidate_base.ex b/lib/ex_ice/priv/candidate_base.ex index 1ac18fd..e1eda2c 100644 --- a/lib/ex_ice/priv/candidate_base.ex +++ b/lib/ex_ice/priv/candidate_base.ex @@ -2,7 +2,7 @@ defmodule ExICE.Priv.CandidateBase do @moduledoc false alias ExICE.Priv.{Candidate, Utils} - @type t() :: %__MODULE__{ + @type t :: %__MODULE__{ id: integer(), address: :inet.ip_address() | String.t(), base_address: :inet.ip_address() | nil, @@ -10,10 +10,11 @@ defmodule ExICE.Priv.CandidateBase do foundation: integer(), port: :inet.port_number(), priority: integer(), - transport: :udp, + transport: :udp | :tcp, transport_module: module(), socket: :inet.socket() | nil, type: Candidate.type(), + tcp_type: Candidate.tcp_type(), closed?: boolean() } @@ -28,11 +29,12 @@ defmodule ExICE.Priv.CandidateBase do :transport_module, :type ] - defstruct @enforce_keys ++ [:base_address, :base_port, :socket, closed?: false] + defstruct @enforce_keys ++ [:base_address, :base_port, :socket, :tcp_type, closed?: false] @spec new(Candidate.type(), Keyword.t()) :: t() def new(type, config) do - transport = :udp + transport_module = Keyword.fetch!(config, :transport_module) + transport = transport_module.transport() address = Keyword.fetch!(config, :address) %__MODULE__{ @@ -44,9 +46,10 @@ defmodule ExICE.Priv.CandidateBase do port: Keyword.fetch!(config, :port), priority: Keyword.fetch!(config, :priority), transport: transport, - transport_module: Keyword.get(config, :transport_module, ExICE.Priv.Transport.UDP), + transport_module: transport_module, socket: Keyword.fetch!(config, :socket), - type: type + type: type, + tcp_type: config[:tcp_type] } end @@ -57,6 +60,9 @@ defmodule ExICE.Priv.CandidateBase do def family(%__MODULE__{address: {_, _, _, _}}), do: :ipv4 def family(%__MODULE__{address: {_, _, _, _, _, _, _, _}}), do: :ipv6 + @spec tcp_type(t()) :: Candidate.tcp_type() + def tcp_type(%__MODULE__{tcp_type: tt}), do: tt + @spec to_candidate(t()) :: ExICE.Candidate.t() def to_candidate(cand) do ExICE.Candidate.new(cand.type, @@ -66,7 +72,8 @@ defmodule ExICE.Priv.CandidateBase do base_port: cand.base_port, foundation: cand.foundation, transport: cand.transport, - priority: cand.priority + priority: cand.priority, + tcp_type: cand.tcp_type ) end end diff --git a/lib/ex_ice/priv/checklist.ex b/lib/ex_ice/priv/checklist.ex index 557b57f..522427d 100644 --- a/lib/ex_ice/priv/checklist.ex +++ b/lib/ex_ice/priv/checklist.ex @@ -63,6 +63,8 @@ defmodule ExICE.Priv.Checklist do @spec prune(t()) :: t() def prune(checklist) do + # TODO: prune pairs where the local TCP candidate is passive, per RFC 6544, sec. 6.2. + # This is done according to RFC 8838 sec. 10 {waiting, in_flight_or_done} = Enum.split_with(checklist, fn {_id, p} -> p.state in [:waiting, :frozen] end) diff --git a/lib/ex_ice/priv/gatherer.ex b/lib/ex_ice/priv/gatherer.ex index b705a5e..9b937b7 100644 --- a/lib/ex_ice/priv/gatherer.ex +++ b/lib/ex_ice/priv/gatherer.ex @@ -41,14 +41,16 @@ defmodule ExICE.Priv.Gatherer do |> Stream.reject(&unsupported_ipv6?(&1)) |> Enum.to_list() - ips - |> Enum.map(&open_socket(gatherer, &1)) + for transport_opts <- gatherer.transport_module.socket_configs(), + ip <- ips do + open_socket(gatherer, ip, transport_opts) + end |> Enum.reject(&(&1 == nil)) |> then(&{:ok, &1}) end end - defp open_socket(gatherer, ip) do + defp open_socket(gatherer, ip, transport_opts) do inet = case ip do {_, _, _, _} -> :inet @@ -56,8 +58,9 @@ defmodule ExICE.Priv.Gatherer do end socket_opts = [ - {:inet_backend, :socket}, - {:ip, ip}, + # We're using the :inet` backend, as `:socket` has issues + # with the `{:reuseport, true}` option added by the TCP Client + {:inet_backend, :inet}, {:active, true}, :binary, inet @@ -66,7 +69,12 @@ defmodule ExICE.Priv.Gatherer do gatherer.ports |> Enum.shuffle() |> Enum.reduce_while(nil, fn port, _ -> - case gatherer.transport_module.open(port, socket_opts) do + case gatherer.transport_module.setup_socket( + ip, + port, + socket_opts, + transport_opts + ) do {:ok, socket} -> {:ok, {^ip, sock_port}} = gatherer.transport_module.sockname(socket) @@ -74,7 +82,7 @@ defmodule ExICE.Priv.Gatherer do "Successfully opened socket for: #{inspect(ip)}:#{sock_port}, socket: #{inspect(socket)}" ) - {:halt, socket} + {:halt, %{socket: socket, transport_opts: transport_opts}} {:error, :eaddrinuse} -> Logger.debug("Address #{inspect(ip)}:#{inspect(port)} in use. Trying next port.") @@ -88,7 +96,7 @@ defmodule ExICE.Priv.Gatherer do end @spec gather_host_candidates(t(), %{:inet.ip_address() => non_neg_integer()}, [ - Transport.socket() + {Transport.socket(), map()} ]) :: [Candidate.t()] def gather_host_candidates(gatherer, local_preferences, sockets) do {local_preferences, cands} = @@ -123,7 +131,14 @@ defmodule ExICE.Priv.Gatherer do stun_family = Utils.family(ip) if cand_family == stun_family do - gatherer.transport_module.send(socket, {ip, port}, binding_request) + # If using TCP transport: + # Communication with STUN servers should be handled differently + # than the rest of the TCP traffic: the messages are not RFC 4571-framed, + # and we want to issue connection attempts from passive candidates as well. + gatherer.transport_module.send(socket, {ip, port}, binding_request, + frame?: false, + connect?: true + ) else Logger.debug(""" Not gathering srflx candidate because of incompatible ip address families. @@ -180,7 +195,12 @@ defmodule ExICE.Priv.Gatherer do if valid_external_ip?(external_ip, host_cand.base.address, external_ips) do priority = - Candidate.priority!(local_preferences, host_cand.base.address, :srflx) + Candidate.priority!( + local_preferences, + host_cand.base.address, + :srflx, + host_cand.base.tcp_type + ) cand = Candidate.Srflx.new( @@ -190,7 +210,8 @@ defmodule ExICE.Priv.Gatherer do base_port: host_cand.base.port, priority: priority, transport_module: host_cand.base.transport_module, - socket: host_cand.base.socket + socket: host_cand.base.socket, + tcp_type: host_cand.base.tcp_type ) Logger.debug("New srflx candidate from NAT mapping: #{inspect(cand)}") @@ -264,10 +285,16 @@ defmodule ExICE.Priv.Gatherer do Keyword.get_values(int, :addr) end - defp create_new_host_candidate(gatherer, local_preferences, socket) do + defp create_new_host_candidate(gatherer, local_preferences, %{ + socket: socket, + transport_opts: transport_opts + }) do {:ok, {sock_ip, sock_port}} = gatherer.transport_module.sockname(socket) - {local_preferences, priority} = Candidate.priority(local_preferences, sock_ip, :host) + tcp_type = transport_opts[:tcp_type] + + {local_preferences, priority} = + Candidate.priority(local_preferences, sock_ip, :host, tcp_type) cand = Candidate.Host.new( @@ -277,7 +304,8 @@ defmodule ExICE.Priv.Gatherer do base_port: sock_port, priority: priority, transport_module: gatherer.transport_module, - socket: socket + socket: socket, + tcp_type: tcp_type ) Logger.debug("New candidate: #{inspect(cand)}") diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index f854bd3..c61f08a 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -10,7 +10,6 @@ defmodule ExICE.Priv.ICEAgent do ConnCheckHandler, Gatherer, IfDiscovery, - Transport, Utils } @@ -142,9 +141,9 @@ defmodule ExICE.Priv.ICEAgent do {local_ufrag, local_pwd} = generate_credentials() controlling_process = Keyword.fetch!(opts, :controlling_process) + transport_module = Keyword.fetch!(opts, :transport_module) if_discovery_module = opts[:if_discovery_module] || IfDiscovery.Inet - transport_module = opts[:transport_module] || Transport.UDP ip_filter = opts[:ip_filter] || fn _ -> true end ports = opts[:ports] || [0] @@ -358,7 +357,7 @@ defmodule ExICE.Priv.ICEAgent do %{ ice_agent - | sockets: sockets, + | sockets: Enum.map(sockets, fn %{socket: socket} -> socket end), gathering_transactions: gathering_transactions } |> update_gathering_state() @@ -377,7 +376,7 @@ defmodule ExICE.Priv.ICEAgent do %{ ice_agent - | sockets: sockets, + | sockets: Enum.map(sockets, fn %{socket: socket} -> socket end), gathering_transactions: relay_gathering_transactions } |> update_gathering_state() @@ -993,7 +992,7 @@ defmodule ExICE.Priv.ICEAgent do ## PRIV API defp create_srflx_gathering_transactions(stun_servers, sockets) do - for stun_server <- stun_servers, socket <- sockets, into: %{} do + for stun_server <- stun_servers, %{socket: socket} <- sockets, into: %{} do <> = :crypto.strong_rand_bytes(12) t = %{ @@ -1009,8 +1008,18 @@ defmodule ExICE.Priv.ICEAgent do end defp create_relay_gathering_transactions(ice_agent, turn_servers, sockets) do + transport = ice_agent.transport_module.transport() + + turn_servers = + if transport == :udp or turn_servers == [] do + turn_servers + else + Logger.warning("Relay candidates for transport #{inspect(transport)} are unsupported") + [] + end + # TODO revisit this - for turn_server <- turn_servers, socket <- sockets do + for turn_server <- turn_servers, %{socket: socket} <- sockets do with {:ok, client} <- ExTURN.Client.new(turn_server.url, turn_server.username, turn_server.credential), {:ok, {sock_ip, _sock_port}} <- ice_agent.transport_module.sockname(socket), @@ -1217,8 +1226,9 @@ defmodule ExICE.Priv.ICEAgent do # In other case, we might get duplicates. {:ok, {sock_addr, _sock_port}} = ice_agent.transport_module.sockname(tr.socket) + # TODO: set correct tcp_type here {local_preferences, priority} = - Candidate.priority(ice_agent.local_preferences, sock_addr, :relay) + Candidate.priority(ice_agent.local_preferences, sock_addr, :relay, nil) ice_agent = %{ ice_agent @@ -1913,7 +1923,15 @@ defmodule ExICE.Priv.ICEAgent do nil -> {:ok, {base_addr, base_port}} = ice_agent.transport_module.sockname(tr.socket) - priority = Candidate.priority!(ice_agent.local_preferences, base_addr, :srflx) + host_cand = find_host_cand(Map.values(ice_agent.local_cands), tr.socket) + + priority = + Candidate.priority!( + ice_agent.local_preferences, + base_addr, + :srflx, + host_cand.base.tcp_type + ) cand = Candidate.Srflx.new( @@ -1923,7 +1941,8 @@ defmodule ExICE.Priv.ICEAgent do base_port: base_port, priority: priority, transport_module: ice_agent.transport_module, - socket: tr.socket + socket: tr.socket, + tcp_type: host_cand.base.tcp_type ) Logger.debug("New srflx candidate: #{inspect(cand)}") @@ -2208,16 +2227,24 @@ defmodule ExICE.Priv.ICEAgent do defp get_matching_candidates_local(candidates, %c_mod{} = cand) do Enum.filter(candidates, fn c -> - ExICE.Candidate.family(c) == c_mod.family(cand) + ExICE.Candidate.family(c) == c_mod.family(cand) and + tcp_types_ok?(ExICE.Candidate.tcp_type(c), c_mod.tcp_type(cand)) end) end defp get_matching_candidates_remote(candidates, cand) do Enum.filter(candidates, fn %c_mod{} = c -> - c_mod.family(c) == ExICE.Candidate.family(cand) + c_mod.family(c) == ExICE.Candidate.family(cand) and + tcp_types_ok?(c_mod.tcp_type(c), ExICE.Candidate.tcp_type(cand)) end) end + defp tcp_types_ok?(nil, nil), do: true + defp tcp_types_ok?(:so, :so), do: true + defp tcp_types_ok?(:active, :passive), do: true + defp tcp_types_ok?(:passive, :active), do: true + defp tcp_types_ok?(_, _), do: false + defp symmetric?(ice_agent, socket, response_src, conn_check_pair) do local_cand = Map.fetch!(ice_agent.local_cands, conn_check_pair.local_cand_id) remote_cand = Map.fetch!(ice_agent.remote_cands, conn_check_pair.remote_cand_id) @@ -2275,7 +2302,12 @@ defmodule ExICE.Priv.ICEAgent do local_cand = conn_check_local_cand priority = - Candidate.priority!(ice_agent.local_preferences, local_cand.base.base_address, :prflx) + Candidate.priority!( + ice_agent.local_preferences, + local_cand.base.base_address, + :prflx, + local_cand.base.tcp_type + ) cand = Candidate.Prflx.new( @@ -2285,7 +2317,8 @@ defmodule ExICE.Priv.ICEAgent do base_port: local_cand.base.base_port, priority: priority, transport_module: ice_agent.transport_module, - socket: local_cand.base.socket + socket: local_cand.base.socket, + tcp_type: local_cand.base.tcp_type ) Logger.debug("Adding new local prflx candidate: #{inspect(cand)}") @@ -2400,7 +2433,8 @@ defmodule ExICE.Priv.ICEAgent do # but it's return type is not standardized - sometimes it's %{states: [:closed]}, # some other time %{rstates: [:closed], wstates: [:closed]}. case ice_agent.transport_module.sockname(socket) do - {:error, :closed} -> ice_agent + # usually we'd see EINVAL here, but in rare cases it might be EBADF (FD already disposed of) + {:error, _reason} -> ice_agent _ -> do_close_socket(ice_agent, socket) end end @@ -2497,7 +2531,7 @@ defmodule ExICE.Priv.ICEAgent do :ok = ice_agent.transport_module.close(socket) :ok = flush_socket_msg(socket) - {:error, :closed} -> + {:error, _reason} -> # socket already closed :ok end @@ -2652,7 +2686,7 @@ defmodule ExICE.Priv.ICEAgent do end defp change_gathering_state(ice_agent, new_gathering_state, opts \\ []) do - Logger.debug("Gatering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}") + Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}") if opts[:notify] != false do notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state}) @@ -3094,12 +3128,18 @@ defmodule ExICE.Priv.ICEAgent do {:ok, {sock_addr, _sock_port}} = ice_agent.transport_module.sockname(local_candidate.base.socket) - Candidate.priority!(ice_agent.local_preferences, sock_addr, :prflx) + Candidate.priority!( + ice_agent.local_preferences, + sock_addr, + :prflx, + local_candidate.base.tcp_type + ) else Candidate.priority!( ice_agent.local_preferences, local_candidate.base.base_address, - :prflx + :prflx, + local_candidate.base.tcp_type ) end @@ -3130,7 +3170,7 @@ defmodule ExICE.Priv.ICEAgent do # we get an eperm error but retrying seems to help ¯\_(ツ)_/¯ Logger.debug(""" Couldn't send data to: #{inspect(dst_ip)}:#{dst_port}, reason: #{reason}, cand: #{inspect(local_cand)}. \ - Retyring...\ + Retrying...\ """) do_send(ice_agent, local_cand, dst, data, false) diff --git a/lib/ex_ice/priv/mdns/resolver.ex b/lib/ex_ice/priv/mdns/resolver.ex index ff9a259..8a2a4cf 100644 --- a/lib/ex_ice/priv/mdns/resolver.ex +++ b/lib/ex_ice/priv/mdns/resolver.ex @@ -12,7 +12,7 @@ defmodule ExICE.Priv.MDNS.Resolver do @rtx_timeout_ms 500 @spec start_link(module()) :: GenServer.on_start() - def start_link(transport_module \\ :gen_udp) do + def start_link(transport_module \\ ExICE.Priv.Transport.UDP) do GenServer.start_link(__MODULE__, transport_module, name: __MODULE__) end @@ -28,39 +28,44 @@ defmodule ExICE.Priv.MDNS.Resolver do @impl true def init(transport_module) do - Logger.debug("Starting MDNS Resolver") + Logger.debug("Starting mDNS Resolver") {:ok, %{transport_module: transport_module, cache: %{}}, {:continue, nil}} end @impl true def handle_continue(_, state) do ret = - state.transport_module.open( - # Listen on the port specific to mDNS traffic. - # `add_membership` option only defines an address. - @mdns_port, - mode: :binary, - reuseaddr: true, - active: true, - # Allow other apps to bind to @mdns_port. - # If there are multiple sockets, bound to the same port, - # and subscribed to the same group (in fact, if one socket - # subscribes to some group, all other sockets bound to - # the same port also join this group), all those sockets - # will receive every message. In other words, `reuseport` for - # multicast works differently than for casual sockets. - reuseport: true, - # Support running two ICE agents on a single machine. - # In other case, our request won't be delivered to the mDNS address owner - # running on the same machine (e.g., a web browser). - multicast_loop: true, - # Receive responses - they are sent to the multicast address. - # The second argument specifies interfaces where we should listen - # for multicast traffic. - # This option works on interfaces i.e. it affects all sockets - # bound to the same port. - add_membership: {{224, 0, 0, 251}, {0, 0, 0, 0}} - ) + if state.transport_module.transport() == :udp do + state.transport_module.setup_socket( + nil, + # Listen on the port specific to mDNS traffic. + # `add_membership` option only defines an address. + @mdns_port, + mode: :binary, + reuseaddr: true, + active: true, + # Allow other apps to bind to @mdns_port. + # If there are multiple sockets, bound to the same port, + # and subscribed to the same group (in fact, if one socket + # subscribes to some group, all other sockets bound to + # the same port also join this group), all those sockets + # will receive every message. In other words, `reuseport` for + # multicast works differently than for casual sockets. + reuseport: true, + # Support running two ICE agents on a single machine. + # In other case, our request won't be delivered to the mDNS address owner + # running on the same machine (e.g., a web browser). + multicast_loop: true, + # Receive responses - they are sent to the multicast address. + # The second argument specifies interfaces where we should listen + # for multicast traffic. + # This option works on interfaces i.e. it affects all sockets + # bound to the same port. + add_membership: {{224, 0, 0, 251}, {0, 0, 0, 0}} + ) + else + {:error, :using_tcp_transport} + end case ret do {:ok, socket} -> @@ -69,7 +74,7 @@ defmodule ExICE.Priv.MDNS.Resolver do {:error, reason} -> Logger.warning(""" - Couldn't start MDNS resolver, reason: #{reason}. MDNS candidates won't be resolved. + Couldn't start mDNS resolver, reason: #{reason}. mDNS candidates won't be resolved. """) {:stop, {:shutdown, reason}, state} diff --git a/lib/ex_ice/priv/transport.ex b/lib/ex_ice/priv/transport.ex index bda06a4..5f6dee2 100644 --- a/lib/ex_ice/priv/transport.ex +++ b/lib/ex_ice/priv/transport.ex @@ -1,15 +1,37 @@ defmodule ExICE.Priv.Transport do @moduledoc false - @type socket() :: term() + @type socket :: term() - @callback open(:inet.port_number(), [:gen_udp.open_option()]) :: + @type open_option :: + :inet.inet_backend() + | :inet.address_family() + | {:ip, :inet.socket_address()} + | :inet.socket_setopt() + + @type transport_options :: Keyword.t() + + @callback transport() :: atom() + + @callback socket_configs() :: [transport_options()] + + @callback setup_socket( + :inet.ip_address(), + :inet.port_number(), + [open_option()], + transport_options() + ) :: {:ok, socket()} | {:error, term()} @callback sockname(socket()) :: {:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, term()} - @callback send(socket(), {:inet.ip_address(), :inet.port_number()}, binary()) :: + @callback send( + socket(), + {:inet.ip_address(), :inet.port_number()}, + binary(), + transport_options() + ) :: :ok | {:error, term()} @callback close(socket()) :: :ok diff --git a/lib/ex_ice/priv/transport/tcp.ex b/lib/ex_ice/priv/transport/tcp.ex new file mode 100644 index 0000000..318eab5 --- /dev/null +++ b/lib/ex_ice/priv/transport/tcp.ex @@ -0,0 +1,61 @@ +defmodule ExICE.Priv.Transport.TCP do + @moduledoc false + @behaviour ExICE.Priv.Transport + + require Logger + + alias ExICE.Priv.Transport.TCP.Client + + @impl true + def transport, do: :tcp + + # Obtaining three candidates for each IP address, per RFC 6544, sec. 5.1. + @impl true + def socket_configs, + do: [ + [tcp_type: :passive], + [tcp_type: :so], + [tcp_type: :active] + ] + + @impl true + def setup_socket(ip, port, socket_opts, tp_opts) do + case Registry.lookup(ExICE.Priv.Registry, {ip, port}) do + # This protects us from reusing ports ONLY within the same VM instance + # See `ExICE.Priv.Transport.TCP.Client.setup_socket/5` for more info + [{_pid, _}] -> + # TODO: Consider using another (custom) reason to distinguish from POSIX EADDRINUSE + {:error, :eaddrinuse} + + [] -> + {:ok, pid} = Client.start_link() + Client.setup_socket(pid, ip, port, socket_opts, tp_opts) + end + end + + @impl true + defdelegate sockname(socket), to: :inet + + # HACK: using listen sockets here is ugly, but was easier to fit into the existing ICE Agent implementation. + # This should be changed, especially because we're going to want to close the listen sockets + # after the connection is successfully established. + @impl true + def send(listen_socket, dest, packet, tp_opts \\ []) do + with {:ok, local} <- sockname(listen_socket), + [{pid, _}] <- Registry.lookup(ExICE.Priv.Registry, local) do + GenServer.call(pid, {:send, listen_socket, dest, packet, tp_opts}) + else + _ -> {:error, :no_client_process} + end + end + + @impl true + def close(listen_socket) do + with {:ok, local} <- sockname(listen_socket), + [{pid, _}] <- Registry.lookup(ExICE.Priv.Registry, local) do + GenServer.call(pid, {:close, listen_socket}) + else + _ -> {:error, :no_client_process} + end + end +end diff --git a/lib/ex_ice/priv/transport/tcp_client.ex b/lib/ex_ice/priv/transport/tcp_client.ex new file mode 100644 index 0000000..72ebf77 --- /dev/null +++ b/lib/ex_ice/priv/transport/tcp_client.ex @@ -0,0 +1,307 @@ +defmodule ExICE.Priv.Transport.TCP.Client do + @moduledoc false + + use GenServer + + require Logger + + alias ExICE.Priv.Transport + + @connect_timeout_ms 500 + + @spec start_link(Keyword.t()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts ++ [ice_agent: self()]) + end + + @spec setup_socket( + GenServer.server(), + :inet.ip_address(), + :inet.port_number(), + [Transport.open_option()], + Transport.transport_options() + ) :: {:ok, Transport.socket()} | {:error, term()} + def setup_socket(pid, ip, port, socket_opts, tp_opts) do + GenServer.call(pid, {:setup_socket, ip, port, socket_opts, tp_opts}) + end + + # HACK: using listen sockets here is ugly, but was easier to fit into the existing ICE Agent implementation. + # This should be changed, especially because we're going to want to close the listen sockets + # after the connection is successfully established. + @spec send( + GenServer.server(), + Transport.socket(), + {:inet.ip_address(), :inet.port_number()}, + binary(), + Transport.transport_options() + ) :: :ok | {:error, term()} + def send(pid, listen_socket, dest, packet, tp_opts \\ []) do + GenServer.call(pid, {:send, listen_socket, dest, packet, tp_opts}) + end + + @spec close(GenServer.server(), Transport.socket()) :: :ok + def close(pid, listen_socket) do + GenServer.call(pid, {:close, listen_socket}) + end + + @impl true + def init(opts) do + state = %{ + ice_agent: Keyword.fetch!(opts, :ice_agent), + listen_socket: nil, + socket_opts: nil, + tcp_type: nil, + connections: %{} + } + + {:ok, state} + end + + @impl true + def handle_call({:setup_socket, ip, port, socket_opts, tp_opts}, _from, state) do + # * For TCP ICE to work, in certain cases we need to be able to both listen + # and make connection attempts from the same socket. + # * The OS will not allow that unless we set SO_REUSEADDR/SO_REUSEPORT, which we do here. + # * However, this means the OS will no longer protect us from binding to the same address+port + # when we DON'T want that to happen. + # * If `ice_agent.ports == [0]`, this has no impact: the OS will use a different ephemeral port every time. + # * Otherwise, we run into a problem: the user has specified a port range (i.e. `50000..50500`), + # and we'd need to explicitly ask the OS to list currently bound sockets to determine + # whether we can use a certain port number from that range. + # * We try to alleviate this issue to an extent by checking against the sockets which we know of, + # those present in `ExICE.Priv.Registry`. + # * This, however, does not protect us from binding to sockets opened with SO_REUSEPORT, + # that are in use by another OS process with the same effective UID as this one, + # which can happen i.e. when we run ICE Agents in two separate VM instances. + socket_opts = socket_opts ++ [ip: ip, reuseport: true] + tcp_type = Keyword.fetch!(tp_opts, :tcp_type) + + case :gen_tcp.listen(port, socket_opts) do + {:ok, listen_socket} -> + {:ok, {^ip, _sock_port} = local} = :inet.sockname(listen_socket) + + # Always claim the port, but don't accept incoming connections in :active mode + if tcp_type in [:passive, :so] do + pid = self() + spawn_link(fn -> acceptor_loop(listen_socket, pid) end) + end + + state = + %{ + state + | listen_socket: listen_socket, + socket_opts: socket_opts, + tcp_type: tcp_type, + connections: %{} + } + + {:ok, _} = Registry.register(ExICE.Priv.Registry, local, self()) + + {:reply, {:ok, listen_socket}, state} + + {:error, _reason} = err -> + {:reply, err, state} + end + end + + @impl true + def handle_call({:send, listen_socket, dest, packet, tp_opts}, _from, state) do + {:ok, src} = :inet.sockname(listen_socket) + + state = + if state.connections[dest] == nil and + Keyword.get(tp_opts, :connect?, state.tcp_type in [:active, :so]) do + try_connect(state, src, dest, tp_opts) + else + state + end + + case state.connections[dest] do + %{socket: socket, frame?: frame?} -> + {:reply, do_send(socket, packet, frame?), state} + + nil -> + if state.tcp_type == :passive do + Logger.debug("Not sending data from a passive candidate that isn't connected") + # We're lying here to make the rest of the logic (kinda) work + {:reply, :ok, state} + else + {:reply, {:error, :enotconn}, state} + end + end + end + + @impl true + def handle_call({:close, listen_socket}, _from, state) do + # TODO: revisit the closing logic + :gen_tcp.close(listen_socket) + Enum.each(state.connections, fn {_, %{socket: socket}} -> :gen_tcp.close(socket) end) + + {:stop, :normal, :ok, state} + end + + @impl true + def handle_info({:connected, socket}, state) do + {:ok, remote} = :inet.peername(socket) + + conn_state = %{ + socket: socket, + recv_buffer: <<>>, + frame?: true + } + + state = put_in(state, [:connections, remote], conn_state) + + {:noreply, state} + end + + # TODO: consider receiving TCP data in the ICEAgent process + @impl true + def handle_info({:tcp, socket, packet}, state) do + {:ok, {src_ip, src_port} = remote} = :inet.peername(socket) + + conn_state = state.connections[remote] + + cond do + is_nil(conn_state) -> + Logger.warning("Received TCP data on unknown connection, dropping") + {:noreply, state} + + conn_state.frame? -> + # Framing according to RFC 4571 + previous = + case conn_state.recv_buffer do + nil -> <<>> + data -> data + end + + case previous <> packet do + <> -> + # HACK: this is dirty and means that, with framing, we're miscalculating + # the bytes_sent and bytes_received counters + send(state.ice_agent, {:udp, state.listen_socket, src_ip, src_port, data}) + state = put_in(state, [:connections, remote, :recv_buffer], <<>>) + + if rest != <<>> do + handle_info({:tcp, socket, rest}, state) + else + {:noreply, state} + end + + data -> + state = put_in(state, [:connections, remote, :recv_buffer], data) + {:noreply, state} + end + + true -> + send(state.ice_agent, {:udp, state.listen_socket, src_ip, src_port, packet}) + {:noreply, state} + end + end + + @impl true + def handle_info({:tcp_closed, socket}, state) do + connections = Map.reject(state.connections, fn {_, %{socket: s}} -> s == socket end) + + {:noreply, %{state | connections: connections}} + end + + defp try_connect(state, local, remote, tp_opts) do + {local_ip, local_port} = local + {remote_ip, remote_port} = remote + + # TODO: determine how big of a timeout we should use here + case :gen_tcp.connect( + remote_ip, + remote_port, + state.socket_opts ++ [port: local_port], + @connect_timeout_ms + ) do + {:ok, socket} -> + Logger.debug(""" + Successfully initiated new connection. + Local: #{inspect(local_ip)}:#{inspect(local_port)} + Remote: #{inspect(remote_ip)}:#{inspect(remote_port)} + Socket: #{inspect(socket)} + """) + + conn_state = %{ + socket: socket, + recv_buffer: <<>>, + frame?: Keyword.get(tp_opts, :frame?, true) + } + + put_in(state, [:connections, remote], conn_state) + + {:error, :eaddrinuse} -> + # This happens with SO candidates, when the acceptor loop accepted the incoming connection already, + # but we have yet to process the relevant message + Logger.debug("Unable to initiate connection, we're already connected") + + receive do + {:connected, _} = msg -> + {:noreply, state} = handle_info(msg, state) + state + after + 50 -> state + end + + other -> + Logger.debug("Unable to initiate connection, reason: #{inspect(other)}") + state + end + end + + defp acceptor_loop(listen_socket, pid) do + {:ok, {sock_ip, sock_port}} = :inet.sockname(listen_socket) + + case :gen_tcp.accept(listen_socket) do + {:ok, socket} -> + :ok = :gen_tcp.controlling_process(socket, pid) + send(pid, {:connected, socket}) + + {:ok, {peer_ip, peer_port}} = :inet.peername(socket) + + Logger.debug(""" + Accepted new incoming connection. + Local: #{inspect(sock_ip)}:#{inspect(sock_port)} + Remote: #{inspect(peer_ip)}:#{inspect(peer_port)} + Listen socket: #{inspect(listen_socket)} + Socket: #{inspect(socket)} + """) + + acceptor_loop(listen_socket, pid) + + {:error, :closed} -> + Logger.debug(""" + TCP listen socket closed. + Local: #{inspect(sock_ip)}:#{inspect(sock_port)} + Listen socket: #{inspect(listen_socket)} + """) + + :ok + + # TODO: should we keep accepting in this case? + {:error, reason} -> + Logger.debug(""" + TCP listen socket accept failed with reason: #{inspect(reason)}. + Local: #{inspect(sock_ip)}:#{inspect(sock_port)} + Listen socket: #{inspect(listen_socket)} + """) + + acceptor_loop(listen_socket, pid) + end + end + + defp do_send(socket, packet, frame?) do + data = + if frame? do + # RFC 4571 + <> + else + packet + end + + :gen_tcp.send(socket, data) + end +end diff --git a/lib/ex_ice/priv/transport/udp.ex b/lib/ex_ice/priv/transport/udp.ex index 5393431..ba178ff 100644 --- a/lib/ex_ice/priv/transport/udp.ex +++ b/lib/ex_ice/priv/transport/udp.ex @@ -3,13 +3,23 @@ defmodule ExICE.Priv.Transport.UDP do @behaviour ExICE.Priv.Transport @impl true - defdelegate open(port, opts), to: :gen_udp + def transport, do: :udp + + # Obtaining one candidate for each IP address + @impl true + def socket_configs, do: [[]] + + @impl true + def setup_socket(ip, port, socket_opts, _tp_opts \\ []) do + ip_opt = if ip, do: [ip: ip], else: [] + :gen_udp.open(port, socket_opts ++ ip_opt) + end @impl true defdelegate sockname(socket), to: :inet @impl true - defdelegate send(socket, dest, packet), to: :gen_udp + def send(socket, dest, packet, _tp_opts \\ []), do: :gen_udp.send(socket, dest, packet) @impl true defdelegate close(socket), to: :gen_udp diff --git a/mix.exs b/mix.exs index c494ebe..7054b3d 100644 --- a/mix.exs +++ b/mix.exs @@ -20,8 +20,13 @@ defmodule ExICE.MixProject do source_url: @source_url, # code coverage - test_coverage: [tool: ExCoveralls], - preferred_cli_env: [ + test_coverage: [tool: ExCoveralls] + ] + end + + def cli do + [ + preferred_envs: [ coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, diff --git a/mix.lock b/mix.lock index d6fbd93..50f4d94 100644 --- a/mix.lock +++ b/mix.lock @@ -1,10 +1,10 @@ %{ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "credo": {:hex, :credo, "1.7.10", "6e64fe59be8da5e30a1b96273b247b5cf1cc9e336b5fd66302a64b25749ad44d", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "71fbc9a6b8be21d993deca85bf151df023a3097b01e09a2809d460348561d8cd"}, - "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, + "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, + "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, "ex_doc": {:hex, :ex_doc, "0.35.1", "de804c590d3df2d9d5b8aec77d758b00c814b356119b3d4455e4b8a8687aecaf", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "2121c6402c8d44b05622677b761371a759143b958c6c19f6558ff64d0aed40df"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.2.0", "4e1f9b089e9a5ee44928d12370cc9ea7a89b84b2f6256832de65271212eb80de", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "08e884f0af2c4a147e3f8cd4ffe33e3452a256389f0956e55a8c4d75bf0e74cd"}, diff --git a/test/ice_agent_test.exs b/test/ice_agent_test.exs index 2d67791..544b5a3 100644 --- a/test/ice_agent_test.exs +++ b/test/ice_agent_test.exs @@ -16,18 +16,20 @@ defmodule ExICE.ICEAgentTest do end test "gather_candidates/1" do - {:ok, agent} = ICEAgent.start_link(role: :controlling) - :ok = ICEAgent.gather_candidates(agent) + for transport <- [:udp, :tcp] do + {:ok, agent} = ICEAgent.start_link(role: :controlling, transport: transport) + :ok = ICEAgent.gather_candidates(agent) - assert_receive {:ex_ice, ^agent, {:gathering_state_change, :gathering}} - assert_receive {:ex_ice, ^agent, {:gathering_state_change, :complete}} + assert_receive {:ex_ice, ^agent, {:gathering_state_change, :gathering}} + assert_receive {:ex_ice, ^agent, {:gathering_state_change, :complete}} - :ok = ICEAgent.restart(agent) - assert_receive {:ex_ice, ^agent, {:gathering_state_change, :new}} + :ok = ICEAgent.restart(agent) + assert_receive {:ex_ice, ^agent, {:gathering_state_change, :new}} - :ok = ICEAgent.gather_candidates(agent) - assert_receive {:ex_ice, ^agent, {:gathering_state_change, :gathering}} - assert_receive {:ex_ice, ^agent, {:gathering_state_change, :complete}} + :ok = ICEAgent.gather_candidates(agent) + assert_receive {:ex_ice, ^agent, {:gathering_state_change, :gathering}} + assert_receive {:ex_ice, ^agent, {:gathering_state_change, :complete}} + end end test "get_stats/1" do diff --git a/test/integration/p2p_test.exs b/test/integration/p2p_test.exs index d1cd217..3511fde 100644 --- a/test/integration/p2p_test.exs +++ b/test/integration/p2p_test.exs @@ -36,8 +36,8 @@ defmodule ExICE.Integration.P2PTest do a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) - a1_status = %{fd: a1_fd, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, completed: false, data_recv: false} + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} assert p2p(agent1, agent2, a1_status, a2_status) @@ -53,8 +53,8 @@ defmodule ExICE.Integration.P2PTest do a1_fd = File.open!(Path.join([tmp_dir, "a1_restart_recv_data"]), [:append]) a2_fd = File.open!(Path.join([tmp_dir, "a2_restart_recv_data"]), [:append]) - a1_status = %{fd: a1_fd, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, completed: false, data_recv: false} + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} flush_ice_mailbox() @@ -117,8 +117,8 @@ defmodule ExICE.Integration.P2PTest do a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) - a1_status = %{fd: a1_fd, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, completed: false, data_recv: false} + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} assert p2p(agent1, agent2, a1_status, a2_status) @@ -172,8 +172,8 @@ defmodule ExICE.Integration.P2PTest do a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) - a1_status = %{fd: a1_fd, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, completed: false, data_recv: false} + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} assert p2p(agent1, agent2, a1_status, a2_status) @@ -200,17 +200,27 @@ defmodule ExICE.Integration.P2PTest do ICEAgent.end_of_candidates(agent2) p2p(agent1, agent2, a1_status, a2_status) - {:ex_ice, ^agent1, {:connection_state_change, :connected}} -> - Logger.info("Connected, sending file...") + {:ex_ice, ^agent1, {:connection_state_change, new_state}} -> + Logger.info("Conn state change: #{inspect(new_state)}") - Task.start(fn -> - File.stream!("./test/fixtures/lotr.txt", [], 1000) - |> Stream.each(fn chunk -> ICEAgent.send_data(agent1, chunk) end) - |> Stream.run() + a1_status = + if new_state in [:connected, :completed] and not a1_status.data_send do + Logger.info("Sending file...") - ICEAgent.send_data(agent1, "eof") - end) + Task.start(fn -> + File.stream!("./test/fixtures/lotr.txt", [], 1000) + |> Stream.each(fn chunk -> ICEAgent.send_data(agent1, chunk) end) + |> Stream.run() + ICEAgent.send_data(agent1, "eof") + end) + + %{a1_status | data_send: true} + else + a1_status + end + + a1_status = %{a1_status | completed: new_state == :completed} p2p(agent1, agent2, a1_status, a2_status) {:ex_ice, ^agent1, {:data, "eof"}} -> @@ -220,11 +230,6 @@ defmodule ExICE.Integration.P2PTest do :ok = IO.binwrite(a1_status.fd, data) p2p(agent1, agent2, a1_status, a2_status) - {:ex_ice, ^agent1, {:connection_state_change, :completed}} -> - Logger.info("Completed") - a1_status = %{a1_status | completed: true} - p2p(agent1, agent2, a1_status, a2_status) - {:ex_ice, ^agent2, {:new_candidate, cand}} -> ICEAgent.add_remote_candidate(agent1, cand) p2p(agent1, agent2, a1_status, a2_status) @@ -233,22 +238,27 @@ defmodule ExICE.Integration.P2PTest do ICEAgent.end_of_candidates(agent1) p2p(agent1, agent2, a1_status, a2_status) - {:ex_ice, ^agent2, {:connection_state_change, :completed}} -> - Logger.info("Completed") - a2_status = %{a2_status | completed: true} - p2p(agent1, agent2, a1_status, a2_status) + {:ex_ice, ^agent2, {:connection_state_change, new_state}} -> + Logger.info("Conn state change: #{inspect(new_state)}") + + a2_status = + if new_state in [:connected, :completed] and not a2_status.data_send do + Logger.info("Sending file...") - {:ex_ice, ^agent2, {:connection_state_change, :connected}} -> - Logger.info("Connected, sending file...") + Task.start(fn -> + File.stream!("./test/fixtures/lotr.txt", [], 1000) + |> Stream.each(fn chunk -> ICEAgent.send_data(agent2, chunk) end) + |> Stream.run() - Task.start(fn -> - File.stream!("./test/fixtures/lotr.txt", [], 1000) - |> Stream.each(fn chunk -> ICEAgent.send_data(agent2, chunk) end) - |> Stream.run() + ICEAgent.send_data(agent2, "eof") + end) - ICEAgent.send_data(agent2, "eof") - end) + %{a2_status | data_send: true} + else + a2_status + end + a2_status = %{a2_status | completed: new_state == :completed} p2p(agent1, agent2, a1_status, a2_status) {:ex_ice, ^agent2, {:data, "eof"}} -> diff --git a/test/priv/candidate_pair_test.exs b/test/priv/candidate_pair_test.exs index a6e0a2f..8018c9f 100644 --- a/test/priv/candidate_pair_test.exs +++ b/test/priv/candidate_pair_test.exs @@ -14,7 +14,8 @@ defmodule ExICE.Priv.CandidatePairTest do base_address: addr1, base_port: port1, priority: 100, - socket: nil + socket: nil, + transport_module: ExICE.Support.Transport.Mock ) addr2 = {192, 168, 1, 2} diff --git a/test/priv/candidate_test.exs b/test/priv/candidate_test.exs index 7ba719f..8df5753 100644 --- a/test/priv/candidate_test.exs +++ b/test/priv/candidate_test.exs @@ -3,41 +3,91 @@ defmodule ExICE.Priv.CandidateTest do alias ExICE.Priv.Candidate - test "priority/3" do - {local_preferences1, prio1} = Candidate.priority(%{}, {192, 168, 0, 1}, :host) + test "priority/4" do + base_addr = {192, 168, 0, 1} - assert map_size(local_preferences1) == 1 - assert Map.has_key?(local_preferences1, {192, 168, 0, 1}) + ## UDP and general behaviour + + {prefs, prio_host_udp} = Candidate.priority(%{}, base_addr, :host, nil) + + assert map_size(prefs) == 1 + assert Map.has_key?(prefs, base_addr) # is idempotent - {^local_preferences1, ^prio1} = - Candidate.priority(local_preferences1, {192, 168, 0, 1}, :host) + {^prefs, ^prio_host_udp} = Candidate.priority(prefs, base_addr, :host, nil) - {local_preferences2, prio2} = Candidate.priority(local_preferences1, {192, 168, 0, 2}, :host) - assert map_size(local_preferences2) == 2 - assert Map.has_key?(local_preferences2, {192, 168, 0, 1}) - assert Map.has_key?(local_preferences2, {192, 168, 0, 2}) - assert prio2 != prio1 + base_addr2 = {192, 168, 0, 2} + {prefs, prio_host_udp_2} = Candidate.priority(prefs, base_addr2, :host, nil) + assert map_size(prefs) == 2 + assert Map.has_key?(prefs, base_addr) + assert Map.has_key?(prefs, base_addr2) + assert prio_host_udp != prio_host_udp_2 # the same base address that created srflx candidate - {^local_preferences2, prio3} = - Candidate.priority(local_preferences2, {192, 168, 0, 1}, :srflx) - - assert prio3 < prio2 - assert prio3 < prio1 + {^prefs, prio_srflx_udp} = Candidate.priority(prefs, base_addr, :srflx, nil) + assert prio_srflx_udp < prio_host_udp + assert prio_srflx_udp < prio_host_udp_2 # the same base address that created relay candidate - {^local_preferences2, prio4} = - Candidate.priority(local_preferences2, {192, 168, 0, 1}, :relay) - - assert prio4 < prio3 + {^prefs, prio_relay_udp} = Candidate.priority(prefs, base_addr, :relay, nil) + assert prio_relay_udp < prio_srflx_udp # the same base address that created prflx candidate - {^local_preferences2, prio5} = - Candidate.priority(local_preferences2, {192, 168, 0, 1}, :prflx) + {^prefs, prio_prflx_udp} = Candidate.priority(prefs, base_addr, :prflx, nil) + assert prio_prflx_udp < prio_host_udp + assert prio_prflx_udp < prio_host_udp_2 + assert prio_prflx_udp > prio_relay_udp + + ## TCP + + {prefs, prio_host_active} = Candidate.priority(prefs, base_addr, :host, :active) + {prefs, prio_host_passive} = Candidate.priority(prefs, base_addr, :host, :passive) + {prefs, prio_host_so} = Candidate.priority(prefs, base_addr, :host, :so) + {prefs, prio_srflx_so} = Candidate.priority(prefs, base_addr, :srflx, :so) + {prefs, prio_srflx_active} = Candidate.priority(prefs, base_addr, :srflx, :active) + {prefs, prio_srflx_passive} = Candidate.priority(prefs, base_addr, :srflx, :passive) + {prefs, prio_relay_so} = Candidate.priority(prefs, base_addr, :relay, :so) + {prefs, prio_relay_active} = Candidate.priority(prefs, base_addr, :relay, :active) + {prefs, prio_relay_passive} = Candidate.priority(prefs, base_addr, :relay, :passive) + {prefs, prio_prflx_so} = Candidate.priority(prefs, base_addr, :prflx, :so) + {prefs, prio_prflx_active} = Candidate.priority(prefs, base_addr, :prflx, :active) + {_prefs, prio_prflx_passive} = Candidate.priority(prefs, base_addr, :prflx, :passive) + + # Direction preference + # For :host, :udp_tunneled, :relay -> Active (6) > Passive (4) > SO (2) + assert prio_host_active > prio_host_passive + assert prio_host_passive > prio_host_so + + assert prio_relay_active > prio_relay_passive + assert prio_relay_passive > prio_relay_so + + # For :srflx, :prfix, :nat_assisted -> SO (6) > Active (4) > Passive (2) + assert prio_srflx_so > prio_srflx_active + assert prio_srflx_active > prio_srflx_passive + + assert prio_prflx_so > prio_prflx_active + assert prio_prflx_active > prio_prflx_passive + + # Type preference + assert prio_host_so > prio_srflx_so + assert prio_host_so > prio_prflx_so + + assert prio_srflx_passive > prio_relay_active + assert prio_prflx_passive > prio_relay_active + + ## UDP + TCP currently planned behaviour + + # Prefer UDP host, prflx, srflx over TCP + assert prio_host_udp > prio_host_active + assert prio_prflx_udp > prio_host_active + assert prio_srflx_udp > prio_host_active + + # Prefer UDP relay over TCP relay + assert prio_relay_udp > prio_relay_active - assert prio5 < prio1 - assert prio5 < prio2 - assert prio5 > prio3 + # Prefer TCP host, prflx, srflx over relay + assert prio_host_so > prio_relay_udp + assert prio_prflx_passive > prio_relay_udp + assert prio_srflx_passive > prio_relay_udp end end diff --git a/test/priv/checklist_test.exs b/test/priv/checklist_test.exs index 8a4ea8f..e516e7e 100644 --- a/test/priv/checklist_test.exs +++ b/test/priv/checklist_test.exs @@ -18,7 +18,8 @@ defmodule ExICE.Priv.ChecklistTest do base_address: local_addr, base_port: local_port, priority: 123, - socket: nil + socket: nil, + transport_module: ExICE.Support.Transport.Mock ) remote_host_cand = diff --git a/test/priv/gatherer_test.exs b/test/priv/gatherer_test.exs index 95162f1..9a383df 100644 --- a/test/priv/gatherer_test.exs +++ b/test/priv/gatherer_test.exs @@ -95,7 +95,7 @@ defmodule ExICE.Priv.GathererTest do {:ok, sockets} = Gatherer.open_sockets(gatherer) - for socket <- sockets do + for %{socket: socket} <- sockets do {:ok, {_ip, port}} = Transport.Mock.sockname(socket) assert port in port_range end diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index 356cceb..6f412c8 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -67,6 +67,9 @@ defmodule ExICE.Priv.ICEAgentTest do @impl true def family(cand), do: CandidateBase.family(cand.base) + @impl true + def tcp_type(cand), do: CandidateBase.tcp_type(cand.base) + @impl true def to_candidate(cand), do: CandidateBase.to_candidate(cand.base) diff --git a/test/support/transport/mock.ex b/test/support/transport/mock.ex index b50525e..dbd58f1 100644 --- a/test/support/transport/mock.ex +++ b/test/support/transport/mock.ex @@ -26,7 +26,7 @@ defmodule ExICE.Support.Transport.Mock do end end - @spec recv(ExICE.Transport.socket()) :: binary() | nil + @spec recv(ExICE.Priv.Transport.socket()) :: binary() | nil def recv(ref) do case :ets.lookup(:transport_mock, ref) do [{^ref, %{buf: []} = _socket}] -> @@ -39,7 +39,13 @@ defmodule ExICE.Support.Transport.Mock do end @impl true - def open(port, opts) do + def transport, do: :udp + + @impl true + def socket_configs, do: [[]] + + @impl true + def setup_socket(ip, port, _opts \\ [], _tp_opts \\ []) do unless :transport_mock in :ets.all() do raise """ #{__MODULE__} has not been initialized. @@ -48,8 +54,6 @@ defmodule ExICE.Support.Transport.Mock do """ end - ip = Keyword.get(opts, :ip, {0, 0, 0, 0}) - case port do 0 -> ref = open_ephemeral(ip) @@ -79,7 +83,7 @@ defmodule ExICE.Support.Transport.Mock do end @impl true - def send(ref, _dst, packet) do + def send(ref, _dst, packet, _tp_opts \\ []) do [{^ref, %{state: :open} = socket}] = :ets.lookup(:transport_mock, ref) :ets.insert(:transport_mock, {ref, %{socket | buf: socket.buf ++ [packet]}}) :ok From 22fe54f361c058127ae2f45447755351d3889f9b Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Fri, 13 Feb 2026 16:03:13 +0100 Subject: [PATCH 2/9] Add log for timeout, fix RC with data received on unknown connection --- lib/ex_ice/priv/transport/tcp_client.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/ex_ice/priv/transport/tcp_client.ex b/lib/ex_ice/priv/transport/tcp_client.ex index 72ebf77..6310dcd 100644 --- a/lib/ex_ice/priv/transport/tcp_client.ex +++ b/lib/ex_ice/priv/transport/tcp_client.ex @@ -243,7 +243,9 @@ defmodule ExICE.Priv.Transport.TCP.Client do {:noreply, state} = handle_info(msg, state) state after - 50 -> state + 50 -> + Logger.debug("No `:connected` message received in 50 ms, connection state unknown") + state end other -> @@ -257,8 +259,8 @@ defmodule ExICE.Priv.Transport.TCP.Client do case :gen_tcp.accept(listen_socket) do {:ok, socket} -> - :ok = :gen_tcp.controlling_process(socket, pid) send(pid, {:connected, socket}) + :ok = :gen_tcp.controlling_process(socket, pid) {:ok, {peer_ip, peer_port}} = :inet.peername(socket) From 2a70e0762253f682396cf092d38a5717a32e698e Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 12:56:45 +0100 Subject: [PATCH 3/9] Review fixes --- lib/ex_ice/ice_agent.ex | 8 +++----- lib/ex_ice/priv/ice_agent.ex | 12 ++++++++++++ lib/ex_ice/priv/transport/tcp.ex | 5 +++-- lib/ex_ice/priv/transport/tcp_client.ex | 6 +++--- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index 8b7858d..fabcefc 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -496,11 +496,9 @@ defmodule ExICE.ICEAgent do end @impl true - def handle_info({:tcp, _socket, _packet}, state) do - # TODO: consider receiving TCP data in the ICE Agent process - # ice_agent = ExICE.Priv.ICEAgent.handle_tcp(state.ice_agent, socket, packet) - # {:noreply, %{state | ice_agent: ice_agent}} - {:noreply, state} + def handle_info({:tcp, socket, src_ip, src_port, packet}, state) do + ice_agent = ExICE.Priv.ICEAgent.handle_tcp(state.ice_agent, socket, src_ip, src_port, packet) + {:noreply, %{state | ice_agent: ice_agent}} end @impl true diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index c61f08a..b145f19 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -920,6 +920,18 @@ defmodule ExICE.Priv.ICEAgent do end end + @spex handle_tcp( + t(), + ExICE.Priv.Transport.socket(), + :inet.ip_address(), + :inet.port_number(), + binary() + ) :: t() + def handle_tcp(ice_agent, socket, src_ip, src_port, packet) do + # This is the same because framing and mapping the sockets is handled in the TCP Client + handle_udp(ice_agent, socket, src_ip, src_port, packet) + end + @spec handle_ex_turn_msg(t(), reference(), ExTURN.Client.notification_message()) :: t() def handle_ex_turn_msg(%__MODULE__{state: :closed} = ice_agent, _, _) do Logger.debug("Received ex_turn message in closed state. Ignoring.") diff --git a/lib/ex_ice/priv/transport/tcp.ex b/lib/ex_ice/priv/transport/tcp.ex index 318eab5..80beb48 100644 --- a/lib/ex_ice/priv/transport/tcp.ex +++ b/lib/ex_ice/priv/transport/tcp.ex @@ -37,13 +37,14 @@ defmodule ExICE.Priv.Transport.TCP do defdelegate sockname(socket), to: :inet # HACK: using listen sockets here is ugly, but was easier to fit into the existing ICE Agent implementation. + # At the moment, `listen_socket` is used to access the specific socket connected to `dest`. # This should be changed, especially because we're going to want to close the listen sockets # after the connection is successfully established. @impl true def send(listen_socket, dest, packet, tp_opts \\ []) do with {:ok, local} <- sockname(listen_socket), [{pid, _}] <- Registry.lookup(ExICE.Priv.Registry, local) do - GenServer.call(pid, {:send, listen_socket, dest, packet, tp_opts}) + Client.send(pid, listen_socket, dest, packet, tp_opts) else _ -> {:error, :no_client_process} end @@ -53,7 +54,7 @@ defmodule ExICE.Priv.Transport.TCP do def close(listen_socket) do with {:ok, local} <- sockname(listen_socket), [{pid, _}] <- Registry.lookup(ExICE.Priv.Registry, local) do - GenServer.call(pid, {:close, listen_socket}) + Client.close(pid, listen_socket) else _ -> {:error, :no_client_process} end diff --git a/lib/ex_ice/priv/transport/tcp_client.ex b/lib/ex_ice/priv/transport/tcp_client.ex index 6310dcd..2d2e758 100644 --- a/lib/ex_ice/priv/transport/tcp_client.ex +++ b/lib/ex_ice/priv/transport/tcp_client.ex @@ -133,7 +133,7 @@ defmodule ExICE.Priv.Transport.TCP.Client do @impl true def handle_call({:close, listen_socket}, _from, state) do - # TODO: revisit the closing logic + # TODO: revisit the closing logic. Listen sockets should be closed after the connection is established :gen_tcp.close(listen_socket) Enum.each(state.connections, fn {_, %{socket: socket}} -> :gen_tcp.close(socket) end) @@ -179,7 +179,7 @@ defmodule ExICE.Priv.Transport.TCP.Client do <> -> # HACK: this is dirty and means that, with framing, we're miscalculating # the bytes_sent and bytes_received counters - send(state.ice_agent, {:udp, state.listen_socket, src_ip, src_port, data}) + send(state.ice_agent, {:tcp, state.listen_socket, src_ip, src_port, data}) state = put_in(state, [:connections, remote, :recv_buffer], <<>>) if rest != <<>> do @@ -194,7 +194,7 @@ defmodule ExICE.Priv.Transport.TCP.Client do end true -> - send(state.ice_agent, {:udp, state.listen_socket, src_ip, src_port, packet}) + send(state.ice_agent, {:tcp, state.listen_socket, src_ip, src_port, packet}) {:noreply, state} end end From a8e916727a4562f656e23cb47e39f8c93ea580d3 Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 12:58:36 +0100 Subject: [PATCH 4/9] @spex --- lib/ex_ice/priv/ice_agent.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index b145f19..51608cf 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -920,7 +920,7 @@ defmodule ExICE.Priv.ICEAgent do end end - @spex handle_tcp( + @spec handle_tcp( t(), ExICE.Priv.Transport.socket(), :inet.ip_address(), From 9f84d4593bfcc516ba744576ac16d84cc7cb61ba Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:18:51 +0100 Subject: [PATCH 5/9] tcp transport in p2p integration tests --- test/integration/p2p_test.exs | 212 ++++++++++++++++++++-------------- 1 file changed, 123 insertions(+), 89 deletions(-) diff --git a/test/integration/p2p_test.exs b/test/integration/p2p_test.exs index 3511fde..39a3d14 100644 --- a/test/integration/p2p_test.exs +++ b/test/integration/p2p_test.exs @@ -8,125 +8,159 @@ defmodule ExICE.Integration.P2PTest do @tag :p2p @tag :tmp_dir test "P2P connection", %{tmp_dir: tmp_dir} do - ice_servers = [%{urls: "stun:stun.l.google.com:19302"}] + for {transport, ice_servers} <- [ + {:udp, [%{urls: "stun:stun.l.google.com:19302"}]}, + {:tcp, [%{urls: "stun:stun.freeswitch.org:3478"}]} + ] do + ip_filter = fn + {_, _, _, _, _, _, _, _} -> true + {172, _, _, _} -> true + _other -> true + end - ip_filter = fn - {_, _, _, _, _, _, _, _} -> true - {172, _, _, _} -> true - _other -> true - end + {:ok, agent1} = + ICEAgent.start_link( + role: :controlling, + ip_filter: ip_filter, + ice_servers: ice_servers, + transport: transport + ) - {:ok, agent1} = - ICEAgent.start_link(role: :controlling, ip_filter: ip_filter, ice_servers: ice_servers) + {:ok, agent2} = + ICEAgent.start_link( + role: :controlled, + ip_filter: ip_filter, + ice_servers: [], + transport: transport + ) - {:ok, agent2} = ICEAgent.start_link(role: :controlled, ip_filter: ip_filter, ice_servers: []) + {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) + {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) - {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) - {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) + :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) + :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) - :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) - :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) + :ok = ICEAgent.gather_candidates(agent1) + :ok = ICEAgent.gather_candidates(agent2) - :ok = ICEAgent.gather_candidates(agent1) - :ok = ICEAgent.gather_candidates(agent2) + assert_receive {:ex_ice, ^agent1, {:gathering_state_change, :gathering}} + assert_receive {:ex_ice, ^agent2, {:gathering_state_change, :gathering}} - assert_receive {:ex_ice, ^agent1, {:gathering_state_change, :gathering}} - assert_receive {:ex_ice, ^agent2, {:gathering_state_change, :gathering}} + a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) + a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) - a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) - a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} - a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} + assert p2p(agent1, agent2, a1_status, a2_status) - assert p2p(agent1, agent2, a1_status, a2_status) + assert File.read!(Path.join([tmp_dir, "a1_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") - assert File.read!(Path.join([tmp_dir, "a1_recv_data"])) == - File.read!("./test/fixtures/lotr.txt") + assert File.read!(Path.join([tmp_dir, "a2_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") - assert File.read!(Path.join([tmp_dir, "a2_recv_data"])) == - File.read!("./test/fixtures/lotr.txt") + :ok = File.close(a1_fd) + :ok = File.close(a2_fd) - :ok = File.close(a1_fd) - :ok = File.close(a2_fd) + a1_fd = File.open!(Path.join([tmp_dir, "a1_restart_recv_data"]), [:append]) + a2_fd = File.open!(Path.join([tmp_dir, "a2_restart_recv_data"]), [:append]) - a1_fd = File.open!(Path.join([tmp_dir, "a1_restart_recv_data"]), [:append]) - a2_fd = File.open!(Path.join([tmp_dir, "a2_restart_recv_data"]), [:append]) + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} - a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} + flush_ice_mailbox() - flush_ice_mailbox() + :ok = ICEAgent.restart(agent1) + {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) + :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) + {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) + :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) - :ok = ICEAgent.restart(agent1) - {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) - :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) - {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) - :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) + assert_receive {:ex_ice, ^agent1, {:gathering_state_change, :new}} + assert_receive {:ex_ice, ^agent1, {:connection_state_change, :checking}} + assert_receive {:ex_ice, ^agent2, {:gathering_state_change, :new}} + assert_receive {:ex_ice, ^agent2, {:connection_state_change, :checking}} - assert_receive {:ex_ice, ^agent1, {:gathering_state_change, :new}} - assert_receive {:ex_ice, ^agent1, {:connection_state_change, :checking}} - assert_receive {:ex_ice, ^agent2, {:gathering_state_change, :new}} - assert_receive {:ex_ice, ^agent2, {:connection_state_change, :checking}} + :ok = ICEAgent.gather_candidates(agent1) + :ok = ICEAgent.gather_candidates(agent2) - :ok = ICEAgent.gather_candidates(agent1) - :ok = ICEAgent.gather_candidates(agent2) + assert p2p(agent1, agent2, a1_status, a2_status) - assert p2p(agent1, agent2, a1_status, a2_status) + assert File.read!(Path.join([tmp_dir, "a1_restart_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") - assert File.read!(Path.join([tmp_dir, "a1_restart_recv_data"])) == - File.read!("./test/fixtures/lotr.txt") + assert File.read!(Path.join([tmp_dir, "a2_restart_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") - assert File.read!(Path.join([tmp_dir, "a2_restart_recv_data"])) == - File.read!("./test/fixtures/lotr.txt") + assert :ok = ICEAgent.close(agent1) + assert :ok = ICEAgent.close(agent2) - assert :ok = ICEAgent.close(agent1) - assert :ok = ICEAgent.close(agent2) + assert :ok = ICEAgent.stop(agent1) + assert :ok = ICEAgent.stop(agent2) - assert :ok = ICEAgent.stop(agent1) - assert :ok = ICEAgent.stop(agent2) + File.rm(Path.join([tmp_dir, "a1_recv_data"])) + File.rm(Path.join([tmp_dir, "a2_recv_data"])) + File.rm(Path.join([tmp_dir, "a1_restart_recv_data"])) + File.rm(Path.join([tmp_dir, "a2_restart_recv_data"])) + end end @tag :tmp_dir @tag :role_conflict test "P2P connection with role conflict", %{tmp_dir: tmp_dir} do - ice_servers = [%{urls: "stun:stun.l.google.com:19302"}] - # ice_servers = [] - - ip_filter = fn - {_, _, _, _, _, _, _, _} -> true - {172, _, _, _} -> true - _other -> true + for {transport, ice_servers} <- [ + {:udp, [%{urls: "stun:stun.l.google.com:19302"}]}, + {:tcp, [%{urls: "stun:stun.freeswitch.org:3478"}]} + ] do + ip_filter = fn + {_, _, _, _, _, _, _, _} -> true + {172, _, _, _} -> true + _other -> true + end + + {:ok, agent1} = + ICEAgent.start_link( + role: :controlled, + ip_filter: ip_filter, + ice_servers: ice_servers, + transport: transport + ) + + {:ok, agent2} = + ICEAgent.start_link( + role: :controlled, + ip_filter: ip_filter, + ice_servers: ice_servers, + transport: transport + ) + + {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) + {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) + + :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) + :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) + + :ok = ICEAgent.gather_candidates(agent1) + :ok = ICEAgent.gather_candidates(agent2) + + a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) + a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) + + a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} + + assert p2p(agent1, agent2, a1_status, a2_status) + + assert File.read!(Path.join([tmp_dir, "a1_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") + + assert File.read!(Path.join([tmp_dir, "a2_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") + + File.rm(Path.join([tmp_dir, "a1_recv_data"])) + File.rm(Path.join([tmp_dir, "a2_recv_data"])) end - - {:ok, agent1} = - ICEAgent.start_link(role: :controlled, ip_filter: ip_filter, ice_servers: ice_servers) - - {:ok, agent2} = - ICEAgent.start_link(role: :controlled, ip_filter: ip_filter, ice_servers: ice_servers) - - {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) - {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) - - :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) - :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) - - :ok = ICEAgent.gather_candidates(agent1) - :ok = ICEAgent.gather_candidates(agent2) - - a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) - a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) - - a1_status = %{fd: a1_fd, data_send: false, completed: false, data_recv: false} - a2_status = %{fd: a2_fd, data_send: false, completed: false, data_recv: false} - - assert p2p(agent1, agent2, a1_status, a2_status) - - assert File.read!(Path.join([tmp_dir, "a1_recv_data"])) == - File.read!("./test/fixtures/lotr.txt") - - assert File.read!(Path.join([tmp_dir, "a2_recv_data"])) == - File.read!("./test/fixtures/lotr.txt") end @tag :tmp_dir From 06582832058fe474ecc0f460bb791a786c12213d Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:43:04 +0100 Subject: [PATCH 6/9] wip --- lib/ex_ice/priv/ice_agent.ex | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 51608cf..ff454cc 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -1580,8 +1580,9 @@ defmodule ExICE.Priv.ICEAgent do {:ok, role_attr} <- get_role_attribute(msg), {:ok, use_cand_attr} <- get_use_cand_attribute(msg), {:ok, ice_agent} <- check_req_role_conflict(ice_agent, role_attr) do + # OOOO NIEEEEEE {remote_cand, ice_agent} = - get_or_create_remote_cand(ice_agent, src_ip, src_port, prio_attr) + get_or_create_remote_cand(ice_agent, local_cand, src_ip, src_port, prio_attr) pair = CandidatePair.new(local_cand, remote_cand, ice_agent.role, :waiting, last_seen: now()) @@ -2344,14 +2345,17 @@ defmodule ExICE.Priv.ICEAgent do end end - defp get_or_create_remote_cand(ice_agent, src_ip, src_port, prio_attr) do + defp get_or_create_remote_cand(ice_agent, local_cand, src_ip, src_port, prio_attr) do case find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port) do nil -> + # OOOOO NIEEEEEE + IO.inspect(local_cand, label: :LOCAL_CANDIDATE) cand = ExICE.Candidate.new(:prflx, address: src_ip, port: src_port, - priority: prio_attr.priority + priority: prio_attr.priority, + transport: local_cand.transport ) Logger.debug("Adding new remote prflx candidate: #{inspect(cand)}") From 1bb3ace15830f5fb8a33200b3b38a6833be39dba Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:45:46 +0100 Subject: [PATCH 7/9] wip2 --- lib/ex_ice/priv/ice_agent.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index ff454cc..da7b409 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -2355,7 +2355,7 @@ defmodule ExICE.Priv.ICEAgent do address: src_ip, port: src_port, priority: prio_attr.priority, - transport: local_cand.transport + transport: local_cand.base.transport ) Logger.debug("Adding new remote prflx candidate: #{inspect(cand)}") From 337225a2034ed7ba58b5bad0b026492a1d68d748 Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:52:36 +0100 Subject: [PATCH 8/9] Fix remote prflx candidate being created with invalid transport --- lib/ex_ice/priv/ice_agent.ex | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index da7b409..2ae7dce 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -1580,7 +1580,6 @@ defmodule ExICE.Priv.ICEAgent do {:ok, role_attr} <- get_role_attribute(msg), {:ok, use_cand_attr} <- get_use_cand_attribute(msg), {:ok, ice_agent} <- check_req_role_conflict(ice_agent, role_attr) do - # OOOO NIEEEEEE {remote_cand, ice_agent} = get_or_create_remote_cand(ice_agent, local_cand, src_ip, src_port, prio_attr) @@ -2252,11 +2251,12 @@ defmodule ExICE.Priv.ICEAgent do end) end - defp tcp_types_ok?(nil, nil), do: true - defp tcp_types_ok?(:so, :so), do: true - defp tcp_types_ok?(:active, :passive), do: true - defp tcp_types_ok?(:passive, :active), do: true - defp tcp_types_ok?(_, _), do: false + defp tcp_types_ok?(t1, t2), do: get_matching_tcp_type(t1) == t2 + + defp get_matching_tcp_type(nil), do: nil + defp get_matching_tcp_type(:so), do: :so + defp get_matching_tcp_type(:active), do: :passive + defp get_matching_tcp_type(:passive), do: :active defp symmetric?(ice_agent, socket, response_src, conn_check_pair) do local_cand = Map.fetch!(ice_agent.local_cands, conn_check_pair.local_cand_id) @@ -2348,14 +2348,13 @@ defmodule ExICE.Priv.ICEAgent do defp get_or_create_remote_cand(ice_agent, local_cand, src_ip, src_port, prio_attr) do case find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port) do nil -> - # OOOOO NIEEEEEE - IO.inspect(local_cand, label: :LOCAL_CANDIDATE) cand = ExICE.Candidate.new(:prflx, address: src_ip, port: src_port, priority: prio_attr.priority, - transport: local_cand.base.transport + transport: local_cand.base.transport, + tcp_type: get_matching_tcp_type(local_cand.base.tcp_type) ) Logger.debug("Adding new remote prflx candidate: #{inspect(cand)}") From 76490f08b668f106cc0dbfb960820a41b3d21b8a Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 19 Feb 2026 14:00:10 +0100 Subject: [PATCH 9/9] recv_buffer is never nil --- lib/ex_ice/priv/transport/tcp_client.ex | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/ex_ice/priv/transport/tcp_client.ex b/lib/ex_ice/priv/transport/tcp_client.ex index 2d2e758..322069c 100644 --- a/lib/ex_ice/priv/transport/tcp_client.ex +++ b/lib/ex_ice/priv/transport/tcp_client.ex @@ -169,13 +169,7 @@ defmodule ExICE.Priv.Transport.TCP.Client do conn_state.frame? -> # Framing according to RFC 4571 - previous = - case conn_state.recv_buffer do - nil -> <<>> - data -> data - end - - case previous <> packet do + case conn_state.recv_buffer <> packet do <> -> # HACK: this is dirty and means that, with framing, we're miscalculating # the bytes_sent and bytes_received counters