Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
611 changes: 490 additions & 121 deletions src/ered_client.erl

Large diffs are not rendered by default.

384 changes: 0 additions & 384 deletions src/ered_connection.erl

This file was deleted.

42 changes: 25 additions & 17 deletions src/ered_parser.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
continue/2]).

-export_type([parse_return/0,
parse_result/0
parse_result/0,
state/0
]).

%%%===================================================================
Expand All @@ -25,10 +26,12 @@

-type parse_result() :: binary() | {error, binary()} | integer() | undefined | [parse_result()] | inf | neg_inf | nan |
float() | true | false | #{parse_result() => parse_result()} | sets:set(parse_result()) |
{attribute, parse_result(), parse_result()} | {push | parse_result()}.
{attribute, parse_result(), parse_result()} | {push, parse_result()}.


-type parse_return() :: {done, parse_result(), #parser_state{}} | {need_more, bytes_needed(), #parser_state{}}.
-opaque state() :: #parser_state{}.
-type parse_return() :: {done, parse_result(), state()} |
{need_more, bytes_needed(), state()} |
{parse_error, any()}.

-if(?OTP_RELEASE >= 24).
-define(sets_new, sets:new([{version, 2}])).
Expand All @@ -41,23 +44,23 @@
%%%===================================================================

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec init() -> #parser_state{}.
-spec init() -> state().
%%
%% Init empty parser continuation
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
init() ->
#parser_state{}.

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec next(#parser_state{}) -> parse_return().
-spec next(state()) -> parse_return().
%%
%% Get next result or continuation.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
next(State) ->
parse(State).

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec continue(binary(), #parser_state{}) -> parse_return().
-spec continue(binary(), state()) -> parse_return().
%%
%% Feed more data to the parser. Get next result or continuation.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Expand All @@ -70,16 +73,21 @@ continue(NewData, State) ->
%%%===================================================================

parse(State=#parser_state{data=Data, next=Fun, bytes_needed=Bytes}) ->
case token(Data, Bytes) of
{need_more, LackingBytes} ->
{need_more, LackingBytes, State};
{Token, Rest} ->
case Fun(Token) of
{done, Result} ->
{done, Result, #parser_state{data=Rest}};
{cont, NextFun, NextBytes} ->
parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes})
end
try
case token(Data, Bytes) of
{need_more, LackingBytes} ->
{need_more, LackingBytes, State};
{Token, Rest} ->
case Fun(Token) of
{done, Result} ->
{done, Result, #parser_state{data=Rest}};
{cont, NextFun, NextBytes} ->
parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes})
end
end
catch
{parse_error, _Reason} = ParseError ->
ParseError
end.

token(Data, 0) ->
Expand Down
76 changes: 41 additions & 35 deletions test/ered_client_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ request_t() ->

fail_connect_t() ->
{ok,Pid} = ered_client:start_link("127.0.0.1", 0, [{info_pid, self()}]),
{connect_error,econnrefused} = expect_connection_down(Pid),
{connect_error, Reason} = expect_connection_down(Pid),
true = Reason =:= econnrefused orelse Reason =:= eaddrnotavail,
%% make sure there are no more connection down messages
timeout = receive M -> M after 500 -> timeout end.

Expand All @@ -66,7 +67,7 @@ fail_parse_t() ->
Pid ! ered_client:command(Client, [<<"ping">>])
end),
expect_connection_up(Client),
Reason = {recv_exit, {parse_error,{invalid_data,<<"&pong">>}}},
Reason = {parse_error, {invalid_data, <<"&pong">>}},
receive #{msg_type := socket_closed, reason := Reason} -> ok end,
expect_connection_up(Client),
{ok, <<"pong">>} = get_msg().
Expand All @@ -75,18 +76,22 @@ fail_parse_t() ->
server_close_socket_t() ->
{ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]),
{ok, Port} = inet:port(ListenSock),
spawn_link(fun() ->
{ok, Sock} = gen_tcp:accept(ListenSock),
gen_tcp:close(Sock),

%% resend from client
{ok, _Sock2} = gen_tcp:accept(ListenSock),
receive ok -> ok end
end),
ServerPid =
spawn_link(fun() ->
{ok, Sock} = gen_tcp:accept(ListenSock),
receive continue -> ok end,
gen_tcp:close(Sock),

%% resend from client
{ok, _Sock2} = gen_tcp:accept(ListenSock),
receive done -> ok end
end),
Client = start_client(Port),
expect_connection_up(Client),
receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end,
expect_connection_up(Client).
ServerPid ! continue,
receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end,
expect_connection_up(Client),
ServerPid ! done.


%% Suppress warning from command 'bad_request'
Expand All @@ -113,7 +118,7 @@ server_buffer_full_t() ->
Expected = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, Expected} = gen_tcp:recv(Sock, size(Expected)),
%% should be nothing more since only 5 pending
{error, timeout} = gen_tcp:recv(Sock, 0, 0),
?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)),

timer:sleep(500),

Expand All @@ -127,15 +132,15 @@ server_buffer_full_t() ->

receive ok -> ok end
end),
Client = start_client(Port, [{max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]),
Client = start_client(Port, [{batch_size, 16},{max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]),
expect_connection_up(Client),

Pid = self(),
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive #{msg_type := queue_full} -> ok end,
{6, {error, queue_overflow}} = get_msg(),
?assertEqual({6, {error, queue_overflow}}, get_msg()),
receive #{msg_type := queue_ok} -> ok end,
[{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]],
[?assertEqual({N, {ok, <<"pong">>}} , get_msg()) || N <- [1,2,3,4,5,7,8,9,10,11]],
no_more_msgs().


Expand Down Expand Up @@ -163,7 +168,7 @@ server_buffer_full_reconnect_t() ->
receive ok -> ok end

end),
Client = start_client(Port, [{max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]),
Client = start_client(Port, [{batch_size, 16}, {max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]),
expect_connection_up(Client),

Pid = self(),
Expand All @@ -172,7 +177,7 @@ server_buffer_full_reconnect_t() ->
receive #{msg_type := queue_full} -> ok end,
%% 1 message over the limit, first one in queue gets kicked out
{6, {error, queue_overflow}} = get_msg(),
receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end,
receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end,
%% when connection goes down the pending messages will be put in the queue and the queue
%% will overflow kicking out the oldest first
[{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]],
Expand Down Expand Up @@ -201,9 +206,9 @@ server_buffer_full_node_goes_down_t() ->
Pid = self(),
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive #{msg_type := queue_full} -> ok end,
{6, {error, queue_overflow}} = get_msg(),
receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end,
[{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]],
?assertEqual({6, {error, queue_overflow}}, get_msg()),
receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end,
[?assertEqual({N, {error, queue_overflow}}, get_msg()) || N <- [1,2,3,4,5]],
receive #{msg_type := queue_ok} -> ok end,
receive #{msg_type := connect_error, reason := econnrefused} -> ok end,
receive #{msg_type := node_down_timeout} -> ok end,
Expand Down Expand Up @@ -242,7 +247,7 @@ send_timeout_t() ->
Pid = self(),
ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end),
%% this should come after max 1000ms
receive #{msg_type := socket_closed, reason := {recv_exit, timeout}} -> ok after 2000 -> timeout_error() end,
receive #{msg_type := socket_closed, reason := timeout} -> ok after 2000 -> timeout_error() end,
expect_connection_up(Client),
{reply, {ok, <<"pong">>}} = get_msg(),
no_more_msgs().
Expand All @@ -256,15 +261,12 @@ fail_hello_t() ->
{ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0),
ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>),

%% test resend
{ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0),
ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>),

Pid ! done
Pid ! done,
{error, closed} = gen_tcp:recv(Sock, 0)
end),
{ok,Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}]),
{init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client),
receive done -> ok end,
{init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client),
no_more_msgs().

hello_with_auth_t() ->
Expand Down Expand Up @@ -294,11 +296,13 @@ hello_with_auth_t() ->
"$6\r\nmaster\r\n"
"$7\r\nmodules\r\n"
"*0\r\n">>),
Pid ! done
Pid ! done,
receive ok -> ok end
end),
{ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()},
{auth, {<<"ali">>, <<"sesame">>}}]),
{ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()},
{auth, {<<"ali">>, <<"sesame">>}}]),
receive done -> ok end,
expect_connection_up(Client),
no_more_msgs().

hello_with_auth_fail_t() ->
Expand Down Expand Up @@ -333,12 +337,14 @@ auth_t() ->
"$6\r\nsesame\r\n">>} = gen_tcp:recv(Sock, 0),
ok = gen_tcp:send(Sock, <<"+OK\r\n">>),

Pid ! done
Pid ! done,
receive ok -> ok end
end),
{ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()},
{resp_version, 2},
{auth, {<<"ali">>, <<"sesame">>}}]),
{ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()},
{resp_version, 2},
{auth, {<<"ali">>, <<"sesame">>}}]),
receive done -> ok end,
expect_connection_up(Client),
no_more_msgs().

auth_fail_t() ->
Expand Down
19 changes: 12 additions & 7 deletions test/ered_cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ t_hard_failover(_) ->
ct:pal("~p\n", [ered_cluster:command_all(R, [<<"CLUSTER">>, <<"SLOTS">>])]),
ct:pal(os:cmd("docker stop " ++ Pod)),

?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := {recv_exit, closed}}),
?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := tcp_closed}),
?MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}),

?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}, 2500),
Expand Down Expand Up @@ -363,7 +363,7 @@ t_manual_failover_then_old_master_down(_) ->
?MSG(#{addr := {"127.0.0.1", Port},
master := true,
msg_type := socket_closed,
reason := {recv_exit, closed}}),
reason := tcp_closed}),

%% Ered prefers the replica of the disconnected node for slot map update,
%% since it is likely to know about a failover first; it is the new master.
Expand Down Expand Up @@ -420,7 +420,7 @@ t_blackhole(_) ->
ered:command_async(ClientRef, [<<"PING">>],
fun(Reply) -> TestPid ! {ping_reply, Reply} end),

?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, master := true},
?MSG(#{msg_type := socket_closed, reason := timeout, master := true},
ResponseTimeout + 1000),
?MSG({ping_reply, {error, _Reason}}, % node_down or node_deactivated
NodeDownTimeout + 1000),
Expand All @@ -432,6 +432,8 @@ t_blackhole(_) ->
?OPTIONAL_MSG(#{msg_type := cluster_ok}),
?MSG(#{msg_type := client_stopped, reason := shutdown, master := false},
CloseWait + 1000),
?OPTIONAL_MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}),
no_more_msgs(),

ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod)),
timer:sleep(500),
Expand Down Expand Up @@ -477,7 +479,7 @@ t_blackhole_all_nodes(_) ->
fun(Reply) -> TestPid ! {ping_reply, Reply} end)
end, AddrToClient),

[?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}},
[?MSG(#{msg_type := socket_closed, reason := timeout, addr := {"127.0.0.1", Port}},
ResponseTimeout + 1000) || Port <- ?PORTS],
?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000),
?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000),
Expand Down Expand Up @@ -526,11 +528,14 @@ t_init_timeout(_) ->
ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT PAUSE 10000")]),
{ok, _P} = ered_cluster:connect([{localhost, 30001}], [{info_pid, [self()]}] ++ Opts),

?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}}, 3500),
?MSG(#{msg_type := socket_closed, reason := timeout}, 3500),
?MSG(#{msg_type := node_down_timeout, addr := {localhost, 30001}}, 2500),
%% Does not work on Redis before 6.2.0.
ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT UNPAUSE")]),

%% Maybe we were waiting for init commands when the node down timeout fired.
?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}),

?MSG(#{msg_type := connected, addr := {localhost, 30001}}),

?MSG(#{msg_type := slot_map_updated}),
Expand Down Expand Up @@ -760,8 +765,8 @@ t_kill_client(_) ->
ct:pal("~p\n",[os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLIENT KILL TYPE NORMAL")]),
?MSG(#{msg_type := socket_closed, addr := {_, Port}}),

%% connection reestablished
?MSG(#{msg_type := connected, addr := {_, Port}}),
%% Waits until 1000ms after the first connect before reconnecting.
?MSG(#{msg_type := connected, addr := {_, Port}}, 2000),
no_more_msgs().

t_new_cluster_master(_) ->
Expand Down
8 changes: 5 additions & 3 deletions test/ered_cluster_tls_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ t_expired_cert_tls_1_3(_) ->
[{info_pid, [self()]}, {client_opts, ClientOpts}]),

?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", 31001},
reason := {recv_exit,
{tls_alert,
{certificate_expired, _}}}}),
reason := {tls_alert, {certificate_expired, _}}}),
?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", 31001}}, 2500),
timer:sleep(10), % Wait for the optional messages.
?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}),
?OPTIONAL_MSG(#{msg_type := socket_closed,
reason := {tls_alert, {certificate_expired, _}}}),
no_more_msgs().
Loading
Loading