Add support for sending batched data #158
WilliamVoong wants to merge 1 commit into Ericsson:single-process-client from
Conversation
* Bonus: Avoid pattern matching in eunit tests to verify output/input; it leads to ambiguous function clause errors when a clause does not match.

Change-Id: I9e234753c424e520c4d388039919aebc1c1f93f6
Implementation looks good. I have some comments about naming and style.
I don't see any tests where pending reaches the maximum and we start building up batches. Is this already covered in some cluster test? If not, we should probably do it in ered_client_tests.
```erlang
fun({connection_opts, Val}, S) -> handle_connection_opts(S, Val);
   ({max_waiting, Val}, S) -> S#opts{max_waiting = Val};
   ({max_pending, Val}, S) -> S#opts{max_pending = Val};
   ({batch_size, Val}, S) -> S#opts{batch_size = Val};
```
There's already a batch_size option, though it's specified under connection_opts currently. Let's not change it in this PR. We can do it separately, probably in a separate ered version.
```erlang
waiting = q_new() :: command_queue(),
pending = q_new() :: command_queue(),
allow_new_pending_request = true :: boolean(),
```
The name of this field is vague. It should include some information that it's about filling up a batch. I'd prefer a name like filling_batch, accumulating_batch or something like that.
We could also allow or not allow sending data to the socket because the send buffers are full.
```erlang
NumPending = q_len(State#st.pending),
BatchSize = State#st.opts#opts.batch_size,
PendingLimit = max(State#st.opts#opts.max_pending - BatchSize, 1),
```
The upper limit is max_pending. This new limit needs a better name.
Is this a lower limit? Or is it an upper limit when we're accumulating a batch? When pending is down to this level, send the next batch, right?
Let's make this clear in the name. Depending on which word we want to use for filling up a batch.
Idea: PendingLimitWhenFillingBatch
```erlang
q_in(Items, {Size, Q1}) when is_list(Items) ->
    Q2 = queue:from_list(Items),
    {Size + queue:len(Q2), queue:join(Q1, Q2)};
q_in(Item, {Size, Q}) ->
    {Size+1, queue:in(Item, Q)}.
```
I don't think it's a good idea to overload this with a different behavior, for three reasons:
- What if the elements in the queue are lists and we want to add another list to the queue?
- Even if we don't use lists as queue elements, it's a bit confusing to overload it.
- This `q_` API is designed to mirror the `queue` API in OTP.

Let's use a new function for adding multiple items instead. Two possibilities:
- `q_multi_in`
- let the caller just do `q_join(q_from_list(Items), Queue)` (just mirror the `queue` module API)
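For illustration, the two alternatives could look like the sketch below. This is a hypothetical sketch, not code from the PR; it assumes the `{Size, Queue}` pair representation visible in the quoted `q_in/2` diff above.

```erlang
%% Option 1: a dedicated function for inserting multiple items.
%% (Hypothetical implementation, assuming queues are {Size, Queue} pairs.)
q_multi_in(Items, {Size, Q}) when is_list(Items) ->
    {Size + length(Items), queue:join(Q, queue:from_list(Items))}.

%% Option 2: mirror the OTP queue module API and let the caller combine.
q_from_list(Items) ->
    {length(Items), queue:from_list(Items)}.

q_join({Size1, Q1}, {Size2, Q2}) ->
    {Size1 + Size2, queue:join(Q1, Q2)}.

%% Caller side for option 2:
%% NewWaiting = q_join(q_from_list(Items), Waiting).
```

Option 2 keeps each `q_` function a one-to-one wrapper of a `queue` function, which is the mirroring argument made above.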
```diff
  abort_pending_commands(State) ->
      PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)],
-     State#st{waiting = q_in_r(PendingReqs, State#st.waiting),
+     State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting),
```
This looks wrong (if you don't know about the overloading). It looks like the list is being inserted as a single item in the queue.
I think the use of q_join and q_from_list is more clear and explicit.
```erlang
    {Out, Rest} = queue:split(N, Q),
    {queue:to_list(Out), {Size - N, Rest}};
q_multi_out(_, {_, Q}) ->
    {queue:to_list(Q), {0, queue:new()}}.
```
The idea behind q_split and q_join is that they mirror the queue:split and queue:join functions. That's why I think it's slightly cleaner to use these.
To handle N > Size, the caller can just use code like

```erlang
q_split(min(BatchSize, NumWaiting), State#st.waiting)
```

```erlang
RespCommand = Command#command.data,
Data = ered_command:get_data(RespCommand),
Class = ered_command:get_response_class(RespCommand),
{Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting),
```
If we use q_split instead here, then we get the commands to send as a queue.
The benefit is that we mirror the queue API, which is familiar to people who know OTP.
I believe q_multi_out is only used in one place anyway, right?
```diff
- {Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting),
+ {CommandsQ, NewWaiting} = q_split(min(BatchSize, NumWaiting),
+                                   State#st.waiting),
+ Commands = q_to_list(CommandsQ),
```
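A `q_split` that mirrors `queue:split/2` could look like the sketch below. This is a hypothetical sketch, not code from the PR; it assumes the `{Size, Queue}` pair representation used by the other `q_` helpers in the diffs above, and it leaves clamping N to the caller, as suggested.

```erlang
%% Mirror of queue:split/2 for {Size, Queue} pairs (hypothetical sketch).
%% The caller is responsible for ensuring N =< Size, e.g. with min/2.
q_split(N, {Size, Q}) when N =< Size ->
    {Out, Rest} = queue:split(N, Q),
    {{N, Out}, {Size - N, Rest}}.

%% Caller side:
%% {CommandsQ, NewWaiting} =
%%     q_split(min(BatchSize, NumWaiting), State#st.waiting).
```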
```erlang
Class = ered_command:get_response_class(RespCommand),
{Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting),
{BatchedData, PendingRequests} =
    lists:foldr(fun(Command, {B, P}) ->
```
It's common to use variable names with Acc in these accumulators. For example:
```diff
- lists:foldr(fun(Command, {B, P}) ->
+ lists:foldr(fun(Command, {DataAcc, PendingAcc}) ->
```
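To illustrate the naming convention, a minimal standalone fold with Acc-style accumulators might look like this. The function and tuple contents are hypothetical, not taken from the PR:

```erlang
%% Illustrative only: fold commands from the right, accumulating two
%% results at once, with Acc-suffixed names for the accumulator parts.
build_batch(Commands) ->
    lists:foldr(fun(Command, {DataAcc, PendingAcc}) ->
                        {[Command | DataAcc],
                         [{pending, Command} | PendingAcc]}
                end,
                {[], []},
                Commands).
```

The Acc suffix makes it obvious at a glance which bindings are the running accumulator and which are the current element.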