Skip to content

Commit b6582e5

Browse files
author
ttt161
committed
TECH-61: bump progressor
1 parent 6fc91d0 commit b6582e5

3 files changed

Lines changed: 31 additions & 64 deletions

File tree

apps/hg_progressor/src/hg_progressor.erl

Lines changed: 29 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@ call_automaton('Start', {NS, ID, Args}) ->
3737
call_automaton('Call', {MachineDesc, Args}) ->
3838
#mg_stateproc_MachineDescriptor{
3939
ns = NS,
40-
ref = {id, ID}
40+
ref = {id, ID},
41+
range = HistoryRange
4142
} = MachineDesc,
4243
Req = #{
4344
ns => erlang:binary_to_atom(NS),
4445
id => ID,
4546
args => maybe_unmarshal(term, Args),
46-
context => get_context()
47+
context => get_context(),
48+
range => unmarshal(history_range, HistoryRange)
4749
},
4850
case progressor:call(Req) of
4951
{ok, _Response} = Ok ->
@@ -59,17 +61,17 @@ call_automaton('GetMachine', {MachineDesc}) ->
5961
#mg_stateproc_MachineDescriptor{
6062
ns = NS,
6163
ref = {id, ID},
62-
range = Range
64+
range = HistoryRange
6365
} = MachineDesc,
6466
Req = #{
6567
ns => erlang:binary_to_atom(NS),
6668
id => ID,
67-
range => unmarshal(range, Range)
69+
range => unmarshal(history_range, HistoryRange)
6870
},
6971
case progressor:get(Req) of
7072
{ok, Process} ->
7173
Machine = marshal(process, Process#{ns => NS}),
72-
{ok, Machine#mg_stateproc_Machine{history_range = Range}};
74+
{ok, Machine};
7375
{error, <<"process not found">>} ->
7476
{error, notfound};
7577
{error, {exception, _, _} = Exception} ->
@@ -116,10 +118,10 @@ cleanup() ->
116118
%% Processor
117119

118120
-spec process({task_t(), encoded_args(), process()}, map(), encoded_ctx()) -> process_result().
119-
process({CallType, BinArgs, #{history := History} = Process}, #{ns := NS} = Options, Ctx) ->
121+
process({CallType, BinArgs, Process}, #{ns := NS} = Options, Ctx) ->
120122
_ = set_context(Ctx),
121-
{_, LastEventId} = Range = get_range(History),
122-
Machine = marshal(process, Process#{ns => NS, history_range => Range}),
123+
#{last_event_id := LastEventId} = Process,
124+
Machine = marshal(process, Process#{ns => NS}),
123125
Func = marshal(function, CallType),
124126
Args = marshal(args, {CallType, BinArgs, Machine}),
125127
handle_result(hg_machine:handle_function(Func, {Args}, Options), LastEventId).
@@ -138,7 +140,7 @@ handle_result(
138140
) ->
139141
{ok,
140142
genlib_map:compact(#{
141-
events => unmarshal(events, {Events, undef_to_zero(LastEventId)}),
143+
events => unmarshal(events, {Events, LastEventId}),
142144
aux_state => maybe_unmarshal(term, AuxState),
143145
action => maybe_unmarshal(action, Action)
144146
})};
@@ -156,7 +158,7 @@ handle_result(
156158
{ok,
157159
genlib_map:compact(#{
158160
response => Response,
159-
events => unmarshal(events, {Events, undef_to_zero(LastEventId)}),
161+
events => unmarshal(events, {Events, LastEventId}),
160162
aux_state => maybe_unmarshal(term, AuxState),
161163
action => maybe_unmarshal(action, Action)
162164
})};
@@ -174,7 +176,7 @@ handle_result(
174176
{ok,
175177
genlib_map:compact(#{
176178
response => Response,
177-
events => unmarshal(events, {Events, undef_to_zero(LastEventId)}),
179+
events => unmarshal(events, {Events, LastEventId}),
178180
aux_state => maybe_unmarshal(term, AuxState),
179181
action => maybe_unmarshal(action, Action)
180182
})};
@@ -199,27 +201,6 @@ set_context(<<>>) ->
199201
set_context(BinContext) ->
200202
hg_context:save(marshal(term, BinContext)).
201203

202-
get_range([]) ->
203-
{undefined, undefined};
204-
get_range(History) ->
205-
lists:foldl(
206-
fun(#{event_id := Id}, {Min, Max}) ->
207-
{erlang:min(Id, Min), erlang:max(Id, Max)}
208-
end,
209-
{infinity, 0},
210-
History
211-
).
212-
213-
zero_to_undef(0) ->
214-
undefined;
215-
zero_to_undef(Value) ->
216-
Value.
217-
218-
undef_to_zero(undefined) ->
219-
0;
220-
undef_to_zero(Value) ->
221-
Value.
222-
223204
%% Marshalling
224205

225206
maybe_marshal(_, undefined) ->
@@ -236,20 +217,14 @@ marshal(
236217
history := History
237218
} = Process
238219
) ->
239-
Range = maps:get(history_range, Process, undefined),
220+
Range = maps:get(range, Process, #{}),
240221
AuxState = maps:get(aux_state, Process, term_to_binary(?EMPTY_CONTENT)),
241222
Detail = maps:get(detail, Process, undefined),
242223
MarshalledEvents = lists:map(fun(Ev) -> marshal(event, Ev) end, History),
243-
SortedEvents = lists:sort(
244-
fun(#mg_stateproc_Event{id = Id1}, #mg_stateproc_Event{id = Id2}) ->
245-
Id1 < Id2
246-
end,
247-
MarshalledEvents
248-
),
249224
#mg_stateproc_Machine{
250225
ns = NS,
251226
id = ID,
252-
history = SortedEvents,
227+
history = MarshalledEvents,
253228
history_range = marshal(history_range, Range),
254229
status = marshal(status, {Status, Detail}),
255230
aux_state = maybe_marshal(term, AuxState)
@@ -269,17 +244,11 @@ marshal(
269244
format_version = format_version(Meta),
270245
data = marshal(term, Payload)
271246
};
272-
marshal(history_range, {undefined, undefined}) ->
273-
#mg_stateproc_HistoryRange{direction = forward};
274-
marshal(history_range, undefined) ->
275-
#mg_stateproc_HistoryRange{direction = forward};
276-
marshal(history_range, {Min, Max}) ->
277-
Offset = Min - 1,
278-
Count = Max - Offset,
247+
marshal(history_range, Range) ->
279248
#mg_stateproc_HistoryRange{
280-
'after' = zero_to_undef(Offset),
281-
limit = Count,
282-
direction = forward
249+
'after' = maps:get(offset, Range, undefined),
250+
limit = maps:get(limit, Range, undefined),
251+
direction = maps:get(direction, Range, forward)
283252
};
284253
marshal(status, {<<"running">>, _Detail}) ->
285254
{'working', #mg_stateproc_MachineStatusWorking{}};
@@ -329,15 +298,13 @@ unmarshal(events, {[], _}) ->
329298
[];
330299
unmarshal(events, {Events, LastEventId}) ->
331300
Ts = erlang:system_time(second),
332-
[#mg_stateproc_Content{format_version = Format, data = Data} | Rest] = Events,
333-
ConvertedFirst = genlib_map:compact(#{
334-
event_id => LastEventId + 1,
335-
timestamp => Ts,
336-
metadata => #{<<"format_version">> => Format},
337-
payload => unmarshal(term, Data)
338-
}),
339301
lists:foldl(
340-
fun(#mg_stateproc_Content{format_version = Ver, data = Payload}, [#{event_id := PrevId} | _] = Acc) ->
302+
fun(#mg_stateproc_Content{format_version = Ver, data = Payload}, Acc) ->
303+
PrevId =
304+
case Acc of
305+
[] -> LastEventId;
306+
[#{event_id := Id} | _] -> Id
307+
end,
341308
[
342309
genlib_map:compact(#{
343310
event_id => PrevId + 1,
@@ -348,8 +315,8 @@ unmarshal(events, {Events, LastEventId}) ->
348315
| Acc
349316
]
350317
end,
351-
[ConvertedFirst],
352-
Rest
318+
[],
319+
Events
353320
);
354321
unmarshal(action, #mg_stateproc_ComplexAction{
355322
timer = {set_timer, #mg_stateproc_SetTimerAction{timer = Timer}},
@@ -381,9 +348,9 @@ unmarshal(term, Term) ->
381348
erlang:term_to_binary(Term);
382349
unmarshal(remove_action, #mg_stateproc_RemoveAction{}) ->
383350
true;
384-
unmarshal(range, undefined) ->
351+
unmarshal(history_range, undefined) ->
385352
#{};
386-
unmarshal(range, #mg_stateproc_HistoryRange{'after' = Offset, limit = Limit, direction = Direction}) ->
353+
unmarshal(history_range, #mg_stateproc_HistoryRange{'after' = Offset, limit = Limit, direction = Direction}) ->
387354
genlib_map:compact(#{
388355
offset => Offset,
389356
limit => Limit,

rebar.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
{fault_detector_proto, {git, "https://github.com/valitydev/fault-detector-proto.git", {branch, "master"}}},
4343
{limiter_proto, {git, "https://github.com/valitydev/limiter-proto.git", {branch, "master"}}},
4444
{herd, {git, "https://github.com/wgnet/herd.git", {tag, "1.3.4"}}},
45-
{progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.4"}}},
45+
{progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.6"}}},
4646
{prometheus, "4.8.1"},
4747
{prometheus_cowboy, "0.1.8"},
4848

rebar.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
0},
103103
{<<"progressor">>,
104104
{git,"https://github.com/valitydev/progressor.git",
105-
{ref,"cb87a174f80fb5ad68a87d91f0ba2291d1759f6a"}},
105+
{ref,"ce7bcbddc7e9b97a3bb24f45fb8ef455896a9cbb"}},
106106
0},
107107
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},0},
108108
{<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.8">>},0},

0 commit comments

Comments
 (0)