diff --git a/src/ered_client.erl b/src/ered_client.erl index cfdce31..04c6cba 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -34,7 +34,15 @@ { host :: host(), port :: inet:port_number(), - connection_opts = [] :: [ered_connection:opt()], + + %% From "connection opts" + batch_size = 16 :: non_neg_integer(), + transport = gen_tcp :: gen_tcp | ssl, + transport_opts = [] :: list(), + connect_timeout = infinity :: timeout(), + push_cb = fun(_) -> ok end :: push_cb(), + timeout = 10000 :: timeout(), % Response timeout + resp_version = 3 :: 2..3, use_cluster_id = false :: boolean(), auth = none :: {binary(), binary()} | none, @@ -51,30 +59,33 @@ -record(st, { - connect_loop_pid = none, - connection_pid = none, + connection_loop_pid = none, + socket = none, controlling_process :: pid(), last_status = none, + parser_state :: ered_parser:state(), waiting = q_new() :: command_queue(), pending = q_new() :: command_queue(), + allow_new_pending_request = true :: boolean(), cluster_id = undefined :: undefined | binary(), queue_full_event_sent = false :: boolean(), % set to true when full, false when reaching queue_ok_level - node_status = up :: up | node_down | node_deactivated, - + status :: init | up | node_down | node_deactivated, node_down_timer = none :: none | reference(), + connected_at = none :: none | integer(), % erlang:monotonic_time(millisecond) opts = #opts{} }). - -type command_error() :: queue_overflow | node_down | node_deactivated | {client_stopped, reason()}. -type command_item() :: {command, ered_command:redis_command(), reply_fun()}. -type command_queue() :: {Size :: non_neg_integer(), queue:queue(command_item())}. --type reply() :: {ok, ered_connection:result()} | {error, command_error()}. +-type result() :: ered_parser:parse_result(). +-type push_cb() :: fun((result()) -> any()). +-type reply() :: {ok, result()} | {error, command_error()}. -type reply_fun() :: fun((reply()) -> any()). -type host() :: ered:host(). 
@@ -106,7 +117,7 @@ -type opt() :: %% Options passed to the connection module - {connection_opts, [ered_connection:opt()]} | + {connection_opts, [connection_opt()]} | %% Max number of commands allowed to wait in queue. {max_waiting, non_neg_integer()} | %% Max number of commands to be pending, i.e. sent to client @@ -135,6 +146,45 @@ %% The SELECT command is only sent when non-zero. {select_db, non_neg_integer()}. +-type connection_opt() :: + %% If commands are queued up in the process message queue this is the max + %% amount of messages that will be received and sent in one call + {batch_size, non_neg_integer()} | + %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. + {connect_timeout, timeout()} | + %% Options passed to gen_tcp:connect/4. + {tcp_options, [gen_tcp:connect_option()]} | + %% Timeout passed to gen_tcp:connect/4. DEPRECATED. + {tcp_connect_timeout, timeout()} | + %% Options passed to ssl:connect/4. If this config parameter is present, + %% TLS is used. + {tls_options, [ssl:tls_client_option()]} | + %% Timeout passed to ssl:connect/4. DEPRECATED. + {tls_connect_timeout, timeout()} | + %% Callback for push notifications + {push_cb, push_cb()} | 
+ %% Timeout when waiting for a response from Redis. milliseconds + {response_timeout, non_neg_integer()}. + +%% Command in the waiting queue. +-record(command, {data, replyto}). + +%% Pending request, in flight, sent to server, waiting for reply/ies. +-record(pending_req, + { + command :: #command{}, + response_class :: ered_command:response_class() | + [ered_command:response_class()], + reply_acc = [] + }). + +%% Queue macro, can be used in guards. +-define(q_is_empty(Q), (element(1, Q) =:= 0)). + +%% Commands like SUBSCRIBE and UNSUBSCRIBE don't return anything, so we use this +%% return value. +-define(pubsub_reply, undefined). + %%%=================================================================== %%% API %%%=================================================================== @@ -220,16 +286,18 @@ command(ServerRef, Command, Timeout) -> %% client process and should not hang or perform any lengthy task. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, CallbackFun) -> - gen_server:cast(ServerRef, {command, ered_command:convert_to(Command), CallbackFun}). + gen_server:cast(ServerRef, #command{data = ered_command:convert_to(Command), + replyto = CallbackFun}). 
%%%=================================================================== %%% gen_server callbacks %%%=================================================================== init({Host, Port, OptsList, User}) -> Opts = lists:foldl( - fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val}; + fun({connection_opts, Val}, S) -> handle_connection_opts(S, Val); ({max_waiting, Val}, S) -> S#opts{max_waiting = Val}; ({max_pending, Val}, S) -> S#opts{max_pending = Val}; + ({batch_size, Val}, S) -> S#opts{batch_size = Val}; ({queue_ok_level, Val}, S) -> S#opts{queue_ok_level = Val}; ({reconnect_wait, Val}, S) -> S#opts{reconnect_wait = Val}; ({info_pid, Val}, S) -> S#opts{info_pid = Val}; @@ -244,21 +312,53 @@ init({Host, Port, OptsList, User}) -> OptsList), monitor(process, User), process_flag(trap_exit, true), - Pid = self(), - ConnectPid = spawn_link(fun() -> connect(Pid, Opts) end), - {ok, start_node_down_timer(#st{opts = Opts, - controlling_process = User, - connect_loop_pid = ConnectPid})}. + State0 = #st{opts = Opts, + controlling_process = User, + parser_state = ered_parser:init(), + status = init}, + State1 = start_connect_loop(now, State0), + {ok, start_node_down_timer(State1)}. + +%% "Connection opts" is second layer of options. +%% +%% TODO: Remove this layering and put them directly as client options. It's an +%% API change so we'll do it in an appropriate version. 
+handle_connection_opts(OptsRecord, Opts) -> + Valid = [batch_size, tcp_options, tls_options, push_cb, response_timeout, + tcp_connect_timeout, tls_connect_timeout, connect_timeout], + [error({badarg, BadOpt}) + || BadOpt <- proplists:get_keys(Opts) -- Valid], + BatchSize = proplists:get_value(batch_size, Opts, 16), + ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), + PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), + TcpOptions = proplists:get_value(tcp_options, Opts, []), + TlsOptions = proplists:get_value(tls_options, Opts, []), + TcpTimeout = proplists:get_value(tcp_connect_timeout, Opts, infinity), + TlsTimeout = proplists:get_value(tls_connect_timeout, Opts, infinity), + {Transport, Options, Timeout0} = case TlsOptions of + [] -> + {gen_tcp, TcpOptions, TcpTimeout}; + _ -> + {ssl, TlsOptions, TlsTimeout} + end, + ConnectTimeout = proplists:get_value(connect_timeout, Opts, Timeout0), + OptsRecord#opts{batch_size = BatchSize, + transport = Transport, transport_opts = Options, + connect_timeout = ConnectTimeout, + timeout = ResponseTimeout, + push_cb = PushCb}. handle_call({command, Command}, From, State) -> Fun = fun(Reply) -> gen_server:reply(From, Reply) end, - handle_cast({command, Command, Fun}, State). + handle_cast(#command{data = Command, replyto = Fun}, State). 
-handle_cast(Command = {command, _, _}, State) -> - case State#st.node_status of - up -> - {noreply, process_commands(State#st{waiting = q_in(Command, State#st.waiting)})}; +handle_cast(Command = #command{}, State) -> + case State#st.status of + Up when Up =:= up; Up =:= init -> + State1 = State#st{waiting = q_in(Command, State#st.waiting)}, + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; NodeProblem when NodeProblem =:= node_down; NodeProblem =:= node_deactivated -> reply_command(Command, {error, NodeProblem}), {noreply, State} @@ -268,66 +368,123 @@ handle_cast(deactivate, State) -> State1 = cancel_node_down_timer(State), State2 = report_connection_status(node_deactivated, State1), State3 = reply_all({error, node_deactivated}, State2), - {noreply, process_commands(State3#st{node_status = node_deactivated})}; + {noreply, process_commands(State3#st{status = node_deactivated})}; -handle_cast(reactivate, #st{connection_pid = none} = State) -> +handle_cast(reactivate, #st{socket = none} = State) -> {noreply, start_node_down_timer(State)}; handle_cast(reactivate, State) -> - {noreply, State#st{node_status = up}}. - + {noreply, State#st{status = up}}. -handle_info({{command_reply, Pid}, Reply}, State = #st{pending = Pending, connection_pid = Pid}) -> - case q_out(Pending) of - empty -> - {noreply, State}; - {Command, NewPending} -> - reply_command(Command, {ok, Reply}), - {noreply, process_commands(State#st{pending = NewPending})} +handle_info({Type, Socket, Data}, #st{socket = Socket} = State) + when Type =:= tcp; Type =:= ssl -> + %% Receive data from current socket. + State1 = handle_data(Data, State), + State2 = process_commands(State1), + {noreply, State2, response_timeout(State2)}; + +handle_info({Passive, Socket}, #st{socket = Socket} = State) + when Passive =:= tcp_passive; Passive =:= ssl_passive -> + %% Socket switched to passive mode due to {active, N}. + %% TODO: Add config for N. 
+ N = 100, + case setopts(State, [{active, N}]) of + ok -> + {noreply, State, response_timeout(State)}; + {error, Reason} -> + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, Reason}, State#st{socket = undefined})} end; -handle_info({{command_reply, _Pid}, _Reply}, State) -> - %% Stray message from a defunct client? ignore! - {noreply, State}; - -handle_info(Reason = {connect_error, _ErrorReason}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info(Reason = {init_error, _Errors}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info(Reason = {socket_closed, _CloseReason}, State) -> - {noreply, connection_down(Reason, State)}; - -handle_info({connected, Pid, ClusterId}, State) -> - State1 = cancel_node_down_timer(State), - State2 = State1#st{connection_pid = Pid, cluster_id = ClusterId, - node_status = case State1#st.node_status of - node_down -> up; - OldStatus -> OldStatus - end}, - State3 = report_connection_status(connection_up, State2), - {noreply, process_commands(State3)}; +handle_info({Error, Socket, Reason}, #st{socket = Socket} = State) + when Error =:= tcp_error; Error =:= ssl_error -> + %% Socket errors. If the network or peer is down, the error is not + %% always followed by a tcp_closed. + %% + %% TLS 1.3: Called after a connect when the client certificate has expired + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, Reason}, State#st{socket = none})}; + +handle_info({Closed, Socket}, #st{socket = Socket} = State) + when Closed =:= tcp_closed; Closed =:= ssl_closed -> + %% Socket got closed by the server. + {noreply, connection_down({socket_closed, Closed}, State#st{socket = none})}; + +handle_info(ConnectError = {connect_error, _Reason}, State) -> + %% Message from the connect loop process. It will retry. 
+ {noreply, connection_down(ConnectError, State)}; + +handle_info({connected, Socket}, State) -> + %% Sent from connect loop process when just before it exits. + State1 = abort_pending_commands(State), + State2 = State1#st{socket = Socket, + connected_at = erlang:monotonic_time(millisecond), + status = init}, + State3 = init_connection(State2), + {noreply, State3, response_timeout(State3)}; + +handle_info({init_command_reply, {ok, Replies}}, State) -> + case [Reason || {error, Reason} <- Replies] of + [] -> + %% No errors + ClusterId = case State#st.opts#opts.use_cluster_id of + true -> + hd(Replies); + false -> + undefined + end, + State1 = cancel_node_down_timer(State), + NodeStatus = case State1#st.status of + node_down -> up; + init -> up; + OldStatus -> OldStatus + end, + State2 = State1#st{status = NodeStatus, cluster_id = ClusterId}, + State3 = report_connection_status(connection_up, State2), + {noreply, process_commands(State3), response_timeout(State3)}; + Errors -> + {noreply, connection_down({init_error, Errors}, State)} + end; +handle_info({init_command_reply, {error, Reason}}, State) -> + {noreply, connection_down({init_error, Reason}, State)}; handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.node_down_timer -> + %% Node down timeout State1 = report_connection_status({connection_down, node_down_timeout}, State), State2 = reply_all({error, node_down}, State1), - {noreply, process_commands(State2#st{node_status = node_down})}; + {noreply, process_commands(State2#st{status = node_down})}; -handle_info({timeout, _TimerRef, _Msg}, State) -> - {noreply, State}; +handle_info(timeout, #st{socket = Socket} = State) when Socket =/= none -> + %% Request timeout + Transport = State#st.opts#opts.transport, + Transport:close(Socket), + {noreply, connection_down({socket_closed, timeout}, State#st{socket = none})}; handle_info({'DOWN', _Mon, process, Pid, ExitReason}, State = #st{controlling_process = Pid}) -> {stop, ExitReason, State}; 
+handle_info({'EXIT', Pid, normal}, #st{connection_loop_pid = Pid} = State) -> + State1 = State#st{connection_loop_pid = none}, + State2 = case State1#st.socket of + none -> + %% Corner case. The new connection was lost before this + %% exit signal arrived. Start reconnect loop again. + start_connect_loop(now, State1); + _Socket -> + State1 + end, + {noreply, State2, response_timeout(State2)}; + handle_info({'EXIT', _From, Reason}, State) -> + %% Supervisor exited. {stop, Reason, State}; handle_info(_Ignore, State) -> - {noreply, State}. + {noreply, State, response_timeout(State)}. terminate(Reason, State) -> - exit(State#st.connect_loop_pid, kill), reply_all({error, {client_stopped, Reason}}, State), report_connection_status({connection_down, {client_stopped, Reason}}, State), ok. @@ -338,8 +495,128 @@ code_change(_OldVsn, State = #st{opts = #opts{}}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +setopts(#st{opts = #opts{transport = gen_tcp}, socket = Socket}, Opts) -> + inet:setopts(Socket, Opts); +setopts(#st{opts = #opts{transport = ssl}, socket = Socket}, Opts) -> + ssl:setopts(Socket, Opts). + +%% Data received from the server +handle_data(Data, #st{parser_state = ParserState} = State) -> + handle_parser_result(ered_parser:continue(Data, ParserState), State). + +handle_parser_result({need_more, _BytesNeeded, ParserState}, State) -> + State#st{parser_state = ParserState}; +handle_parser_result({done, Value, ParserState}, State0) -> + State1 = handle_result(Value, State0), + handle_parser_result(ered_parser:next(ParserState), State1); +handle_parser_result({parse_error, Reason}, State) -> + Transport = State#st.opts#opts.transport, + Transport:close(State#st.socket), + connection_down({socket_closed, {parse_error, Reason}}, State#st{socket = none}). 
+ +handle_result({push, Value = [Type|_]}, State) -> + %% Pub/sub in RESP3 is a bit quirky. The push is supposed to be out of band + %% data not connected to any request but for subscribe and unsubscribe + %% requests, a successful command is signalled as one or more push messages. + PushCB = State#st.opts#opts.push_cb, + PushCB(Value), + State1 = case is_subscribe_push(Type) of + true -> + handle_subscribe_push(Value, State); + false -> + State + end, + State1; +handle_result(Value, #st{pending = PendingQueue} = State) + when not ?q_is_empty(PendingQueue) -> + {PendingReq, PendingQueue1} = q_out(PendingQueue), + #pending_req{command = Command, + response_class = RespClass, + reply_acc = Acc} = PendingReq, + %% Check how many replies expected (list = pipeline) + case RespClass of + Single when not is_list(Single) -> + reply_command(Command, {ok, Value}), + State#st{pending = PendingQueue1}; + [_] -> + %% Last one, send the reply + reply_command(Command, {ok, lists:reverse([Value | Acc])}), + State#st{pending = PendingQueue1}; + [_ | TailClasses] -> + %% Need more replies. Save the reply and keep going. + PendingReq1 = PendingReq#pending_req{response_class = TailClasses, + reply_acc = [Value | Acc]}, + PendingQueue2 = q_in_r(PendingReq1, PendingQueue1), + State#st{pending = PendingQueue2} + end; +handle_result(_Value, #st{pending = PendingQueue}) + when ?q_is_empty(PendingQueue) -> + error(unexpected_reply). + +is_subscribe_push(<<"subscribe">>) -> + true; +is_subscribe_push(<<X, "subscribe">>) when X >= $a, X =< $z -> + true; +is_subscribe_push(<<"unsubscribe">>) -> + true; +is_subscribe_push(<<X, "unsubscribe">>) when X >= $a, X =< $z -> + true; +is_subscribe_push(_) -> + false. + +handle_subscribe_push(PushMessage, #st{pending = PendingQueue} = State) -> + case q_out(PendingQueue) of + {PendingReq, PendingQueue1} -> + State1 = State#st{pending = PendingQueue1}, + handle_subscribed_popped_pending(PushMessage, PendingReq, State1); + empty -> + %% No commands pending. 
It may be a server-initiated unsubscribe. + State + end. + +handle_subscribed_popped_pending(Push, + #pending_req{command = Command, + response_class = ExpectClass, + reply_acc = Acc} = Req, + State) -> + case {ExpectClass, hd(Push)} of + {{Type, N}, Type} % simple command + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + reply_command(Command, {ok, ?pubsub_reply}), + State; + {{Type, N}, Type} % simple command + when N > 1 -> % not yet subscribed all channels + Req1 = Req#pending_req{response_class = {Type, N - 1}}, + pending_in_r(Req1, State); + {[{Type, N}], Type} % last command in pipeline + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + reply_command(Command, {ok, lists:reverse([?pubsub_reply | Acc])}), + State; + {[{Type, N} | Classes], Type} % pipeline, not the last command + when N =:= 0; % unsubscribing from all channels + N =:= 1 -> % or subscribed to all channels + Req1 = Req#pending_req{response_class = Classes, + reply_acc = [?pubsub_reply | Acc]}, + pending_in_r(Req1, State); + {[{Type, N} | Classes], Type} % pipeline + when N > 1 -> % not yet subscribed all channels + Req1 = Req#pending_req{response_class = [{Type, N - 1} | Classes]}, + pending_in_r(Req1, State); + _Otherwise -> + %% Not expecting this particular push message for Req. Put it back in queue. + pending_in_r(Req, State) + end. + +%% Add in the front of the pending queue (like queue:in_r). +pending_in_r(ReplyInfo, #st{pending = Pending0} = State) -> + Pending1 = q_in_r(ReplyInfo, Pending0), + State#st{pending = Pending1}. + reply_all(Reply, State) -> - [reply_command(Command, Reply) || Command <- q_to_list(State#st.pending)], + [reply_command(Req#pending_req.command, Reply) || Req <- q_to_list(State#st.pending)], [reply_command(Command, Reply) || Command <- q_to_list(State#st.waiting)], State#st{waiting = q_new(), pending = q_new()}. 
@@ -355,41 +632,97 @@ cancel_node_down_timer(#st{node_down_timer = TimerRef} = State) -> erlang:cancel_timer(TimerRef), State#st{node_down_timer = none}. +%% Move pending commands back to the waiting queue. Discard partial replies. +%% +%% This is risky behavior. If some of the commands sent are not idempotent, we +%% can't just reconnect and send them again. We may just want to return an error +%% instead. +abort_pending_commands(State) -> + PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)], + State#st{waiting = q_in_r(PendingReqs, State#st.waiting), + pending = q_new(), + parser_state = ered_parser:init(), + allow_new_pending_request = true}. + connection_down(Reason, State) -> - State1 = State#st{waiting = q_join(State#st.pending, State#st.waiting), - pending = q_new(), - connection_pid = none}, + State1 = abort_pending_commands(State), State2 = process_commands(State1), State3 = report_connection_status({connection_down, Reason}, State2), - start_node_down_timer(State3). + State4 = start_connect_loop(now, State3), + start_node_down_timer(State4). - -%%%%%% +-spec process_commands(#st{}) -> #st{}. 
process_commands(State) -> NumWaiting = q_len(State#st.waiting), NumPending = q_len(State#st.pending), + BatchSize = State#st.opts#opts.batch_size, + + PendingLimit = max(State#st.opts#opts.max_pending - BatchSize, 1), if - (NumWaiting > 0) and (NumPending < State#st.opts#opts.max_pending) and (State#st.connection_pid /= none) -> - {Command, NewWaiting} = q_out(State#st.waiting), - Data = get_command_payload(Command), - ered_connection:command_async(State#st.connection_pid, Data, {command_reply, State#st.connection_pid}), - process_commands(State#st{pending = q_in(Command, State#st.pending), - waiting = NewWaiting}); - - (NumWaiting > State#st.opts#opts.max_waiting) and (State#st.queue_full_event_sent) -> + not State#st.allow_new_pending_request, NumPending < PendingLimit -> + process_commands(State#st{allow_new_pending_request=true}); + + State#st.status =:= up, State#st.socket =/= none, + NumWaiting > 0, State#st.allow_new_pending_request -> + %% TODO: Add request timeout timestamp to PendingReq. + {Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting), + {BatchedData, PendingRequests} = + lists:foldr(fun(Command, {B, P}) -> + RespCommand = Command#command.data, + ResponseClass = ered_command:get_response_class(RespCommand), + + NewBatchedData = ered_command:get_data(RespCommand), + NewPendingRequest = #pending_req{command = Command, + response_class = ResponseClass}, + {[NewBatchedData | B], [NewPendingRequest | P]} + end, + {[],[]}, + Commands), + Transport = State#st.opts#opts.transport, + case Transport:send(State#st.socket, BatchedData) of + ok -> + NewPending = q_in(PendingRequests, State#st.pending), + NewState = State#st{waiting = NewWaiting, + pending = NewPending, + allow_new_pending_request = q_len(NewPending) < State#st.opts#opts.max_pending}, + process_commands(NewState); + {error, _Reason} -> + %% Send FIN and handle replies in flight before reconnecting. 
+ Transport:shutdown(State#st.socket, read_write), + start_connect_loop(now, State#st{status = init}) + end; + + NumWaiting > State#st.opts#opts.max_waiting, State#st.queue_full_event_sent -> drop_commands(State); NumWaiting > State#st.opts#opts.max_waiting -> drop_commands( report_connection_status(queue_full, State#st{queue_full_event_sent = true})); - (NumWaiting < State#st.opts#opts.queue_ok_level) and (State#st.queue_full_event_sent) -> + NumWaiting < State#st.opts#opts.queue_ok_level, State#st.queue_full_event_sent -> report_connection_status(queue_ok, State#st{queue_full_event_sent = false}); true -> State end. +start_connect_loop(_When, State) when is_pid(State#st.connection_loop_pid) -> + State; +start_connect_loop(When0, State) -> + Self = self(), + Now = erlang:monotonic_time(millisecond), + ConnectedAt = State#st.connected_at, + %% Don't reconnect immediately if the last connect was too recently. + When = if + is_integer(ConnectedAt), + Now - ConnectedAt < State#st.opts#opts.reconnect_wait -> + wait; + true -> + When0 + end, + ConnectPid = spawn_link(fun () -> connect_loop(When, Self, State#st.opts) end), + State#st{connection_loop_pid = ConnectPid}. + drop_commands(State) -> case q_len(State#st.waiting) > State#st.opts#opts.max_waiting of true -> @@ -404,11 +737,18 @@ drop_commands(State) -> q_new() -> {0, queue:new()}. +q_in(Items, {Size, Q1}) when is_list(Items) -> + Q2 = queue:from_list(Items), + {Size + queue:len(Q2), queue:join(Q1, Q2)}; q_in(Item, {Size, Q}) -> {Size+1, queue:in(Item, Q)}. -q_join({Size1, Q1}, {Size2, Q2}) -> - {Size1 + Size2, queue:join(Q1, Q2)}. + +q_in_r(Items, {Size, Q1}) when is_list(Items) -> + Q2 = queue:from_list(Items), + {Size + queue:len(Q2), queue:join(Q2, Q1)}; +q_in_r(Item, {Size, Q}) -> + {Size + 1, queue:in_r(Item, Q)}. q_out({Size, Q}) -> case queue:out(Q) of @@ -416,22 +756,40 @@ q_out({Size, Q}) -> {{value, Val}, NewQ} -> {Val, {Size-1, NewQ}} end. 
+q_multi_out(N, {Size, Q}) when N =< Size -> + {Out, Rest} = queue:split(N, Q), + {queue:to_list(Out),{Size - N, Rest}}; +q_multi_out(_, {_, Q}) -> + {queue:to_list(Q), {0,queue:new()}}. + q_to_list({_Size, Q}) -> queue:to_list(Q). q_len({Size, _Q}) -> Size. +response_timeout(State) when not ?q_is_empty(State#st.pending) -> + %% FIXME: Store req timeout in each pending item + State#st.opts#opts.timeout; +response_timeout(_State) -> + infinity. -reply_command({command, _, Fun}, Reply) -> +reply_command(#command{replyto = Fun} = _Command, Reply) -> Fun(Reply). -get_command_payload({command, Command, _Fun}) -> - Command. - -spec report_connection_status(status(), #st{}) -> #st{}. report_connection_status(Status, State = #st{last_status = Status}) -> State; +report_connection_status({connection_down, {init_error, node_down}}, + #st{last_status = {connection_down, _}} = State) -> + %% Silence additional init error caused by connection down. The lost + %% connection was already reported in another status message. + State; +report_connection_status({connection_down, {init_error, InitReason}}, + #st{last_status = node_deactivated} = State) + when InitReason =:= node_deactivated; InitReason =:= node_down -> + %% Silence additional init error when node is deactivated. + State; +report_connection_status(Status, State) -> send_info(Status, State), case Status of 
+connect_loop(now, OwnerPid, + #opts{host = Host, port = Port, transport = Transport, + transport_opts = TransportOpts0, + connect_timeout = Timeout} = Opts) -> + TransportOpts = [{active, 100}, binary] ++ TransportOpts0, + case Transport:connect(Host, Port, TransportOpts, Timeout) of + {ok, Socket} -> + case Transport:controlling_process(Socket, OwnerPid) of + ok -> + OwnerPid ! {connected, Socket}; + {error, Reason} -> + OwnerPid ! {connect_error, Reason}, + Transport:close(Socket), + connect_loop(wait, OwnerPid, Opts) + end; {error, Reason} -> - Pid ! {connect_error, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - - {ok, ConnectionPid} -> - case init(Pid, ConnectionPid, Opts) of - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason}, - timer:sleep(Opts#opts.reconnect_wait); - {ok, ClusterId} -> - Pid ! {connected, ConnectionPid, ClusterId}, - receive - {socket_closed, ConnectionPid, Reason} -> - Pid ! {socket_closed, Reason} - end - end - - end, - connect(Pid, Opts). - + OwnerPid ! {connect_error, Reason}, + connect_loop(wait, OwnerPid, Opts) + end; +connect_loop(wait, OwnerPid, Opts) -> + timer:sleep(Opts#opts.reconnect_wait), + connect_loop(now, OwnerPid, Opts). -init(MainPid, ConnectionPid, Opts) -> +init_connection(State) -> + #st{opts = #opts{transport = Transport} = Opts, + socket = Socket} = State, Cmd1 = [[<<"CLUSTER">>, <<"MYID">>] || Opts#opts.use_cluster_id], Cmd2 = case {Opts#opts.resp_version, Opts#opts.auth} of {3, {Username, Password}} -> @@ -515,22 +877,29 @@ init(MainPid, ConnectionPid, Opts) -> Opts#opts.select_db > 0], case Cmd1 ++ Cmd2 ++ Cmd3 of [] -> - {ok, undefined}; - Commands -> - ered_connection:command_async(ConnectionPid, Commands, init_command_reply), - receive - {init_command_reply, Reply} -> - case [Reason || {error, Reason} <- Reply] of - [] when Opts#opts.use_cluster_id -> - {ok, hd(Reply)}; - [] -> - {ok, undefined}; - Errors -> - MainPid ! 
{init_error, Errors}, - timer:sleep(Opts#opts.reconnect_wait), - init(MainPid, ConnectionPid, Opts) - end; - Other -> - Other + self() ! {init_command_reply, {ok, []}}, + State; + Pipeline -> + %% Add to pending queue and send like any other commands. + ReplyFun = fun (Reply) -> + self() ! {init_command_reply, Reply} + end, + RespCommand = ered_command:convert_to(Pipeline), + Data = ered_command:get_data(RespCommand), + Command = #command{data = RespCommand, replyto = ReplyFun}, + Class = ered_command:get_response_class(RespCommand), + PendingReq = #pending_req{command = Command, response_class = Class}, + Transport = State#st.opts#opts.transport, + case Transport:send(State#st.socket, Data) of + ok -> + State1 = State#st{pending = q_in(PendingReq, State#st.pending), + status = init}, + %% Send commands immediately or wait for init reply first? + %% process_commands(State1); + State1; + {error, _Reason} -> + %% Send FIN and handle replies in flight before reconnecting. + Transport:shutdown(Socket, read_write), + start_connect_loop(wait, State) end end. diff --git a/src/ered_connection.erl b/src/ered_connection.erl deleted file mode 100644 index 13ebba1..0000000 --- a/src/ered_connection.erl +++ /dev/null @@ -1,384 +0,0 @@ --module(ered_connection). - -%% Managing the socket, sending commands and receiving replies. -%% Batches messages from the process queue. One process handles -%% writing to the socket and one handles the reading and decoding. -%% After a command is sent in the sending process a message is sent to -%% the reading process informing it about how many replies to expect -%% and who expects the result. No reconnection handling, if there is -%% an error the processes will exit. - --export([connect/2, - connect/3, - connect_async/3, - command/2, command/3, - command_async/3]). - --export_type([opt/0, - result/0, - host/0]). 
- - -%%%=================================================================== -%%% Definitions -%%%=================================================================== --record(recv_st, {transport :: gen_tcp | ssl, - socket :: gen_tcp:socket() | ssl:sslsocket(), - push_cb :: push_cb(), - timeout :: non_neg_integer(), % milliseconds - waiting = [] :: [wait_info()], - waiting_since :: undefined | integer() % erlang:monotonic_time(millisecond) - }). - --type opt() :: - %% If commands are queued up in the process message queue this is the max - %% amount of messages that will be received and sent in one call - {batch_size, non_neg_integer()} | - %% Timeout passed to gen_tcp:connect/4 or ssl:connect/4. - {connect_timeout, timeout()} | - %% Options passed to gen_tcp:connect/4. - {tcp_options, [gen_tcp:connect_option()]} | - %% Timeout passed to gen_tcp:connect/4. DEPRECATED. - {tcp_connect_timeout, timeout()} | - %% Options passed to ssl:connect/4. If this config parameter is present, - %% TLS is used. - {tls_options, [ssl:tls_client_option()]} | - %% Timeout passed to ssl:connect/4. DEPRECATED. - {tls_connect_timeout, timeout()} | - %% Callback for push notifications - {push_cb, push_cb()} | - %% Timeout when waiting for a response from Redis. milliseconds - {response_timeout, non_neg_integer()}. - --type result() :: ered_parser:parse_result(). --type push_cb() :: fun((result()) -> any()). --type wait_info() :: - {ered_command:response_class() | [ered_command:response_class()], - pid(), - Ref :: any(), - Acc :: [result()]}. % Acc used to store partial pipeline results --type host() :: ered:host(). --type connect_result() :: {ok, connection_ref()} | {error, timeout | inet:posix()}. --type connection_ref() :: pid(). - -%% Commands like SUBSCRIBE and UNSUBSCRIBE don't return anything, so we use this -%% return value. --define(pubsub_reply, undefined). 
- -%%%=================================================================== -%%% API -%%%=================================================================== - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec connect(host(), inet:port_number()) -> connect_result(). --spec connect(host(), inet:port_number(), [opt()]) -> connect_result(). -%% -%% Connect to Redis node. Start send and receive process. -%% When the connection is closed a socket_closed message will be sent. -%% to the calling process. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connect(Host, Port) -> - connect(Host, Port, []). - -connect(Host, Port, Opts) -> - Pid = connect_async(Host, Port, Opts), - receive - {connected, Pid} -> - {ok, Pid}; - {connect_error, Pid, Reason} -> - {error, Reason} - end. - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec connect_async(host(), inet:port_number(), [opt()]) -> connection_ref(). -%% -%% Connect to Redis node. Start send and receive process. -%% The function will return before connect is completed and a connected or -%% connect_error message will be sent to the calling process. -%% When the connection is closed a socket_closed message will be sent. -%% to the calling process. -%% -%% Deprecated options: -%% tcp_connect_timeout - replaced by connect_timeout. -%% tls_connect_timeout - replaced by connect_timeout. 
-%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connect_async(Addr, Port, Opts) -> - [error({badarg, BadOpt}) - || BadOpt <- proplists:get_keys(Opts) -- [batch_size, tcp_options, tls_options, push_cb, response_timeout, - tcp_connect_timeout, tls_connect_timeout, connect_timeout]], - BatchSize = proplists:get_value(batch_size, Opts, 16), - ResponseTimeout = proplists:get_value(response_timeout, Opts, 10000), - PushCb = proplists:get_value(push_cb, Opts, fun(_) -> ok end), - TcpOptions = proplists:get_value(tcp_options, Opts, []), - TlsOptions = proplists:get_value(tls_options, Opts, []), - TcpTimeout = proplists:get_value(tcp_connect_timeout, Opts, infinity), - TlsTimeout = proplists:get_value(tls_connect_timeout, Opts, infinity), - {Transport, Options, Timeout0} = case TlsOptions of - [] -> - {gen_tcp, TcpOptions, TcpTimeout}; - _ -> - {ssl, TlsOptions, TlsTimeout} - end, - Timeout = proplists:get_value(connect_timeout, Opts, Timeout0), - Master = self(), - spawn_link( - fun() -> - SendPid = self(), - case catch Transport:connect(Addr, Port, [{active, false}, binary] ++ Options, Timeout) of - {ok, Socket} -> - Master ! {connected, SendPid}, - Pid = spawn_link(fun() -> - ExitReason = recv_loop(Transport, Socket, PushCb, ResponseTimeout), - %% Inform sending process about exit - SendPid ! ExitReason - end), - ExitReason = send_loop(Transport, Socket, Pid, BatchSize), - Master ! {socket_closed, SendPid, ExitReason}; - {error, Reason} -> - Master ! {connect_error, SendPid, Reason}; - Other -> % {'EXIT',_} - Master ! {connect_error, SendPid, Other} - end - end). - -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec command(connection_ref(), ered_command:command()) -> result(). --spec command(connection_ref(), ered_command:command(), timeout()) -> result(). -%% -%% Send a command to the connected Redis node. 
The argument can be a -%% single command as a list of binaries, a pipeline of command as a -%% list of commands or a formatted redis_command. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -command(Connection, Command) -> - command(Connection, Command, 10000). - -command(Connection, Command, Timeout) -> - link(Connection), - Ref = make_ref(), - Connection ! {send, self(), Ref, ered_command:convert_to(Command)}, - receive {Ref, Value} -> - unlink(Connection), - Value - after Timeout -> - unlink(Connection), - {error, timeout} - end. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec command_async(connection_ref(), ered_command:command(), any()) -> ok. -%% -%% Send a command to the connected Redis node in asynchronous -%% fashion. The provided callback function will be called with the -%% reply. Note that the callback function will executing in the redis -%% client process and should not hang or perform any lengthy task. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -command_async(Connection, Data, Ref) -> - Connection ! {send, self(), Ref, ered_command:convert_to(Data)}, - ok. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% -%% Receive logic -%% - -recv_loop(Transport, Socket, PushCB, Timeout) -> - ParseInit = ered_parser:next(ered_parser:init()), - State = #recv_st{transport = Transport, socket = Socket, push_cb = PushCB, timeout = Timeout}, - try - recv_loop({ParseInit, State}) - catch - %% handle done, parse error, recv error - throw:Reason -> - {recv_exit, Reason} - end. - -recv_loop({ParseResult, State}) -> - Next = case ParseResult of - {need_more, BytesNeeded, ParserState} -> - read_socket(BytesNeeded, ParserState, State); - {done, Value, ParserState} -> - handle_result(Value, ParserState, State) - end, - recv_loop(Next). 
- -read_socket(BytesNeeded, ParserState, State) -> - State1 = update_waiting(0, State), - WaitTime = get_timeout(State1), - Transport = State1#recv_st.transport, - case Transport:recv(State1#recv_st.socket, BytesNeeded, WaitTime) of - {ok, Data} -> - {ered_parser:continue(Data, ParserState), State1}; - {error, timeout} when State1#recv_st.waiting == [] -> - %% no command pending, try again - read_socket(BytesNeeded, ParserState, State1); - {error, Reason} -> - throw(Reason) - end. - -handle_result({push, Value = [Type|_]}, ParserState, State) -> - %% Pub/sub in RESP3 is a bit quirky. The push is supposed to be out of bound - %% data not connected to any request but for subscribe and unsubscribe - %% requests, a successful command is signalled as one or more push messages. - PushCB = State#recv_st.push_cb, - PushCB(Value), - State1 = case is_subscribe_push(Type) of - true -> - handle_subscribe_push(Value, State); - false -> - State - end, - {ered_parser:next(ParserState), State1}; -handle_result(Value, ParserState, State) -> - {{RespClass, Pid, Ref, Acc}, State1} = pop_waiting(State), - %% Check how many replies expected (list = pipeline) - case RespClass of - Single when not is_list(Single) -> - Pid ! {Ref, Value}, - {ered_parser:next(ParserState), State1}; - [_] -> - %% Last one, send the reply - Pid ! {Ref, lists:reverse([Value | Acc])}, - {ered_parser:next(ParserState), State1}; - [_ | RespClasses] -> - %% More left, save the reply and keep going - State2 = push_waiting({RespClasses, Pid, Ref, [Value | Acc]}, State1), - {ered_parser:next(ParserState), State2} - end. - -is_subscribe_push(<<"subscribe">>) -> - true; -is_subscribe_push(<>) when X >= $a, X =< $z -> - true; -is_subscribe_push(<<"unsubscribe">>) -> - true; -is_subscribe_push(<>) when X >= $a, X =< $z -> - true; -is_subscribe_push(_) -> - false. 
- -handle_subscribe_push(PushMessage, State) -> - case try_pop_waiting(State) of - {PoppedWaiting, State1} -> - handle_subscribed_popped_waiting(PushMessage, PoppedWaiting, State1); - none -> - %% No commands pending. - State - end. - -handle_subscribed_popped_waiting(Push, Waiting = {ExpectClass, Pid, Ref, Acc}, State) -> - case {ExpectClass, hd(Push)} of - {{Type, N}, Type} % simple command - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - Pid ! {Ref, ?pubsub_reply}, - State; - {{Type, N}, Type} % simple command - when N > 1 -> % not yet subscribed all channels - push_waiting({{Type, N - 1}, Pid, Ref, Acc}, State); - {[{Type, N}], Type} % last command in pipeline - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - Pid ! {Ref, lists:reverse([?pubsub_reply | Acc])}, - State; - {[{Type, N} | Classes], Type} % pipeline, not the last command - when N =:= 0; % unsubscribing from all channels - N =:= 1 -> % or subscribed to all channels - push_waiting({Classes, Pid, Ref, [?pubsub_reply | Acc]}, State); - {[{Type, N} | Classes], Type} % pipeline - when N > 1 -> % not yet subscribed all channels - push_waiting({[{Type, N - 1} | Classes], Pid, Ref, Acc}, State); - _Otherwise -> - %% Not waiting for this particular push message. - push_waiting(Waiting, State) - end. - -get_timeout(State) -> - case State#recv_st.waiting_since of - undefined -> - State#recv_st.timeout; - Since -> - case State#recv_st.timeout - (erlang:monotonic_time(millisecond) - Since) of - T when T < 0 -> 0; - T -> T - end - end. - -pop_waiting(State) -> - State1 = update_waiting(infinity, State), - [WaitInfo | Rest] = State1#recv_st.waiting, - {WaitInfo, State1#recv_st{waiting = Rest}}. - -try_pop_waiting(State) -> - State1 = update_waiting(0, State), - case State1#recv_st.waiting of - [WaitInfo | Rest] -> - {WaitInfo, State1#recv_st{waiting = Rest}}; - [] -> - none - end. 
- -push_waiting(WaitInfo,State) -> - State#recv_st{waiting = [WaitInfo | State#recv_st.waiting]}. - -update_waiting(Timeout, State) when State#recv_st.waiting == [] -> - case receive Msg -> Msg after Timeout -> timeout end of - {requests, Req, Time} -> - State#recv_st{waiting = Req, waiting_since = Time}; - timeout -> - State#recv_st{waiting_since = undefined}; - close_down -> - throw(done) - end; -update_waiting(_Timeout, State) -> - State. - -%% -%% Send logic -%% - -send_loop(Transport, Socket, RecvPid, BatchSize) -> - case receive_data(BatchSize) of - {recv_exit, Reason} -> - {recv_exit, Reason}; - {data, {Refs, Data}} -> - Time = erlang:monotonic_time(millisecond), - case Transport:send(Socket, Data) of - ok -> - %% send to recv proc to fetch the response - RecvPid ! {requests, Refs, Time}, - send_loop(Transport, Socket, RecvPid, BatchSize); - {error, Reason} -> - %% Give recv_loop time to finish processing - %% This will shut down recv_loop if it is waiting on socket - Transport:shutdown(Socket, read_write), - %% This will shut down recv_loop if it is waiting for a reference - RecvPid ! close_down, - %% Ok, recv done, time to die - receive {recv_exit, _Reason} -> ok end, - {send_exit, Reason} - end - end. - -receive_data(N) -> - receive_data(N, infinity, []). - -receive_data(0, _Time, Acc) -> - {data, lists:unzip(lists:reverse(Acc))}; -receive_data(N, Time, Acc) -> - receive - {recv_exit, Reason} -> - {recv_exit, Reason}; - {send, Pid, Ref, Commands} -> - Data = ered_command:get_data(Commands), - Class = ered_command:get_response_class(Commands), - RefInfo = {Class, Pid, Ref, []}, - Acc1 = [{RefInfo, Data} | Acc], - receive_data(N - 1, 0, Acc1); - _Ignore -> - %% Mitigate OTP TLS 1.3 bug #10273 leaking a message {Ref, ok}. - receive_data(N, 0, Acc) - after Time -> - receive_data(0, 0, Acc) - end. 
diff --git a/src/ered_parser.erl b/src/ered_parser.erl index 629a5b5..11146a3 100644 --- a/src/ered_parser.erl +++ b/src/ered_parser.erl @@ -7,7 +7,8 @@ continue/2]). -export_type([parse_return/0, - parse_result/0 + parse_result/0, + state/0 ]). %%%=================================================================== @@ -25,10 +26,12 @@ -type parse_result() :: binary() | {error, binary()} | integer() | undefined | [parse_result()] | inf | neg_inf | nan | float() | true | false | #{parse_result() => parse_result()} | sets:set(parse_result()) | - {attribute, parse_result(), parse_result()} | {push | parse_result()}. + {attribute, parse_result(), parse_result()} | {push, parse_result()}. - --type parse_return() :: {done, parse_result(), #parser_state{}} | {need_more, bytes_needed(), #parser_state{}}. +-opaque state() :: #parser_state{}. +-type parse_return() :: {done, parse_result(), state()} | + {need_more, bytes_needed(), state()} | + {parse_error, any()}. -if(?OTP_RELEASE >= 24). -define(sets_new, sets:new([{version, 2}])). @@ -41,7 +44,7 @@ %%%=================================================================== %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec init() -> #parser_state{}. +-spec init() -> state(). %% %% Init empty parser continuation %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -49,7 +52,7 @@ init() -> #parser_state{}. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec next(#parser_state{}) -> parse_return(). +-spec next(state()) -> parse_return(). %% %% Get next result or continuation. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -57,7 +60,7 @@ next(State) -> parse(State). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec continue(binary(), #parser_state{}) -> parse_return(). +-spec continue(binary(), state()) -> parse_return(). %% %% Feed more data to the parser. Get next result or continuation. 
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -70,16 +73,21 @@ continue(NewData, State) -> %%%=================================================================== parse(State=#parser_state{data=Data, next=Fun, bytes_needed=Bytes}) -> - case token(Data, Bytes) of - {need_more, LackingBytes} -> - {need_more, LackingBytes, State}; - {Token, Rest} -> - case Fun(Token) of - {done, Result} -> - {done, Result, #parser_state{data=Rest}}; - {cont, NextFun, NextBytes} -> - parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes}) - end + try + case token(Data, Bytes) of + {need_more, LackingBytes} -> + {need_more, LackingBytes, State}; + {Token, Rest} -> + case Fun(Token) of + {done, Result} -> + {done, Result, #parser_state{data=Rest}}; + {cont, NextFun, NextBytes} -> + parse(#parser_state{data=Rest, next=NextFun, bytes_needed=NextBytes}) + end + end + catch + {parse_error, _Reason} = ParseError -> + ParseError end. token(Data, 0) -> diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 51ceab6..b410dd1 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -40,7 +40,8 @@ request_t() -> fail_connect_t() -> {ok,Pid} = ered_client:start_link("127.0.0.1", 0, [{info_pid, self()}]), - {connect_error,econnrefused} = expect_connection_down(Pid), + {connect_error, Reason} = expect_connection_down(Pid), + true = Reason =:= econnrefused orelse Reason =:= eaddrnotavail, %% make sure there are no more connection down messages timeout = receive M -> M after 500 -> timeout end. @@ -66,7 +67,7 @@ fail_parse_t() -> Pid ! ered_client:command(Client, [<<"ping">>]) end), expect_connection_up(Client), - Reason = {recv_exit, {parse_error,{invalid_data,<<"&pong">>}}}, + Reason = {parse_error, {invalid_data, <<"&pong">>}}, receive #{msg_type := socket_closed, reason := Reason} -> ok end, expect_connection_up(Client), {ok, <<"pong">>} = get_msg(). 
@@ -75,18 +76,22 @@ fail_parse_t() -> server_close_socket_t() -> {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), {ok, Port} = inet:port(ListenSock), - spawn_link(fun() -> - {ok, Sock} = gen_tcp:accept(ListenSock), - gen_tcp:close(Sock), - - %% resend from client - {ok, _Sock2} = gen_tcp:accept(ListenSock), - receive ok -> ok end - end), + ServerPid = + spawn_link(fun() -> + {ok, Sock} = gen_tcp:accept(ListenSock), + receive continue -> ok end, + gen_tcp:close(Sock), + + %% resend from client + {ok, _Sock2} = gen_tcp:accept(ListenSock), + receive done -> ok end + end), Client = start_client(Port), expect_connection_up(Client), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, - expect_connection_up(Client). + ServerPid ! continue, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, + expect_connection_up(Client), + ServerPid ! done. %% Suppress warning from command 'bad_request' @@ -113,7 +118,7 @@ server_buffer_full_t() -> Expected = iolist_to_binary(lists:duplicate(5, Ping)), {ok, Expected} = gen_tcp:recv(Sock, size(Expected)), %% should be nothing more since only 5 pending - {error, timeout} = gen_tcp:recv(Sock, 0, 0), + ?assertEqual({error, timeout}, gen_tcp:recv(Sock, 0, 0)), timer:sleep(500), @@ -127,15 +132,15 @@ server_buffer_full_t() -> receive ok -> ok end end), - Client = start_client(Port, [{max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]), + Client = start_client(Port, [{batch_size, 16},{max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]), expect_connection_up(Client), Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! 
{N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, - {6, {error, queue_overflow}} = get_msg(), + ?assertEqual({6, {error, queue_overflow}}, get_msg()), receive #{msg_type := queue_ok} -> ok end, - [{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]], + [?assertEqual({N, {ok, <<"pong">>}} , get_msg()) || N <- [1,2,3,4,5,7,8,9,10,11]], no_more_msgs(). @@ -163,7 +168,7 @@ server_buffer_full_reconnect_t() -> receive ok -> ok end end), - Client = start_client(Port, [{max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]), + Client = start_client(Port, [{batch_size, 16}, {max_waiting, 5}, {max_pending, 5}, {queue_ok_level,1}]), expect_connection_up(Client), Pid = self(), @@ -172,7 +177,7 @@ server_buffer_full_reconnect_t() -> receive #{msg_type := queue_full} -> ok end, %% 1 message over the limit, first one in queue gets kicked out {6, {error, queue_overflow}} = get_msg(), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, %% when connection goes down the pending messages will be put in the queue and the queue %% will overflow kicking out the oldest first [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], @@ -201,9 +206,9 @@ server_buffer_full_node_goes_down_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! 
{N, Reply} end) || N <- lists:seq(1,11)], receive #{msg_type := queue_full} -> ok end, - {6, {error, queue_overflow}} = get_msg(), - receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, - [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], + ?assertEqual({6, {error, queue_overflow}}, get_msg()), + receive #{msg_type := socket_closed, reason := tcp_closed} -> ok end, + [?assertEqual({N, {error, queue_overflow}}, get_msg()) || N <- [1,2,3,4,5]], receive #{msg_type := queue_ok} -> ok end, receive #{msg_type := connect_error, reason := econnrefused} -> ok end, receive #{msg_type := node_down_timeout} -> ok end, @@ -242,7 +247,7 @@ send_timeout_t() -> Pid = self(), ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end), %% this should come after max 1000ms - receive #{msg_type := socket_closed, reason := {recv_exit, timeout}} -> ok after 2000 -> timeout_error() end, + receive #{msg_type := socket_closed, reason := timeout} -> ok after 2000 -> timeout_error() end, expect_connection_up(Client), {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). @@ -256,15 +261,12 @@ fail_hello_t() -> {ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>), - %% test resend - {ok, <<"*2\r\n$5\r\nHELLO\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, <<"-NOPROTO unsupported protocol version\r\n">>), - - Pid ! done + Pid ! done, + {error, closed} = gen_tcp:recv(Sock, 0) end), {ok,Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}]), - {init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client), receive done -> ok end, + {init_error, [<<"NOPROTO unsupported protocol version">>]} = expect_connection_down(Client), no_more_msgs(). hello_with_auth_t() -> @@ -294,11 +296,13 @@ hello_with_auth_t() -> "$6\r\nmaster\r\n" "$7\r\nmodules\r\n" "*0\r\n">>), - Pid ! 
done + Pid ! done, + receive ok -> ok end end), - {ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, - {auth, {<<"ali">>, <<"sesame">>}}]), + {ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, + {auth, {<<"ali">>, <<"sesame">>}}]), receive done -> ok end, + expect_connection_up(Client), no_more_msgs(). hello_with_auth_fail_t() -> @@ -333,12 +337,14 @@ auth_t() -> "$6\r\nsesame\r\n">>} = gen_tcp:recv(Sock, 0), ok = gen_tcp:send(Sock, <<"+OK\r\n">>), - Pid ! done + Pid ! done, + receive ok -> ok end end), - {ok, _Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, - {resp_version, 2}, - {auth, {<<"ali">>, <<"sesame">>}}]), + {ok, Client} = ered_client:start_link("127.0.0.1", Port, [{info_pid, self()}, + {resp_version, 2}, + {auth, {<<"ali">>, <<"sesame">>}}]), receive done -> ok end, + expect_connection_up(Client), no_more_msgs(). auth_fail_t() -> diff --git a/test/ered_cluster_SUITE.erl b/test/ered_cluster_SUITE.erl index 66666a7..0c4f966 100644 --- a/test/ered_cluster_SUITE.erl +++ b/test/ered_cluster_SUITE.erl @@ -272,7 +272,7 @@ t_hard_failover(_) -> ct:pal("~p\n", [ered_cluster:command_all(R, [<<"CLUSTER">>, <<"SLOTS">>])]), ct:pal(os:cmd("docker stop " ++ Pod)), - ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := {recv_exit, closed}}), + ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", Port}, reason := tcp_closed}), ?MSG(#{msg_type := connect_error, addr := {"127.0.0.1", Port}, reason := econnrefused}), ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}, 2500), @@ -363,7 +363,7 @@ t_manual_failover_then_old_master_down(_) -> ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := socket_closed, - reason := {recv_exit, closed}}), + reason := tcp_closed}), %% Ered prefers the replica of the disconnected node for slot map update, %% since it is likely to know about a failover first; it is the new master. 
@@ -420,7 +420,7 @@ t_blackhole(_) -> ered:command_async(ClientRef, [<<"PING">>], fun(Reply) -> TestPid ! {ping_reply, Reply} end), - ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, master := true}, + ?MSG(#{msg_type := socket_closed, reason := timeout, master := true}, ResponseTimeout + 1000), ?MSG({ping_reply, {error, _Reason}}, % node_down or node_deactivated NodeDownTimeout + 1000), @@ -432,6 +432,8 @@ t_blackhole(_) -> ?OPTIONAL_MSG(#{msg_type := cluster_ok}), ?MSG(#{msg_type := client_stopped, reason := shutdown, master := false}, CloseWait + 1000), + ?OPTIONAL_MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}), + no_more_msgs(), ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod)), timer:sleep(500), @@ -477,7 +479,7 @@ t_blackhole_all_nodes(_) -> fun(Reply) -> TestPid ! {ping_reply, Reply} end) end, AddrToClient), - [?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}}, + [?MSG(#{msg_type := socket_closed, reason := timeout, addr := {"127.0.0.1", Port}}, ResponseTimeout + 1000) || Port <- ?PORTS], ?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000), ?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000), @@ -526,11 +528,14 @@ t_init_timeout(_) -> ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT PAUSE 10000")]), {ok, _P} = ered_cluster:connect([{localhost, 30001}], [{info_pid, [self()]}] ++ Opts), - ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}}, 3500), + ?MSG(#{msg_type := socket_closed, reason := timeout}, 3500), ?MSG(#{msg_type := node_down_timeout, addr := {localhost, 30001}}, 2500), %% Does not work on Redis before 6.2.0. ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT UNPAUSE")]), + %% Maybe we were waiting for init commands when the node down timeout fired. 
+ ?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}), + ?MSG(#{msg_type := connected, addr := {localhost, 30001}}), ?MSG(#{msg_type := slot_map_updated}), @@ -760,8 +765,8 @@ t_kill_client(_) -> ct:pal("~p\n",[os:cmd("redis-cli -p " ++ integer_to_list(Port) ++ " CLIENT KILL TYPE NORMAL")]), ?MSG(#{msg_type := socket_closed, addr := {_, Port}}), - %% connection reestablished - ?MSG(#{msg_type := connected, addr := {_, Port}}), + %% Waits until 1000ms after the first connect before reconnecting. + ?MSG(#{msg_type := connected, addr := {_, Port}}, 2000), no_more_msgs(). t_new_cluster_master(_) -> diff --git a/test/ered_cluster_tls_SUITE.erl b/test/ered_cluster_tls_SUITE.erl index 475aa5e..e0ab5e1 100644 --- a/test/ered_cluster_tls_SUITE.erl +++ b/test/ered_cluster_tls_SUITE.erl @@ -190,8 +190,10 @@ t_expired_cert_tls_1_3(_) -> [{info_pid, [self()]}, {client_opts, ClientOpts}]), ?MSG(#{msg_type := socket_closed, addr := {"127.0.0.1", 31001}, - reason := {recv_exit, - {tls_alert, - {certificate_expired, _}}}}), + reason := {tls_alert, {certificate_expired, _}}}), ?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", 31001}}, 2500), + timer:sleep(10), % Wait for the optional messages. + ?OPTIONAL_MSG(#{msg_type := init_error, reason := node_down}), + ?OPTIONAL_MSG(#{msg_type := socket_closed, + reason := {tls_alert, {certificate_expired, _}}}), no_more_msgs(). diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl deleted file mode 100644 index c9ef7ec..0000000 --- a/test/ered_connection_tests.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(ered_connection_tests). - --include_lib("eunit/include/eunit.hrl"). 
- -split_data_test() -> - Data = << <<"A">> || _ <- lists:seq(1, 3000) >>, - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - spawn_link(fun() -> - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*2\r\n$5\r\nhello\r\n$1\r\n3\r\n">>} = gen_tcp:recv(Sock, 0), - HelloReply = <<"%7\r\n", - "$6\r\nserver\r\n", "$6\r\nvalkey\r\n", - "$7\r\nversion\r\n", "$5\r\n9.0.0\r\n", - "$5\r\nproto\r\n", ":3\r\n" - "$2\r\nid\r\n", ":2\r\n", - "$4\r\nmode\r\n", "$10\r\nstandalone\r\n" - "$4\r\nrole\r\n", "$6\r\nmaster\r\n" - "$7\r\nmodules\r\n", "*0\r\n">>, - ok = gen_tcp:send(Sock, HelloReply), - SetCommand = <<"*3\r\n$3\r\nset\r\n$4\r\nkey1\r\n$3000\r\n", Data/binary, "\r\n">>, - {ok, SetCommand} = gen_tcp:recv(Sock, size(SetCommand)), - ok = gen_tcp:send(Sock, <<"+OK\r\n">>), - {ok, <<"*2\r\n$3\r\nget\r\n$4\r\nkey1\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, <<"$3000\r\n", Data/binary, "\r\n">>), - {error, closed} = gen_tcp:recv(Sock, 0), - %% ok = gen_tcp:shutdown(Sock, write), - ok - end), - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port), - ered_connection:command(Conn1, [<<"hello">>, <<"3">>]), - <<"OK">> = ered_connection:command(Conn1, [<<"set">>, <<"key1">>, Data]), - Data = ered_connection:command(Conn1, [<<"get">>, <<"key1">>]). - -%% Suppress warnings due to expected failures from MalformedCommand. --dialyzer({[no_fail_call, no_return], trailing_reply_test/0}). -trailing_reply_test() -> - Pid = self(), - %% 277124 byte nested array, it takes a non-trivial time to parse - BigNastyData = iolist_to_binary(nested_list(8)), - ?debugFmt("~w", [size(BigNastyData)]), - - spawn_link(fun() -> - {ok, ListenSock} = gen_tcp:listen(0, [binary, {active , false}]), - {ok, Port} = inet:port(ListenSock), - Pid ! 
{port, Port}, - {ok, Sock} = gen_tcp:accept(ListenSock), - {ok, <<"*1\r\n$4\r\nping\r\n">>} = gen_tcp:recv(Sock, 0), - ok = gen_tcp:send(Sock, BigNastyData), - ok = gen_tcp:shutdown(Sock, write), - Pid ! sent_big_nasty, - receive ok -> ok end - end), - {port, Port} = receive_msg(), - %% increase receive buffer to fit the whole nasty data package - {ok, Conn1} = ered_connection:connect("127.0.0.1", Port, [{batch_size, 1}, - {tcp_options, [{recbuf, 524288}]}]), - ?debugFmt("~w", [Conn1]), - ered_connection:command_async(Conn1, [<<"ping">>], ping1), - receive sent_big_nasty -> ok end, - MalformedCommand = {redis_command, pipeline, [undefined]}, - ered_connection:command_async(Conn1, MalformedCommand, no_ref), - - %% make sure the ping is received before the connection is shut down - - ?debugMsg("waiting for ping"), - - receive {ping1, _} -> ok after 2000 -> exit(waiting_for_ping) end, - ?debugMsg("got ping"), - {socket_closed, Conn1, {send_exit, einval}} = receive Msg -> Msg end, - ensure_empty(). - - -receive_msg() -> - receive Msg -> Msg end. - -%% This function is used from trailing_reply_test() --dialyzer({no_unused, ensure_empty/0}). -ensure_empty() -> - empty = receive Msg -> Msg after 0 -> empty end. - - -nested_list(1) -> - <<"+A\r\n">>; -nested_list(N) -> - ["*", integer_to_list(N), "\r\n", [nested_list(N-1) || _ <- lists:seq(1, N)]]. diff --git a/test/ered_parser_tests.erl b/test/ered_parser_tests.erl index 3b063b9..2dc318e 100644 --- a/test/ered_parser_tests.erl +++ b/test/ered_parser_tests.erl @@ -85,12 +85,11 @@ parse_fail_test_() -> decode_err(In, Expected) -> fun() -> - try - A = ered_parser:continue(In, ered_parser:init()), - exit({unexpected_success, A}) - catch - throw:{parse_error, Err} -> - ?assertEqual(Expected, Err) + case ered_parser:continue(In, ered_parser:init()) of + {parse_error, Err} -> + ?assertEqual(Expected, Err); + A -> + exit({unexpected_success, A}) end end.