|
| 1 | +-module(hg_progressor_handler). |
| 2 | + |
| 3 | +-export([handle_function/3]). |
| 4 | + |
| 5 | +-spec handle_function(_, _, _) -> _. |
| 6 | +handle_function('ProcessTrace', {NS, ProcessID}, Opts) -> |
| 7 | + case progressor:trace(#{ns => NS, id => ProcessID}) of |
| 8 | + {ok, RawTrace} -> |
| 9 | + Format = maps:get(format, Opts, internal), |
| 10 | + unmarshal_trace(NS, ProcessID, RawTrace, Format); |
| 11 | + {error, _} = Error -> |
| 12 | + Error |
| 13 | + end. |
| 14 | + |
| 15 | +unmarshal_trace(NS, ProcessID, RawTrace, internal = Format) -> |
| 16 | + lists:map(fun(RawTraceUnit) -> unmarshal_trace_unit(NS, ProcessID, RawTraceUnit, Format) end, RawTrace); |
| 17 | +unmarshal_trace(NS, ProcessID, RawTrace, jaeger = Format) -> |
| 18 | + Spans = lists:map(fun(RawTraceUnit) -> unmarshal_trace_unit(NS, ProcessID, RawTraceUnit, Format) end, RawTrace), |
| 19 | + #{ |
| 20 | + data => [ |
| 21 | + #{ |
| 22 | + traceID => trace_id(NS, ProcessID), |
| 23 | + spans => Spans, |
| 24 | + processes => #{ |
| 25 | + ProcessID => #{ |
| 26 | + service_name => service_name(NS), |
| 27 | + tags => [] |
| 28 | + } |
| 29 | + } |
| 30 | + } |
| 31 | + ] |
| 32 | + }. |
| 33 | + |
| 34 | +unmarshal_trace_unit(NS, _ProcessID, #{task_type := TaskType} = TraceUnit, internal = Format) -> |
| 35 | + BinArgs = maps:get(args, TraceUnit, <<>>), |
| 36 | + BinEvents = maps:get(events, TraceUnit, []), |
| 37 | + OtelTraceID = extract_trace_id(TraceUnit), |
| 38 | + Error = extract_error(TraceUnit), |
| 39 | + (maps:without([response, context], TraceUnit))#{ |
| 40 | + args => unmarshal_args(NS, TaskType, BinArgs), |
| 41 | + events => unmarshal_events(BinEvents, Format), |
| 42 | + otel_trace_id => OtelTraceID, |
| 43 | + error => Error |
| 44 | + }; |
| 45 | +unmarshal_trace_unit(NS, ProcessID, #{task_type := TaskType, task_id := TaskID} = TraceUnit, jaeger = Format) -> |
| 46 | + BinArgs = maps:get(args, TraceUnit, <<>>), |
| 47 | + BinEvents = maps:get(events, TraceUnit, []), |
| 48 | + #{ |
| 49 | + processID => ProcessID, |
| 50 | + process => #{ |
| 51 | + service_name => service_name(NS), |
| 52 | + tags => [] |
| 53 | + }, |
| 54 | + warnings => [], |
| 55 | + traceId => trace_id(NS, ProcessID), |
| 56 | + span_id => integer_to_binary(TaskID), |
| 57 | + operationName => TaskType, |
| 58 | + startTime => start_time(TraceUnit), |
| 59 | + duration => duration(TraceUnit), |
| 60 | + tags => [ |
| 61 | + #{ |
| 62 | + key => <<"task.status">>, |
| 63 | + type => <<"string">>, |
| 64 | + value => maps:get(task_status, TraceUnit) |
| 65 | + }, |
| 66 | + #{ |
| 67 | + key => <<"task.retries">>, |
| 68 | + type => <<"int64">>, |
| 69 | + value => maps:get(retry_attempts, TraceUnit) |
| 70 | + }, |
| 71 | + #{ |
| 72 | + key => <<"task.input">>, |
| 73 | + type => <<"string">>, |
| 74 | + value => unicode:characters_to_binary(json:encode(unmarshal_args(NS, TaskType, BinArgs))) |
| 75 | + } |
| 76 | + ] ++ error_tag(TraceUnit), |
| 77 | + logs => unmarshal_events(BinEvents, Format) |
| 78 | + }. |
| 79 | + |
| 80 | +unmarshal_args(_, _, <<>>) -> |
| 81 | + <<>>; |
| 82 | +unmarshal_args(invoice, <<"init">> = _TaskType, BinArgs) -> |
| 83 | + {bin, B} = binary_to_term(BinArgs), |
| 84 | + UnwrappedArgs = binary_to_term(B), |
| 85 | + Type = {struct, struct, {dmsl_domain_thrift, 'Invoice'}}, |
| 86 | + Args = hg_invoice:unmarshal_invoice(UnwrappedArgs), |
| 87 | + #{ |
| 88 | + content_type => <<"thrift_call">>, |
| 89 | + content => #{ |
| 90 | + call => #{service => 'Invoicing', function => 'Create'}, |
| 91 | + params => to_maps(term_to_object(Args, Type)) |
| 92 | + } |
| 93 | + }; |
| 94 | +unmarshal_args(invoice_template, <<"init">> = _TaskType, BinArgs) -> |
| 95 | + {bin, B} = binary_to_term(BinArgs), |
| 96 | + UnwrappedArgs = binary_to_term(B), |
| 97 | + Type = {struct, struct, {dmsl_payproc_thrift, 'InvoiceTemplateCreateParams'}}, |
| 98 | + Args = hg_invoice_template:unmarshal_invoice_template_params(UnwrappedArgs), |
| 99 | + #{ |
| 100 | + content_type => <<"thrift_call">>, |
| 101 | + content => #{ |
| 102 | + call => #{service => 'InvoiceTemplating', function => 'Create'}, |
| 103 | + params => to_maps(term_to_object(Args, Type)) |
| 104 | + } |
| 105 | + }; |
| 106 | +unmarshal_args(_, TaskType, BinArgs) when |
| 107 | + TaskType =:= <<"call">>; |
| 108 | + TaskType =:= <<"repair">>; |
| 109 | + TaskType =:= <<"timeout">> |
| 110 | +-> |
| 111 | + {bin, B} = binary_to_term(BinArgs), |
| 112 | + case binary_to_term(B) of |
| 113 | + {schemaless_call, Args} -> |
| 114 | + maybe_format(Args); |
| 115 | + {thrift_call, ServiceName, FunctionRef, EncodedArgs} -> |
| 116 | + {Service, Function} = FunctionRef, |
| 117 | + {Module, Service} = hg_proto:get_service(ServiceName), |
| 118 | + Type = Module:function_info(Service, Function, params_type), |
| 119 | + Args = hg_proto_utils:deserialize(Type, EncodedArgs), |
| 120 | + #{ |
| 121 | + content_type => <<"thrift_call">>, |
| 122 | + content => #{ |
| 123 | + call => #{service => Service, function => Function}, |
| 124 | + params => to_maps(term_to_object(Args, Type)) |
| 125 | + } |
| 126 | + }; |
| 127 | + Args -> |
| 128 | + maybe_format(Args) |
| 129 | + end. |
| 130 | + |
| 131 | +unmarshal_events(BinEvents, Format) -> |
| 132 | + lists:map( |
| 133 | + fun(#{event_payload := BinPayload} = Event) -> |
| 134 | + {bin, BinChanges} = binary_to_term(BinPayload), |
| 135 | + Type = {struct, union, {dmsl_payproc_thrift, 'EventPayload'}}, |
| 136 | + Changes = hg_proto_utils:deserialize(Type, BinChanges), |
| 137 | + Payload = to_maps(term_to_object(Changes, Type)), |
| 138 | + unmarshal_event(Event, Payload, Format) |
| 139 | + end, |
| 140 | + BinEvents |
| 141 | + ). |
| 142 | + |
| 143 | +unmarshal_event(Event, Payload, internal) -> |
| 144 | + Event#{event_payload => Payload}; |
| 145 | +unmarshal_event(#{event_id := EventID, event_timestamp := Ts}, Payload, jaeger) -> |
| 146 | + #{ |
| 147 | + timestamp => Ts, |
| 148 | + fields => [ |
| 149 | + #{ |
| 150 | + key => <<"event.id">>, |
| 151 | + type => <<"int64">>, |
| 152 | + value => EventID |
| 153 | + }, |
| 154 | + #{ |
| 155 | + key => <<"event.payload">>, |
| 156 | + type => <<"string">>, |
| 157 | + value => unicode:characters_to_binary(json:encode(Payload)) |
| 158 | + } |
| 159 | + ] |
| 160 | + }. |
| 161 | + |
| 162 | +maybe_format(Data) when is_binary(Data) -> |
| 163 | + case is_printable_string(Data) of |
| 164 | + true -> |
| 165 | + #{ |
| 166 | + content_type => <<"text">>, |
| 167 | + content => Data |
| 168 | + }; |
| 169 | + false -> |
| 170 | + to_maps(term_to_object_content(Data)) |
| 171 | + end; |
| 172 | +maybe_format(Data) -> |
| 173 | + #{ |
| 174 | + content_type => <<"unknown">>, |
| 175 | + content => format(Data) |
| 176 | + }. |
| 177 | + |
| 178 | +format(Data) -> |
| 179 | + unicode:characters_to_binary(io_lib:format("~p", [Data])). |
| 180 | + |
| 181 | +extract_trace_id(#{context := <<>>}) -> |
| 182 | + null; |
| 183 | +extract_trace_id(#{context := BinContext}) -> |
| 184 | + try binary_to_term(BinContext) of |
| 185 | + #{<<"otel">> := [TraceID | _]} -> |
| 186 | + TraceID; |
| 187 | + _ -> |
| 188 | + null |
| 189 | + catch |
| 190 | + _:_ -> |
| 191 | + null |
| 192 | + end. |
| 193 | + |
| 194 | +extract_error(#{task_status := <<"error">>, response := {error, ReasonTerm}}) -> |
| 195 | + #{content := Content} = maybe_format(ReasonTerm), |
| 196 | + Content; |
| 197 | +extract_error(_) -> |
| 198 | + null. |
| 199 | + |
| 200 | +service_name(invoice) -> |
| 201 | + <<"hellgate_invoice">>; |
| 202 | +service_name(invoice_template) -> |
| 203 | + <<"hellgate_invoice_template">>. |
| 204 | + |
| 205 | +trace_id(NS, ProcessID) -> |
| 206 | + NsBin = erlang:atom_to_binary(NS), |
| 207 | + HexList = [io_lib:format("~2.16.0b", [B]) || <<B>> <= <<NsBin/binary, ProcessID/binary>>], |
| 208 | + Hex = lists:flatten(HexList), |
| 209 | + io:format(user, "HEX: ~p~n", [Hex]), |
| 210 | + case length(Hex) of |
| 211 | + Len when Len < 32 -> unicode:characters_to_binary(lists:duplicate(32 - Len, $0) ++ Hex); |
| 212 | + Len when Len > 32 -> unicode:characters_to_binary(string:slice(Hex, 0, 32)); |
| 213 | + _ -> unicode:characters_to_binary(Hex) |
| 214 | + end. |
| 215 | + |
| 216 | +start_time(#{running := Ts}) -> |
| 217 | + Ts; |
| 218 | +start_time(_) -> |
| 219 | + null. |
| 220 | + |
| 221 | +duration(#{running := Running, finished := Finished}) -> |
| 222 | + Finished - Running; |
| 223 | +duration(_) -> |
| 224 | + null. |
| 225 | + |
| 226 | +error_tag(#{task_status := <<"error">>, response := {error, ReasonTerm}}) -> |
| 227 | + #{content := Content} = maybe_format(ReasonTerm), |
| 228 | + [ |
| 229 | + #{ |
| 230 | + key => <<"task.error">>, |
| 231 | + type => <<"string">>, |
| 232 | + value => Content |
| 233 | + } |
| 234 | + ]; |
| 235 | +error_tag(_) -> |
| 236 | + []. |
| 237 | + |
| 238 | +-define(is_integer(T), (T == byte orelse T == i8 orelse T == i16 orelse T == i32 orelse T == i64)). |
| 239 | +-define(is_number(T), (?is_integer(T) orelse T == double)). |
| 240 | +-define(is_scalar(T), (?is_number(T) orelse T == string orelse element(1, T) == enum)). |
| 241 | + |
| 242 | +-spec term_to_object(term(), dmt_thrift:thrift_type()) -> jsone:json_value(). |
| 243 | +term_to_object(Term, Type) -> |
| 244 | + term_to_object(Term, Type, []). |
| 245 | + |
| 246 | +term_to_object(Term, {list, Type}, Stack) when is_list(Term) -> |
| 247 | + [term_to_object(T, Type, [N | Stack]) || {N, T} <- enumerate(0, Term)]; |
| 248 | +term_to_object(Term, {set, Type}, Stack) -> |
| 249 | + term_to_object(ordsets:to_list(Term), {list, Type}, Stack); |
| 250 | +term_to_object(Term, {map, KType, VType}, Stack) when is_map(Term), ?is_scalar(KType) -> |
| 251 | + maps:fold( |
| 252 | + fun(K, V, A) -> |
| 253 | + [{genlib:to_binary(K), term_to_object(V, VType, [value, V | Stack])} | A] |
| 254 | + end, |
| 255 | + [], |
| 256 | + Term |
| 257 | + ); |
| 258 | +term_to_object(Term, {map, KType, VType}, Stack) when is_map(Term) -> |
| 259 | + maps:fold( |
| 260 | + fun(K, V, A) -> |
| 261 | + [ |
| 262 | + [ |
| 263 | + {<<"key">>, term_to_object(K, KType, [key, K | Stack])}, |
| 264 | + {<<"value">>, term_to_object(V, VType, [value, V | Stack])} |
| 265 | + ] |
| 266 | + | A |
| 267 | + ] |
| 268 | + end, |
| 269 | + [], |
| 270 | + Term |
| 271 | + ); |
| 272 | +term_to_object(Term, {struct, union, {Mod, Name}}, Stack) when is_atom(Mod), is_atom(Name) -> |
| 273 | + {struct, _, StructDef} = Mod:struct_info(Name), |
| 274 | + union_to_object(Term, StructDef, Stack); |
| 275 | +term_to_object(Term, {struct, _, {Mod, Name}}, Stack) when is_atom(Mod), is_atom(Name), is_tuple(Term) -> |
| 276 | + {struct, _, StructDef} = Mod:struct_info(Name), |
| 277 | + struct_to_object(Term, StructDef, Stack); |
| 278 | +term_to_object(Term, {struct, struct, List}, Stack) when is_tuple(Term), is_list(List) -> |
| 279 | + Data = lists:zip(List, tuple_to_list(Term)), |
| 280 | + [{atom_to_binary(Name), term_to_object(V, Type, Stack)} || {{_Pos, _, Type, Name, _}, V} <- Data]; |
| 281 | +term_to_object(Term, {enum, _}, _Stack) when is_atom(Term) -> |
| 282 | + Term; |
| 283 | +term_to_object(Term, Type, _Stack) when is_integer(Term), ?is_integer(Type) -> |
| 284 | + Term; |
| 285 | +term_to_object(Term, double, _Stack) when is_number(Term) -> |
| 286 | + float(Term); |
| 287 | +term_to_object(Term, string, _Stack) when is_binary(Term) -> |
| 288 | + case is_printable_string(Term) of |
| 289 | + true -> |
| 290 | + Term; |
| 291 | + false -> |
| 292 | + term_to_object_content(Term) |
| 293 | + end; |
| 294 | +term_to_object(Term, bool, _Stack) when is_boolean(Term) -> |
| 295 | + Term; |
| 296 | +term_to_object(Term, Type, _Stack) -> |
| 297 | + erlang:error({badarg, Term, Type}). |
| 298 | + |
| 299 | +union_to_object({Fn, Term}, StructDef, Stack) -> |
| 300 | + {_N, _Req, Type, Fn, _Def} = lists:keyfind(Fn, 4, StructDef), |
| 301 | + [{Fn, term_to_object(Term, Type, [Fn | Stack])}]. |
| 302 | + |
| 303 | +struct_to_object(Struct, StructDef, Stack) -> |
| 304 | + [_ | Fields] = tuple_to_list(Struct), |
| 305 | + lists:foldr( |
| 306 | + fun |
| 307 | + ({undefined, _}, A) -> |
| 308 | + A; |
| 309 | + ({Term, {_N, _Req, Type, Fn, _Def}}, A) -> |
| 310 | + [{Fn, term_to_object(Term, Type, [Fn | Stack])} | A] |
| 311 | + end, |
| 312 | + [], |
| 313 | + lists:zip(Fields, StructDef) |
| 314 | + ). |
| 315 | + |
| 316 | +term_to_object_content(Term) -> |
| 317 | + term_to_object_content(<<"base64">>, base64:encode(Term)). |
| 318 | + |
| 319 | +term_to_object_content(CType, Term) -> |
| 320 | + [ |
| 321 | + {<<"content_type">>, CType}, |
| 322 | + {<<"content">>, Term} |
| 323 | + ]. |
| 324 | + |
| 325 | +enumerate(_, []) -> |
| 326 | + []; |
| 327 | +enumerate(N, [H | T]) -> |
| 328 | + [{N, H} | enumerate(N + 1, T)]. |
| 329 | + |
| 330 | +is_printable_string(V) -> |
| 331 | + try unicode:characters_to_binary(V) of |
| 332 | + B when is_binary(B) -> |
| 333 | + true; |
| 334 | + _ -> |
| 335 | + false |
| 336 | + catch |
| 337 | + _:_ -> |
| 338 | + false |
| 339 | + end. |
| 340 | + |
| 341 | +to_maps(Data) -> |
| 342 | + to_maps(Data, #{}). |
| 343 | + |
| 344 | +to_maps([], Acc) -> |
| 345 | + Acc; |
| 346 | +to_maps([{K, [{_, _} | _] = V} | Rest], Acc) -> |
| 347 | + to_maps(Rest, Acc#{K => to_maps(V)}); |
| 348 | +to_maps([{K, V} | Rest], Acc) when is_list(V) -> |
| 349 | + to_maps(Rest, Acc#{K => lists:map(fun(E) -> to_maps(E) end, V)}); |
| 350 | +to_maps([{K, V} | Rest], Acc) -> |
| 351 | + to_maps(Rest, Acc#{K => V}). |
0 commit comments