Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,16 @@ options are wrapped in `{client_opts, [...]}` and included in cluster options.

Options passed to the connection module. See [Connection options](#connection-options) below.

* `{max_waiting, non_neg_integer()}`
* `{max_waiting, pos_integer()}`

Max number of commands allowed to wait in queue. Default 5000.

* `{max_pending, non_neg_integer()}`
* `{max_pending, pos_integer()}`

Max number of commands to be pending, i.e. sent to client
and waiting for a response. Default 128.

* `{queue_ok_level, non_neg_integer()}`
* `{queue_ok_level, pos_integer()}`

If the queue has been full, it is considered ok
again once it has shrunk to this level. Default 2000.
Expand Down Expand Up @@ -331,7 +331,7 @@ wrapped in `{connection_opts, [ered_connection:opt()]}`.
For `ered_cluster:connect/2`, the connection options are included under client
options, as `{client_opts, [{connection_opts, [...]}]}`.

* `{batch_size, non_neg_integer()}`
* `{batch_size, pos_integer()}`

If commands are queued up in the process message queue, this is the maximum
number of messages that will be received and sent in one call. Default 16.
Expand Down
75 changes: 52 additions & 23 deletions src/ered_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
command/2, command/3,
command_async/3]).

%% testing/debugging
-export([state_to_map/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
Expand All @@ -36,7 +39,7 @@
port :: inet:port_number(),

%% From "connection opts"
batch_size = 16 :: non_neg_integer(),
batch_size = 16 :: pos_integer(),
transport = gen_tcp :: gen_tcp | ssl,
transport_opts = [] :: list(),
connect_timeout = infinity :: timeout(),
Expand All @@ -51,10 +54,10 @@

node_down_timeout = 2000 :: non_neg_integer(),
info_pid = none :: none | pid(),
queue_ok_level = 2000 :: non_neg_integer(),
queue_ok_level = 2000 :: pos_integer(),

max_waiting = 5000 :: non_neg_integer(),
max_pending = 128 :: non_neg_integer()
max_waiting = 5000 :: pos_integer(),
max_pending = 128 :: pos_integer()
}).

-record(st,
Expand All @@ -68,6 +71,11 @@
waiting = q_new() :: command_queue(),
pending = q_new() :: command_queue(),

%% Batching. When the pending queue is full,
%% we don't send more commands until another
%% complete batch can fit in the pending queue.
filling_batch = true :: boolean(),
Comment thread
zuiderkwast marked this conversation as resolved.

cluster_id = undefined :: undefined | binary(),

queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level
Expand Down Expand Up @@ -148,7 +156,7 @@
-type connection_opt() ::
%% If commands are queued up in the process message queue this is the max
%% amount of messages that will be received and sent in one call
{batch_size, non_neg_integer()} |
{batch_size, pos_integer()} |
%% Timeout passed to gen_tcp:connect/4 or ssl:connect/4.
{connect_timeout, timeout()} |
%% Options passed to gen_tcp:connect/4.
Expand Down Expand Up @@ -288,6 +296,13 @@ command_async(ServerRef, Command, CallbackFun) ->
gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command),
replyto = CallbackFun}).

%% Turns the #st{} state record into a map keyed by record field name.
%% Intended for tests/debugging, e.g. applied to the result of
%% sys:get_state(EredClientPid).
state_to_map(#st{} = State) ->
    [st | FieldValues] = tuple_to_list(State),
    maps:from_list(lists:zip(record_info(fields, st), FieldValues)).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -639,7 +654,8 @@ abort_pending_commands(State) ->
PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)],
State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting),
pending = q_new(),
parser_state = ered_parser:init()}.
parser_state = ered_parser:init(),
filling_batch = true}.

connection_down(Reason, State) ->
State1 = abort_pending_commands(State),
Expand All @@ -652,30 +668,43 @@ connection_down(Reason, State) ->
process_commands(State) ->
NumWaiting = q_len(State#st.waiting),
NumPending = q_len(State#st.pending),
BatchSize = State#st.opts#opts.batch_size,
LowWaterMark = max(State#st.opts#opts.max_pending - BatchSize, 1),

if
State#st.status =:= up, State#st.socket =/= none,
NumWaiting > 0, NumPending < State#st.opts#opts.max_pending ->
%% TODO: Pop multiple from queue and send them in a batch. Use the batch_size option.
%% Use q_split, q_join and q_to_list.
NumWaiting > 0, State#st.filling_batch ->
%% TODO: Add request timeout timestamp to PendingReq.
{Command, NewWaiting} = q_out(State#st.waiting),
RespCommand = Command#command.data,
Data = ered_command:get_data(RespCommand),
Class = ered_command:get_response_class(RespCommand),
{CommandQueue, NewWaiting} = q_split(min(BatchSize, NumWaiting), State#st.waiting),
{BatchedData, PendingRequests} =
lists:foldr(fun(Command, {DataAcc, PendingAcc}) ->
RespCommand = Command#command.data,
ResponseClass = ered_command:get_response_class(RespCommand),

NewBatchedData = ered_command:get_data(RespCommand),
NewPendingRequest = #pending_req{command = Command,
response_class = ResponseClass},
{[NewBatchedData | DataAcc] , q_in_r(NewPendingRequest, PendingAcc)}
end,
{[], q_new()},
q_to_list(CommandQueue)),
Transport = State#st.opts#opts.transport,
case Transport:send(State#st.socket, Data) of
case Transport:send(State#st.socket, BatchedData) of
ok ->
PendingReq = #pending_req{command = Command,
response_class = Class},
State1 = State#st{pending = q_in(PendingReq, State#st.pending),
waiting = NewWaiting},
process_commands(State1);
NewPending = q_join(State#st.pending, PendingRequests),
NewState = State#st{waiting = NewWaiting,
pending = NewPending,
filling_batch = q_len(NewPending) < State#st.opts#opts.max_pending},
process_commands(NewState);
{error, _Reason} ->
%% Send FIN and handle replies in flight before reconnecting.
Transport:shutdown(State#st.socket, read_write),
start_connect_loop(now, State#st{status = init})
end;

not State#st.filling_batch, NumPending < LowWaterMark ->
process_commands(State#st{filling_batch = true});

NumWaiting > State#st.opts#opts.max_waiting, State#st.queue_full_event_sent ->
drop_commands(State);

Expand Down Expand Up @@ -705,7 +734,7 @@ start_connect_loop(When0, State) ->
When0
end,
ConnectPid = spawn_link(fun () -> connect_loop(When, Self, State#st.opts) end),
State#st{connection_loop_pid = ConnectPid}.
State#st{connection_loop_pid = ConnectPid}.

drop_commands(State) ->
case q_len(State#st.waiting) > State#st.opts#opts.max_waiting of
Expand Down Expand Up @@ -736,9 +765,9 @@ q_out({Size, Q}) ->
{{value, Val}, NewQ} -> {Val, {Size-1, NewQ}}
end.

%% q_split(N, {Size, Q}) when N =< Size ->
%% {A, B} = queue:split(N, Q),
%% {{N, A}, {Size - N, B}}.
%% Splits a size-tracked queue into its first N elements and the
%% remainder, keeping the size counters consistent. The caller must
%% guarantee that N does not exceed the current queue size.
q_split(N, {Size, Q}) when N =< Size ->
    {Front, Rest} = queue:split(N, Q),
    {{N, Front}, {Size - N, Rest}}.

%% Returns the elements of a size-tracked queue as a list, front first.
q_to_list({_Len, Queue}) ->
    queue:to_list(Queue).
Expand Down
73 changes: 61 additions & 12 deletions test/ered_client_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ run_test_() ->
{spawn, fun server_close_socket_t/0},
{spawn, fun bad_request_t/0},
{spawn, fun server_buffer_full_t/0},
{spawn, fun low_high_watermark_t/0},
{spawn, fun bad_option_t/0},
{spawn, fun bad_connection_option_t/0},
{spawn, fun server_buffer_full_reconnect_t/0},
Expand Down Expand Up @@ -118,7 +119,7 @@ server_buffer_full_t() ->
Expected = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, Expected} = gen_tcp:recv(Sock, size(Expected)),
%% should be nothing more since only 5 pending
{error, timeout} = gen_tcp:recv(Sock, 0, 0),
?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)),

timer:sleep(500),

Expand All @@ -138,29 +139,78 @@ server_buffer_full_t() ->
Pid = self(),
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive #{msg_type := queue_full} -> ok end,
{6, {error, queue_overflow}} = get_msg(),
?assertMatch({6, {error, queue_overflow}}, get_msg()),
receive #{msg_type := queue_ok} -> ok end,
[{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]],
[?assertMatch({N, {ok, <<"pong">>}}, get_msg()) || N <- [1,2,3,4,5,7,8,9,10,11]],
no_more_msgs().

%% Verifies the pending-queue watermark behavior: with
%% batch_size = max_pending = 5, the client stops sending once the
%% pending queue is full (filling_batch = false) and resumes only
%% when pending has drained enough for a whole new batch to fit.
low_high_watermark_t() ->
%% Fake server: accept one connection and script its traffic below.
{ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}]),
{ok, Port} = inet:port(ListenSock),
ServerPid = spawn_link(fun() ->
{ok, Sock} = gen_tcp:accept(ListenSock),
%% RESP-encoded "ping" command as the client sends it on the wire.
Ping = <<"*1\r\n$4\r\nping\r\n">>,
FivePing = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)),

%% should be nothing more since only 5 pending
?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)),

%% Answer only 4 of the 5 pings; the 5th pong is held back until
%% the test releases it, so one request stays pending.
gen_tcp:send(Sock, lists:duplicate(4, <<"+pong\r\n">>)),
receive send_one_more_pong -> ok end,
gen_tcp:send(Sock, lists:duplicate(1, <<"+pong\r\n">>)),

%% With pending drained, the client sends the next 5-ping batch.
{ok, FivePing} = gen_tcp:recv(Sock, 0),
gen_tcp:send(Sock, lists:duplicate(5, <<"+pong\r\n">>)),
%% NOTE(review): nothing in this test sends 'ok' to this process,
%% so it blocks here until the test tears down — confirm intended.
receive ok -> ok end
Comment thread
zuiderkwast marked this conversation as resolved.
end),
%% batch_size = max_pending = 5 means the low watermark is
%% max(max_pending - batch_size, 1) = 1, i.e. pending must drain
%% to 0 before a new batch may be sent.
Client = start_client(Port, [{connection_opts, [{batch_size,5}]}, {max_waiting, 10}, {max_pending, 5}, {queue_ok_level,1}]),
expect_connection_up(Client),

Pid = self(),
%% Queue 10 pings: the first batch of 5 is sent, 5 remain waiting.
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,10)],
[?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [1,2,3,4]],

%% high water mark is hit, and we can not fill more until we reach low water-mark.
?assertMatch(#{pending := {1, _},
waiting := {5, _},
filling_batch := false},
ered_client:state_to_map(sys:get_state(Client))),

%% Release the held-back pong; pending drops to 0, i.e. below the
%% low watermark, so the second batch of 5 goes out.
ServerPid ! send_one_more_pong,
[?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [5]],

%% low water mark reached, pending should now be filled.
?assertMatch(#{pending := {5, _},
waiting := {0, _},
filling_batch := false},
ered_client:state_to_map(sys:get_state(Client))),

[?assertEqual({N, {ok, <<"pong">>}}, get_msg()) || N <- [6,7,8,9,10]],

%% All replies consumed: both queues empty and the client is ready
%% to start filling a fresh batch again.
?assertMatch(#{pending := {0, _},
waiting := {0, _},
filling_batch := true},
ered_client:state_to_map(sys:get_state(Client))),

no_more_msgs().
Comment thread
zuiderkwast marked this conversation as resolved.


server_buffer_full_reconnect_t() ->
{ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]),
{ok, Port} = inet:port(ListenSock),
spawn_link(fun() ->
{ok, Sock} = gen_tcp:accept(ListenSock),
%% expect 5 ping
Ping = <<"*1\r\n$4\r\nping\r\n">>,
Expected = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, Expected} = gen_tcp:recv(Sock, size(Expected)),
FivePing = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)),
%% should be nothing more since only 5 pending
{error, timeout} = gen_tcp:recv(Sock, 0, 0),

gen_tcp:close(Sock),

{ok, Sock2} = gen_tcp:accept(ListenSock),
{ok, Expected} = gen_tcp:recv(Sock2, size(Expected)),
{ok, FivePing} = gen_tcp:recv(Sock2, size(FivePing)),

gen_tcp:send(Sock2, lists:duplicate(5, <<"+pong\r\n">>)),
%% should be nothing more since only 5 pending
Expand Down Expand Up @@ -192,10 +242,9 @@ server_buffer_full_node_goes_down_t() ->
{ok, Port} = inet:port(ListenSock),
spawn_link(fun() ->
{ok, Sock} = gen_tcp:accept(ListenSock),
%% expect 5 ping
Ping = <<"*1\r\n$4\r\nping\r\n">>,
Expected = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, Expected} = gen_tcp:recv(Sock, size(Expected)),
FivePing = iolist_to_binary(lists:duplicate(5, Ping)),
{ok, FivePing} = gen_tcp:recv(Sock, size(FivePing)),
%% should be nothing more since only 5 pending
{error, timeout} = gen_tcp:recv(Sock, 0, 0),
gen_tcp:close(ListenSock)
Expand All @@ -206,9 +255,9 @@ server_buffer_full_node_goes_down_t() ->
Pid = self(),
[ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)],
receive #{msg_type := queue_full} -> ok end,
{6, {error, queue_overflow}} = get_msg(),
?assertEqual({6, {error, queue_overflow}}, get_msg()),
receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end,
[{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]],
[?assertEqual({N, {error, queue_overflow}}, get_msg()) || N <- [1,2,3,4,5]],
receive #{msg_type := queue_ok} -> ok end,
receive #{msg_type := connect_error, reason := econnrefused} -> ok end,
receive #{msg_type := node_down_timeout} -> ok end,
Expand Down
Loading