
Add support for sending batched data#158

Open
WilliamVoong wants to merge 1 commit intoEricsson:single-process-clientfrom
WilliamVoong:victor/single-process-client

Conversation

@WilliamVoong

  • Bonus: Avoid pattern matching in eunit tests to verify output/input; it leads to ambiguous function clause errors when a clause does not match.

Change-Id: I9e234753c424e520c4d388039919aebc1c1f93f6
@zuiderkwast zuiderkwast self-requested a review March 2, 2026 14:26
Collaborator

@zuiderkwast zuiderkwast left a comment


Implementation looks good. I have some comments about naming and style.

I don't see any tests where pending reaches the maximum and we start building up batches. Is this already covered in some cluster test? If not, we should probably do it in ered_client_tests.

fun({connection_opts, Val}, S) -> handle_connection_opts(S, Val);
   ({max_waiting, Val}, S) -> S#opts{max_waiting = Val};
   ({max_pending, Val}, S) -> S#opts{max_pending = Val};
   ({batch_size, Val}, S) -> S#opts{batch_size = Val};

There's already a batch_size option, though it's specified under connection_opts currently. Let's not change it in this PR. We can do it separately, probably in a separate ered version.


waiting = q_new() :: command_queue(),
pending = q_new() :: command_queue(),
allow_new_pending_request = true :: boolean(),

The name of this field is vague. It should include some information that it's about filling up a batch. I'd prefer a name like filling_batch, accumulating_batch or something like that.

We could also disallow sending data to the socket because the send buffers are full, which is another state this name could describe.

NumPending = q_len(State#st.pending),
BatchSize = State#st.opts#opts.batch_size,

PendingLimit = max(State#st.opts#opts.max_pending - BatchSize, 1),

The upper limit is max_pending. This new limit needs a better name.

Is this a lower limit? Or is it an upper limit when we're accumulating a batch? When pending is down to this level, send the next batch, right?

Let's make this clear in the name. Depending on which word we want to use for filling up a batch.

Idea: PendingLimitWhenFillingBatch
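To make the point concrete, here is a hedged sketch of how such a limit could be computed under the suggested name; the `#opts{}` fields follow the snippets quoted above, but the record defaults and the function name `pending_limit_when_filling_batch` are illustrative assumptions, not code from the PR.

```erlang
%% Illustrative sketch only; record shapes follow the snippets quoted
%% in this review, and the defaults are assumed for the example.
-record(opts, {max_pending = 128, batch_size = 16}).
-record(st, {opts = #opts{}}).

%% While a batch is being filled, hold back new sends until the pending
%% queue drains to this level (clamped so it never drops below 1).
pending_limit_when_filling_batch(#st{opts = #opts{max_pending = MaxPending,
                                                  batch_size = BatchSize}}) ->
    max(MaxPending - BatchSize, 1).
```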

Comment on lines +740 to 744
q_in(Items, {Size, Q1}) when is_list(Items) ->
    Q2 = queue:from_list(Items),
    {Size + queue:len(Q2), queue:join(Q1, Q2)};
q_in(Item, {Size, Q}) ->
    {Size + 1, queue:in(Item, Q)}.

I don't think it's a good idea to overload this with a different behavior, for three reasons:

  1. What if the elements in the queue are lists and we want to add another list to the queue?
  2. Even if we don't use lists as queue elements, it's a bit confusing to overload it.
  3. This q_ API is designed to be mirroring the queue API in OTP.

Let's use a new function for adding multiple items instead. Two possibilities:

  1. q_multi_in
  2. let the caller just do q_join(q_from_list(Items), Queue) (just mirror the queue module API)
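Assuming the `q_*` helpers wrap a plain OTP queue in a `{Size, Queue}` pair (as the `q_in/2` clauses above suggest), the two functions in option 2 could be sketched like this; treat it as an illustration, not the PR's actual code.

```erlang
%% Sketch: mirror queue:from_list/1 and queue:join/2 on the
%% {Size, Queue} pair used by the q_* helpers.
q_from_list(List) when is_list(List) ->
    {length(List), queue:from_list(List)}.

q_join({Size1, Q1}, {Size2, Q2}) ->
    {Size1 + Size2, queue:join(Q1, Q2)}.
```

A caller can then write `q_join(q_from_list(Items), Queue)` without any overloaded clause in `q_in/2`.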

abort_pending_commands(State) ->
    PendingReqs = [Req#pending_req.command || Req <- q_to_list(State#st.pending)],
    State#st{waiting = q_in_r(PendingReqs, State#st.waiting),

Suggested change:

    State#st{waiting = q_join(q_from_list(PendingReqs), State#st.waiting),
@zuiderkwast zuiderkwast Mar 2, 2026

This looks wrong (if you don't know about the overloading). It looks like the list is being inserted as a single item in the queue.

I think the use of q_join and q_from_list is clearer and more explicit.

    {Out, Rest} = queue:split(N, Q),
    {queue:to_list(Out), {Size - N, Rest}};
q_multi_out(_, {_, Q}) ->
    {queue:to_list(Q), {0, queue:new()}}.

The idea behind q_split and q_join is that they mirror the queue:split and queue:join functions. That's why I think it's slightly cleaner to use these.

To handle N > Size, the caller can just use code like

    q_split(min(BatchSize, NumWaiting), State#st.waiting)
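A `q_split/2` that mirrors `queue:split/2` on the `{Size, Queue}` pair could look roughly like this; a sketch under the same assumptions as above, where the guard relies on the caller clamping `N` with `min/2` as in the line just quoted.

```erlang
%% Sketch: mirror queue:split/2 on the {Size, Queue} pair. The caller
%% is expected to clamp N, e.g. q_split(min(BatchSize, NumWaiting), Q).
q_split(N, {Size, Q}) when N =< Size ->
    {Q1, Q2} = queue:split(N, Q),
    {{N, Q1}, {Size - N, Q2}}.
```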

RespCommand = Command#command.data,
Data = ered_command:get_data(RespCommand),
Class = ered_command:get_response_class(RespCommand),
{Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting),

If we use q_split instead here, then we get the commands to send as a queue.

The benefit is that we mirror the queue API, which is familiar to people who know OTP.

I believe q_multi_out is only used in one place anyway, right?

Suggested change
    {Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting),
    {CommandsQ, NewWaiting} = q_split(min(BatchSize, NumWaiting),
                                      State#st.waiting),
    Commands = q_to_list(CommandsQ),

Class = ered_command:get_response_class(RespCommand),
{Commands, NewWaiting} = q_multi_out(BatchSize, State#st.waiting),
{BatchedData, PendingRequests} =
lists:foldr(fun(Command, {B, P}) ->

It's common to use variable names with Acc in these accumulators. For example:

Suggested change
lists:foldr(fun(Command, {B, P}) ->
lists:foldr(fun(Command, {DataAcc, PendingAcc}) ->
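For illustration, the fold with the suggested accumulator names might read like this; the record definitions are simplified stand-ins for the PR's `#command{}` and `#pending_req{}`, so this is a sketch of the naming style, not the actual implementation.

```erlang
%% Simplified stand-ins for the records used in the PR.
-record(command, {data}).
-record(pending_req, {command}).

%% Fold right over the batch, accumulating the wire data and the
%% pending requests under descriptive Acc names.
build_batch(Commands) ->
    lists:foldr(fun(#command{data = Data} = Command, {DataAcc, PendingAcc}) ->
                        {[Data | DataAcc],
                         [#pending_req{command = Command} | PendingAcc]}
                end, {[], []}, Commands).
```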

@zuiderkwast zuiderkwast changed the title Add support for sending batched data. Add support for sending batched data Mar 2, 2026
