Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ddf1040
feat: support `socket`-based transport in `esockd_proxy_protocol`
keynslug Jun 29, 2025
d34c286
feat: generalize acceptor fsm to support `socket`-based transport
keynslug Jun 29, 2025
0b6a1a6
feat: add `socket`-based listener support
keynslug Jun 29, 2025
940ddcb
fix(examples): fix async recv echo server
keynslug Jun 29, 2025
c3102dc
chore: fix dialyzer warnings
keynslug Jun 29, 2025
072674c
chore: fix Makefile target definition
keynslug Jun 29, 2025
4b087f5
fix(tcpsocket): handle supplied inet family correctly
keynslug Jun 30, 2025
d3c0e29
feat(tcpsocket): implement `esockd_socket:type/1`
keynslug Jun 30, 2025
f1604f5
chore: make `esockd_socket` success typed w/o dialyzer hacks
keynslug Jul 1, 2025
16f5766
feat(tcpsocket): support `tune_fun` option
keynslug Jul 1, 2025
8ce9dfb
fix(proxy): handle partial proxy protocol header for `esockd_socket`s
keynslug Jul 1, 2025
a1913cf
chore(acceptor): update state diagram
keynslug Jul 1, 2025
a82f047
Merge pull request #204 from keynslug/perf/EMQX-14333/erl-socket
keynslug Jul 2, 2025
2f82a7a
chore: EMQ X -> EMQX
terry-xiaoyu Jul 9, 2025
297f358
chore: refine typespecs
keynslug Jul 15, 2025
eb0ffd1
chore: refine naming and add more documentation
keynslug Jul 15, 2025
679fc47
chore(acceptor-fsm): abstract listener socket into callback impl
keynslug Jul 15, 2025
99a4b68
chore: update copyright year
keynslug Jul 15, 2025
aa40618
fix(socket): handle error condition gracefully
keynslug Jul 15, 2025
7468453
chore: refine naming
keynslug Jul 15, 2025
a0ffc5c
fix(acceptor-fsm): do not attempt to close already closed socket
keynslug Jul 15, 2025
8d6e138
fix(acceptor-fsm): handle ignore case on connection start
keynslug Jul 15, 2025
c62ecd1
chore: refine naming
keynslug Jul 15, 2025
382beff
fix(proxy-protocol): properly mark function as test-only
keynslug Jul 15, 2025
7bdd258
feat(socket): document and provide `getopts/2` / `setopts/2` adapters
keynslug Jul 15, 2025
d0c3edc
Merge pull request #205 from keynslug/chore/EMQX-14333/post-review
keynslug Jul 17, 2025
94ec530
fix(socket): return `unsupported` when listening socket options change
keynslug Jul 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ xref:

.PHONY: eunit
eunit: compile
$(REBAR) eunit verbose=truen
$(REBAR) eunit --verbose

.PHONY: ct
ct: compile
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,5 @@ Apache License Version 2.0

## Author

EMQ X Team.
EMQX Team.

2 changes: 1 addition & 1 deletion examples/async_recv/async_recv_echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ start_link(Transport, Sock) ->
init([Transport, Sock]) ->
case Transport:wait(Sock) of
{ok, NewSock} ->
Transport:async_recv(Sock, 0, infinity),
Transport:async_recv(NewSock, 0, infinity),
State = #state{transport = Transport, socket = NewSock},
gen_server:enter_loop(?MODULE, [], State);
Error -> Error
Expand Down
2 changes: 1 addition & 1 deletion include/esockd.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
| {pp2_ssl, list(pp2_additional_ssl_field())}).

-record(proxy_socket, {inet :: inet4 | inet6 | 'unix' | 'unspec',
socket :: inet:socket() | #ssl_socket{},
socket :: inet:socket() | #ssl_socket{} | socket:socket(),
src_addr :: inet:ip_address() | undefined,
dst_addr :: inet:ip_address() | undefined,
src_port :: inet:port_number() | undefined,
Expand Down
2 changes: 1 addition & 1 deletion src/esockd.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{mod, {esockd_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/esockd"}
]}
Expand Down
14 changes: 14 additions & 0 deletions src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-export([ open/3
, open_udp/3
, open_dtls/3
, open_tcpsocket/3
, close/2
, close/1
%% Legacy API
Expand All @@ -37,6 +38,7 @@
]).

-export([ child_spec/3
, tcpsocket_child_spec/3
, udp_child_spec/3
, dtls_child_spec/3
%% Legacy API
Expand Down Expand Up @@ -163,6 +165,12 @@ tcp_options(Opts) ->
open(Proto, Port, Opts, MFA) ->
open(Proto, Port, merge_mfargs(Opts, MFA)).

%% @doc Open a Socket API based TCP listener
-spec open_tcpsocket(atom(), listen_on(), options())
-> {ok, pid()} | {error, term()}.
open_tcpsocket(Proto, ListenOn, Opts) ->
esockd_sup:start_child(tcpsocket_child_spec(Proto, ListenOn, Opts)).

%% @doc Open a UDP listener
-spec open_udp(atom(), listen_on(), [option()])
-> {ok, pid()} | {error, term()}.
Expand Down Expand Up @@ -221,6 +229,12 @@ child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
child_spec(Proto, ListenOn, Opts, MFA) when is_atom(Proto) ->
child_spec(Proto, ListenOn, merge_mfargs(Opts, MFA)).

%% @doc Create a Child spec for a Socket API based TCP listener.
-spec tcpsocket_child_spec(atom(), listen_on(), options())
-> supervisor:child_spec().
tcpsocket_child_spec(Proto, ListenOn, Opts) when is_atom(Proto) ->
esockd_sup:tcpsocket_child_spec(Proto, fixaddr(ListenOn), Opts).

%% @doc Create a Child spec for a UDP Listener.
-spec udp_child_spec(atom(), listen_on(), options())
-> supervisor:child_spec().
Expand Down
134 changes: 134 additions & 0 deletions src/esockd_accept_inet.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_accept_inet).

-export([
init/2,
async_accept/1,
async_accept_result/3,
post_accept/2,
sockname/1,
fast_close/1
]).

-export([
mk_tune_socket_fun/1,
tune_socket/2
]).

-type socket() :: esockd_transport:listen_socket().
-type async_ref() :: reference().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

-record(ctx, {
lsock :: socket(),
sock_mod :: module(),
tune_fun :: tune_socket_fun()
}).

-type ctx() :: #ctx{}.

%%

-spec init(socket(), _Opts) -> ctx().
init(LSock, TuneFun) ->
{ok, SockMod} = inet_db:lookup_socket(LSock),
#ctx{
lsock = LSock,
sock_mod = SockMod,
tune_fun = TuneFun
}.

-spec async_accept(ctx()) ->
{async, async_ref()} | {error, atom()}.
async_accept(#ctx{lsock = LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{async, Ref};
{error, Reason} ->
{error, Reason}
end.

-spec async_accept_result(Message, async_ref(), ctx()) ->
{ok, socket()} | {error, atom()} | Message.
async_accept_result({inet_async, _LSock, Ref, {ok, Sock}}, Ref, _Ctx) ->
{ok, Sock};
async_accept_result({inet_async, _LSock, Ref, {error, Reason}}, Ref, _Ctx) ->
{error, Reason};
async_accept_result(Info, _Ref, _Ctx) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, esockd_transport, socket()} | {error, atom()}.
post_accept(Sock, #ctx{sock_mod = SockMod, tune_fun = TuneFun}) ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),
eval_tune_socket_fun(TuneFun, Sock).

return_socket(Sock) ->
{ok, esockd_transport, Sock}.

eval_tune_socket_fun({Fun, Opts}, Sock) ->
Fun(Sock, Opts).

-spec mk_tune_socket_fun([esockd:option()]) -> tune_socket_fun().
mk_tune_socket_fun(Opts) ->
TuneOpts = [{Name, Val} || {Name, Val} <- Opts,
Name =:= tune_buffer orelse
Name =:= tune_fun],
{fun ?MODULE:tune_socket/2, TuneOpts}.

tune_socket(Sock, []) ->
return_socket(Sock);
tune_socket(Sock, [{tune_buffer, true}|More]) ->
case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} ->
BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
case esockd_transport:setopts(Sock, [{buffer, BufSz}]) of
ok ->
tune_socket(Sock, More);
Error ->
Error
end;
Error ->
Error
end;
tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) ->
%% NOTE: Socket is not part of the argument list, backward compatibility.
case apply(M, F, A) of
ok ->
tune_socket(Sock, More);
Error ->
Error
end.

-spec sockname(ctx()) ->
{ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix()}.
sockname(#ctx{lsock = LSock}) ->
esockd_transport:sockname(LSock).

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
try
%% NOTE
%% Port-close leads to a TCP reset which cuts out TCP graceful close overheads.
_ = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
error:_ -> ok
end.
169 changes: 169 additions & 0 deletions src/esockd_accept_socket.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_accept_socket).

-export([
init/2,
async_accept/1,
async_accept_result/3,
post_accept/2,
sockname/1,
fast_close/1
]).

-export([
mk_tune_socket_fun/1,
tune_socket/2
]).

-type socket() :: socket:socket().
-type async_ref() :: socket:select_info().

-type tune_socket_fun() ::
{fun((socket(), Opts) -> {ok, socket()} | {error, any()}), Opts}.

-record(ctx, {
lsock :: socket(),
tune_fun :: tune_socket_fun()
}).

-type ctx() :: #ctx{}.

-define(DEFAULT_SOCK_OPTIONS, [{nodelay, true}]).

%%

-spec init(socket(), _Opts) -> ctx().
init(LSock, TuneFun) ->
#ctx{
lsock = LSock,
tune_fun = TuneFun
}.

-spec async_accept(ctx()) ->
{ok, socket()} | {async, async_ref()} | {error, atom()}.
async_accept(#ctx{lsock = LSock}) ->
case socket:accept(LSock, nowait) of
{ok, Sock} ->
{ok, Sock};
{error, Reason} ->
{error, Reason};
{select, {_Info, _Tag, Handle}} ->
{async, Handle}
end.

-spec async_accept_result(Info, async_ref(), _Opts) ->
{ok, socket()} | {error, atom()} | {async, async_ref()} | Info.
async_accept_result({'$socket', LSock, select, Handle}, Handle, _Opts) ->
case socket:accept(LSock, Handle) of
{ok, Sock} ->
{ok, Sock};
{error, Reason} ->
{error, Reason};
{select, {_Info, _Tag, NHandle}} ->
{async, NHandle}
end;
async_accept_result({'$socket', _LSock, abort, {Handle, Reason}}, Handle, _Opts) ->
{error, Reason};
async_accept_result(Info, _Handle, _Opts) ->
Info.

-spec post_accept(socket(), ctx()) -> {ok, esockd_socket, socket()} | {error, atom()}.
post_accept(Sock, #ctx{tune_fun = TuneFun}) ->
eval_tune_socket_fun(Sock, TuneFun).

return_socket(Sock) ->
{ok, esockd_socket, Sock}.

eval_tune_socket_fun(Sock, {Fun, Opts}) ->
Fun(Sock, Opts).

-spec mk_tune_socket_fun([esockd:option()]) -> tune_socket_fun().
mk_tune_socket_fun(Opts) ->
TcpOpts = proplists:get_value(tcp_options, Opts, []),
SockOpts = lists:flatten([sock_opt(O) || O <- merge_sock_defaults(TcpOpts)]),
TuneOpts = [{Name, Val} || {Name, Val} <- Opts,
Name =:= tune_buffer orelse
Name =:= tune_fun],
{fun ?MODULE:tune_socket/2, [{setopts, SockOpts} | TuneOpts]}.

tune_socket(Sock, [{setopts, SockOpts} | Rest]) ->
case esockd_socket:setopts(Sock, SockOpts) of
ok ->
tune_socket(Sock, Rest);
Error ->
Error
end;
tune_socket(Sock, [{tune_buffer, true} | Rest]) ->
try
BufRecv = ensure(socket:getopt(Sock, {socket, rcvbuf})),
Buffer = ensure(socket:getopt(Sock, {otp, rcvbuf})),
Max = max(Buffer, BufRecv),
ok = ensure(socket:setopt(Sock, {otp, rcvbuf}, Max)),
tune_socket(Sock, Rest)
catch
Error -> Error
end;
tune_socket(Sock, [{tune_fun, {M, F, A}} | Rest]) ->
%% NOTE: Socket is not part of the argument list, backward compatibility.
case apply(M, F, A) of
ok ->
tune_socket(Sock, Rest);
Error ->
Error
end;
tune_socket(Sock, _) ->
return_socket(Sock).

ensure(ok) -> ok;
ensure({ok, Result}) -> Result;
ensure(Error) -> throw(Error).

merge_sock_defaults(Opts) ->
esockd:merge_opts(?DEFAULT_SOCK_OPTIONS, Opts).

sock_opt(binary) ->
%% Meaningless.
[];
sock_opt({nodelay, Flag}) ->
{{tcp, nodelay}, Flag};
sock_opt({linger, {Flag, N}}) ->
{{socket, linger}, #{onoff => Flag, linger => N}};
sock_opt({recbuf, Size}) ->
{{socket, rcvbuf}, Size};
sock_opt({sndbuf, Size}) ->
{{socket, sndbuf}, Size};
sock_opt({buffer, Size}) ->
{{otp, rcvbuf}, Size};
sock_opt({reuseaddr, _}) ->
%% Listener option.
[];
sock_opt({backlog, _}) ->
%% Listener option.
[];
sock_opt(_Opt) ->
%% TODO: Ignored, need to notify user.
[].

-spec sockname(ctx()) ->
{ok, {inet:ip_address(), inet:port_number()}} | {error, inet:posix() | closed}.
sockname(#ctx{lsock = LSock}) ->
esockd_socket:sockname(LSock).

-spec fast_close(socket()) -> ok.
fast_close(Sock) ->
esockd_socket:fast_close(Sock).
Loading