From 989c3c78cd7497d663ef5aba1840267fb72dfe76 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 11:06:28 +0200 Subject: [PATCH 1/8] autocluster: Make certain callbacks optional --- src/classy_discovery_strategy.erl | 30 ++++++++++++++++++++--- src/discovery/classy_discovery_dns.erl | 16 ------------ src/discovery/classy_discovery_k8s.erl | 16 ------------ src/discovery/classy_discovery_static.erl | 16 ------------ test/classy_discovery_dns_SUITE.erl | 12 --------- test/classy_discovery_k8s_SUITE.erl | 12 --------- test/classy_discovery_static_SUITE.erl | 12 --------- 7 files changed, 26 insertions(+), 88 deletions(-) diff --git a/src/classy_discovery_strategy.erl b/src/classy_discovery_strategy.erl index 64183b7..85003be 100644 --- a/src/classy_discovery_strategy.erl +++ b/src/classy_discovery_strategy.erl @@ -43,6 +43,8 @@ This module defines a behavior of discovery strategy. -callback unregister(options()) -> ok | ignore | {error, term()}. +-optional_callbacks([lock/1, unlock/1, register/1, unregister/1]). + %%================================================================================ %% API functions %%================================================================================ @@ -65,22 +67,42 @@ discover(Mod, Options) -> -doc false. -spec lock(module(), options()) -> ok | ignore | {error, term()}. lock(Mod, Options) -> - safe_call(Mod, ?FUNCTION_NAME, Options). + case erlang:function_exported(Mod, ?FUNCTION_NAME, 1) of + true -> + safe_call(Mod, ?FUNCTION_NAME, Options); + false -> + ok + end. -doc false. -spec unlock(module(), options()) -> ok | ignore | {error, term()}. unlock(Mod, Options) -> - safe_call(Mod, ?FUNCTION_NAME, Options). + case erlang:function_exported(Mod, ?FUNCTION_NAME, 1) of + true -> + safe_call(Mod, ?FUNCTION_NAME, Options); + false -> + ok + end. -doc false. -spec register(module(), options()) -> ok | ignore | {error, term()}. register(Mod, Options) -> - safe_call(Mod, ?FUNCTION_NAME, Options). + case erlang:function_exported(Mod, ?FUNCTION_NAME, 1) of + true -> + safe_call(Mod, ?FUNCTION_NAME, Options); + false -> + ok + end. -doc false. -spec unregister(module(), options()) -> ok | ignore | {error, term()}. unregister(Mod, Options) -> - safe_call(Mod, ?FUNCTION_NAME, Options). + case erlang:function_exported(Mod, ?FUNCTION_NAME, 1) of + true -> + safe_call(Mod, ?FUNCTION_NAME, Options); + false -> + ok + end. %%================================================================================ %% Internal functions diff --git a/src/discovery/classy_discovery_dns.erl b/src/discovery/classy_discovery_dns.erl index c2f7047..16de587 100644 --- a/src/discovery/classy_discovery_dns.erl +++ b/src/discovery/classy_discovery_dns.erl @@ -20,10 +20,6 @@ %% Cluster strategy callbacks -export([ discover/1 - , lock/1 - , unlock/1 - , register/1 - , unregister/1 ]). -export_type([ opts/0 @@ -55,15 +51,3 @@ node_name(undefined, Host) -> node_name(classy_autocluster:app_name(), Host); node_name(NodeName, Host) -> list_to_atom(lists:concat([NodeName, "@", Host])). - -lock(_Options) -> - ok. - -unlock(_Options) -> - ok. - -register(_Options) -> - ok. - -unregister(_Options) -> - ok. diff --git a/src/discovery/classy_discovery_k8s.erl b/src/discovery/classy_discovery_k8s.erl index 17271fb..aab140c 100644 --- a/src/discovery/classy_discovery_k8s.erl +++ b/src/discovery/classy_discovery_k8s.erl @@ -20,10 +20,6 @@ %% Cluster strategy callbacks. -export([ discover/1 - , lock/1 - , unlock/1 - , register/1 - , unregister/1 ]). -export_type([opts/0]). @@ -77,18 +73,6 @@ node_name(App, Addr, _Service, dns, Namespace, Suffix) when length(Suffix) > 0 - node_name(App, Addr, _, _, _, _) -> list_to_atom(App ++ "@" ++ binary_to_list(Addr)). -lock(_Options) -> - ok. - -unlock(_Options) -> - ok. - -register(_Options) -> - ok. - -unregister(_Options) -> - ok. - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/src/discovery/classy_discovery_static.erl b/src/discovery/classy_discovery_static.erl index f17f4c2..3d9aa97 100644 --- a/src/discovery/classy_discovery_static.erl +++ b/src/discovery/classy_discovery_static.erl @@ -19,23 +19,7 @@ -behaviour(classy_discovery_strategy). -export([ discover/1 - , lock/1 - , unlock/1 - , register/1 - , unregister/1 ]). discover(Options) -> {ok, maps:get(seeds, Options, [])}. - -lock(_Options) -> - ok. - -unlock(_Options) -> - ok. - -register(_Options) -> - ok. - -unregister(_Options) -> - ok. diff --git a/test/classy_discovery_dns_SUITE.erl b/test/classy_discovery_dns_SUITE.erl index 531e02d..721b6cc 100644 --- a/test/classy_discovery_dns_SUITE.erl +++ b/test/classy_discovery_dns_SUITE.erl @@ -75,15 +75,3 @@ t_discover(_) -> %% below test relies on rebar3 ct is run with '--name ct@127.0.0.1' {ok, ['ct@127.0.0.1']} = classy_discovery_strategy:discover(classy_discovery_dns, Options3), ok. - -t_lock(_) -> - ok = classy_discovery_strategy:lock(classy_discovery_dns, #{}). - -t_unlock(_) -> - ok = classy_discovery_strategy:unlock(classy_discovery_dns, #{}). - -t_register(_) -> - ok = classy_discovery_strategy:register(classy_discovery_dns, #{}). - -t_unregister(_) -> - ok = classy_discovery_strategy:unregister(classy_discovery_dns, #{}). diff --git a/test/classy_discovery_k8s_SUITE.erl b/test/classy_discovery_k8s_SUITE.erl index 1849f98..15a19af 100644 --- a/test/classy_discovery_k8s_SUITE.erl +++ b/test/classy_discovery_k8s_SUITE.erl @@ -50,15 +50,3 @@ t_discover(_) -> classy_discovery_k8s, ?OPTIONS)), ok = meck:unload(classy_httpc). - -t_lock(_) -> - ok = classy_discovery_static:lock([]). - -t_unlock(_) -> - ok = classy_discovery_static:unlock([]). - -t_register(_) -> - ok = classy_discovery_static:register([]). - -t_unregister(_) -> - ok = classy_discovery_static:unregister([]). diff --git a/test/classy_discovery_static_SUITE.erl b/test/classy_discovery_static_SUITE.erl index e67f16b..c02fcff 100644 --- a/test/classy_discovery_static_SUITE.erl +++ b/test/classy_discovery_static_SUITE.erl @@ -27,15 +27,3 @@ all() -> t_discover(_) -> Options = #{seeds => ['ekka@127.0.0.1']}, {ok, ['ekka@127.0.0.1']} = classy_discovery_static:discover(Options). - -t_lock(_) -> - ok = classy_discovery_static:lock([]). - -t_unlock(_) -> - ok = classy_discovery_static:unlock([]). - -t_register(_) -> - ok = classy_discovery_static:register([]). - -t_unregister(_) -> - ok = classy_discovery_static:unregister([]). From bc0e61dfa62ac221b376cdf0182aeca556a7b6da Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 12:27:39 +0200 Subject: [PATCH 2/8] autocluster: Change the method of node registration --- doc/classy.texi | 11 ++++++--- src/classy_autocluster.erl | 14 +++--------- src/classy_builtin_hooks.erl | 17 ++++++++------ src/classy_discovery_strategy.erl | 28 +++++++++++++++++++++-- src/classy_hook.erl | 6 ++++- src/discovery/classy_autocluster_sup.erl | 2 +- src/discovery/classy_discovery_dns.erl | 10 ++++++++ src/discovery/classy_discovery_etcd.erl | 12 +++++++++- src/discovery/classy_discovery_k8s.erl | 10 ++++++++ src/discovery/classy_discovery_static.erl | 10 ++++++++ 10 files changed, 94 insertions(+), 26 deletions(-) diff --git a/doc/classy.texi b/doc/classy.texi index 76f0767..18a5b4f 100644 --- a/doc/classy.texi +++ b/doc/classy.texi @@ -123,14 +123,19 @@ The document was typeset with Peer discovery is done with a help of several modules implementing @ref{api/classy/classy_discovery_strategy, discovery_strategy} behavior. - Active strategy is selected by @ref{discovery_strategy} OTP application environment variable. + The strategy is configured via @ref{discovery_strategy} OTP application environment variable. This variable has a form of 2-tuple with the following elements: @enumerate - @item Strategy - @item Strategy-specific parameters + @item Discovery method, such as @code{static} + @item Method-specific parameters @end enumerate + New methods can be registered using @ref{classy_discovery_strategy:hook/2} function. + + Additionally, there is a hook that allows to filter and prioritize discovered nodes: + @ref{classy:pre_autocluster/2}. + Autocluster logic runs only on singleton sites. It is inactive on sites that already have peers (up or down). diff --git a/src/classy_autocluster.erl b/src/classy_autocluster.erl index d6ddbd9..9069d78 100644 --- a/src/classy_autocluster.erl +++ b/src/classy_autocluster.erl @@ -245,20 +245,12 @@ wakeup(After, S = #s{t = T0}) -> with_strategy(Fun) -> case classy_discovery_strategy:get() of - {manual, _} -> - ignore; - {singleton, _} -> - ignore; + {Module, Options} -> + Fun(Module, Options); undefined -> - ignore; - {Strategy, Options} -> - Fun(strategy_module(Strategy), Options) + ignore end. --spec strategy_module(atom()) -> module(). -strategy_module(Strategy) -> - list_to_atom("classy_discovery_" ++ atom_to_list(Strategy)). - -spec discovery_interval() -> pos_integer(). discovery_interval() -> application:get_env(classy, discovery_interval, 5_000). diff --git a/src/classy_builtin_hooks.erl b/src/classy_builtin_hooks.erl index c27219a..47f6c6b 100644 --- a/src/classy_builtin_hooks.erl +++ b/src/classy_builtin_hooks.erl @@ -83,13 +83,16 @@ log_run_level(From, To) -> , to => To }). -log_peer_connection_change(_Cluster, _Local, Remote, Node, true) -> - ?tp(notice, classy_peer_connected, - #{ remote => Remote - , node => Node - }); -log_peer_connection_change(_Cluster, _Local, Remote, Node, false) -> - ?tp(notice, classy_peer_disconnected, +log_peer_connection_change(_Cluster, Local, Remote, Node, ConnStatus) -> + Kind = case ConnStatus of + true -> classy_peer_connected; + false -> classy_peer_disconnected + end, + Level = case Remote of + Local -> debug; + _ -> notice + end, + ?tp(Level, Kind, #{ remote => Remote , node => Node }). diff --git a/src/classy_discovery_strategy.erl b/src/classy_discovery_strategy.erl index 85003be..89d021b 100644 --- a/src/classy_discovery_strategy.erl +++ b/src/classy_discovery_strategy.erl @@ -19,6 +19,7 @@ This module defines a behavior of discovery strategy. """. +-export([hook/2]). -export([get/0, discover/2, lock/2, unlock/2, register/2, unregister/2]). -export_type([options/0, t/0]). @@ -45,15 +46,38 @@ This module defines a behavior of discovery strategy. -optional_callbacks([lock/1, unlock/1, register/1, unregister/1]). +-define(hook, ?MODULE). + %%================================================================================ %% API functions %%================================================================================ -doc """ -Read value of @ref{discovery_strategy} environment variable (with default). +Register a discovery strategy. + +Callbacks registered here match on the @ref{discovery_strategy} configuration +and return callback module implementing @code{classy_discovery_strategy} behavior. + +The first hook that returns @code{@{ok, Module@}} wins +and handles all the callbacks for the next discovery cycle. """. +-spec hook(fun(({atom(), options()}) -> {ok, module()} | undefined), classy_hook:prio()) -> classy_hook:hook(). +hook(Fun, Prio) when is_function(Fun, 1), is_number(Prio) -> + classy_hook:insert(?hook, Fun, Prio). + +-doc """ +Read @ref{discovery_strategy} environment variable and decide which strategy to use. +""". +-spec get() -> t() | undefined. get() -> - application:get_env(classy, discovery_strategy, {manual, []}). + Conf = application:get_env(classy, discovery_strategy, {manual, []}), + case classy_hook:first_match(?hook, [Conf]) of + {ok, Module} -> + {_Method, Options} = Conf, + {Module, Options}; + undefined -> + undefined + end. %%================================================================================ %% Internal exports diff --git a/src/classy_hook.erl b/src/classy_hook.erl index 94c4268..5d5a84b 100644 --- a/src/classy_hook.erl +++ b/src/classy_hook.erl @@ -65,6 +65,11 @@ init() -> classy:on_membership_change(fun classy_builtin_hooks:log_membership_change/4, 100), classy:run_level(fun classy_builtin_hooks:log_run_level/2, -100), classy:on_peer_connection_change(fun classy_builtin_hooks:log_peer_connection_change/5, 100), + %% Discovery strategies: + classy_discovery_static:hook(), + classy_discovery_dns:hook(), + classy_discovery_k8s:hook(), + classy_discovery_etcd:hook(), %% User initialization: case application:get_env(classy, setup_hooks) of {ok, {Mod, Func, Args}} -> @@ -74,7 +79,6 @@ init() -> ok end. --doc false. -spec insert(hookpoint(), fun(), prio()) -> hook(). insert(Hookpoint, Hook, Prio) when is_atom(Hookpoint), is_integer(Prio), is_function(Hook) -> Key = {Hookpoint, -Prio, Hook}, diff --git a/src/discovery/classy_autocluster_sup.erl b/src/discovery/classy_autocluster_sup.erl index 2f1e4ee..d639bc3 100644 --- a/src/discovery/classy_autocluster_sup.erl +++ b/src/discovery/classy_autocluster_sup.erl @@ -61,7 +61,7 @@ init([]) -> , type => worker }, ETCD = case classy_discovery_strategy:get() of - {etcd, Options} -> + {classy_discovery_etcd, Options} -> case proplists:get_value(version, Options, v3) of v3 -> [#{id => classy_discovery_etcd, diff --git a/src/discovery/classy_discovery_dns.erl b/src/discovery/classy_discovery_dns.erl index 16de587..c49ee64 100644 --- a/src/discovery/classy_discovery_dns.erl +++ b/src/discovery/classy_discovery_dns.erl @@ -20,6 +20,7 @@ %% Cluster strategy callbacks -export([ discover/1 + , hook/0 ]). -export_type([ opts/0 @@ -31,6 +32,15 @@ , app => string() | atom() }. +hook() -> + classy_discovery_strategy:hook( + fun({dns, _}) -> + {ok, ?MODULE}; + (_) -> + undefined + end, + 0). + discover(Options) -> Defaults = #{ app => undefined , type => a diff --git a/src/discovery/classy_discovery_etcd.erl b/src/discovery/classy_discovery_etcd.erl index d1ece7f..3df2cd0 100644 --- a/src/discovery/classy_discovery_etcd.erl +++ b/src/discovery/classy_discovery_etcd.erl @@ -21,7 +21,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --export([ discover/1 +-export([ hook/0 + , discover/1 , lock/1 , unlock/1 , register/1 @@ -65,6 +66,15 @@ -define(LOG(Level, Format, Args), logger:Level("Classy(etcd): " ++ Format, Args)). +hook() -> + classy_discovery_strategy:hook( + fun({etcd, _}) -> + {ok, ?MODULE}; + (_) -> + undefined + end, + 0). + start_link(Options) -> gen_server:start_link({local, ?SERVER}, ?MODULE, Options, []). diff --git a/src/discovery/classy_discovery_k8s.erl b/src/discovery/classy_discovery_k8s.erl index aab140c..38b5bb1 100644 --- a/src/discovery/classy_discovery_k8s.erl +++ b/src/discovery/classy_discovery_k8s.erl @@ -20,6 +20,7 @@ %% Cluster strategy callbacks. -export([ discover/1 + , hook/0 ]). -export_type([opts/0]). @@ -39,6 +40,15 @@ , suffix => string() }. +hook() -> + classy_discovery_strategy:hook( + fun({k8s, _}) -> + {ok, ?MODULE}; + (_) -> + undefined + end, + 0). + %%-------------------------------------------------------------------- %% classy_discovery_strategy callbacks %%-------------------------------------------------------------------- diff --git a/src/discovery/classy_discovery_static.erl b/src/discovery/classy_discovery_static.erl index 3d9aa97..165ee5f 100644 --- a/src/discovery/classy_discovery_static.erl +++ b/src/discovery/classy_discovery_static.erl @@ -19,7 +19,17 @@ -behaviour(classy_discovery_strategy). -export([ discover/1 + , hook/0 ]). +hook() -> + classy_discovery_strategy:hook( + fun({static, _}) -> + {ok, ?MODULE}; + (_) -> + undefined + end, + 0). + discover(Options) -> {ok, maps:get(seeds, Options, [])}. From 5fa1ec857f8682eb69742c43dcbfc0f9eda5818b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 14:10:28 +0200 Subject: [PATCH 3/8] vote: Pass ID to all callbacks --- src/classy_vote.erl | 18 +++++++++--------- src/classy_vote_coordinator.erl | 2 +- src/classy_vote_participant.erl | 9 +++++---- test/classy_SUITE.erl | 25 +++++++++++++++++-------- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/classy_vote.erl b/src/classy_vote.erl index b8e69c3..e990d8d 100644 --- a/src/classy_vote.erl +++ b/src/classy_vote.erl @@ -66,7 +66,7 @@ , verify_prepare/1 , verify_commit/1 , verify_rollback/1 - , verify_mfas/2 + , verify_mfas/3 , verify_mfa/3 , retry_interval/0 , on_fail/2 @@ -200,30 +200,30 @@ create_table() -> -doc false. verify_prepare(Prepare) -> - verify_mfa(bad_prepare, 1, Prepare). + verify_mfa(bad_prepare, 2, Prepare). -doc false. verify_commit(Commit) -> - verify_mfas(bad_commit, Commit). + verify_mfas(bad_commit, 1, Commit). -doc false. verify_rollback(Rollback) -> - verify_mfas(bad_rollback, Rollback). + verify_mfas(bad_rollback, 1, Rollback). -doc false. -verify_mfas(Reason, Commits) when is_list(Commits) -> +verify_mfas(Reason, NExtraArgs, L) when is_list(L) -> try - [case verify_mfa(Reason, 0, I) of + [case verify_mfa(Reason, NExtraArgs, I) of ok -> ok; Err -> throw(Err) - end || I <- Commits], + end || I <- L], ok catch Err -> Err end; -verify_mfas(Reason, Other) -> +verify_mfas(Reason, _, Other) -> {error, {Reason, Other}}. -doc false. @@ -288,7 +288,7 @@ with_defaults(UserOpts) when is_map(UserOpts) -> verify_post_vote(#{post_vote := PostVote}) -> maybe - ok = verify_mfa(bad_post_vote, 1, PostVote), + ok = verify_mfa(bad_post_vote, 2, PostVote), {ok, [PostVote]} end; verify_post_vote(#{}) -> diff --git a/src/classy_vote_coordinator.erl b/src/classy_vote_coordinator.erl index faa06b2..c9d3eb5 100644 --- a/src/classy_vote_coordinator.erl +++ b/src/classy_vote_coordinator.erl @@ -374,7 +374,7 @@ perform_post_commit(Outcome, [], D) -> ok = db_teardown(Outcome, D), {stop, normal}; perform_post_commit(Outcome, [{M, F, Args} | Rest], D) -> - case classy_lib:safe_apply(M, F, [Outcome | Args]) of + case classy_lib:safe_apply(M, F, [Outcome, D#d.id | Args]) of {ok, _} -> perform_post_commit(Outcome, Rest, D); Err -> diff --git a/src/classy_vote_participant.erl b/src/classy_vote_participant.erl index 80d6db9..7ad14a8 100644 --- a/src/classy_vote_participant.erl +++ b/src/classy_vote_participant.erl @@ -255,14 +255,14 @@ perform_rollback(D = #d{completed_actions = CA, prep = Prep}) -> perform_actions(_, [], D) -> db_teardown(D), {stop, normal, D}; -perform_actions(Stage, [MFA | Rest], D0 = #d{completed_actions = CA, vote = Vote, prep = Prep}) -> +perform_actions(Stage, [{Mod, Fun, Args} | Rest], D0 = #d{completed_actions = CA, vote = Vote, prep = Prep}) -> #prepare{id = ID, tag = Tag, on_fail = OnFail} = Prep, ?tp(debug, ?classy_vote_part_perform_action, #{ id => ID , stage => Stage , action_ctr => CA }), - case classy_lib:safe_apply(MFA) of + case classy_lib:safe_apply({Mod, Fun, [ID | Args]}) of {ok, _} -> {ok, D} = db_update(Stage, Vote, CA + 1, D0), perform_actions(Stage, Rest, D); @@ -296,7 +296,8 @@ do_real_vote(#d{prep = Prep} = D0) -> -spec do_prepare(#prepare{}, boolean()) -> {ok, boolean()} | {error, _}. do_prepare( - #prepare{ prepare = Prep + #prepare{ id = ID + , prepare = Prep , commit = Commit , rollback = Rollback , coordinator = Coordinator @@ -309,7 +310,7 @@ do_prepare( ok ?= classy_vote:verify_rollback(Rollback), ok ?= verify_coordinator(Coordinator), {M, F, Args} = Prep, - {ok, Vote} ?= classy_lib:safe_apply(M, F, [ForReal | Args]), + {ok, Vote} ?= classy_lib:safe_apply(M, F, [ForReal, ID | Args]), true ?= is_boolean(Vote) orelse {error, {bad_result, Vote}}, {ok, Vote} end. diff --git a/test/classy_SUITE.erl b/test/classy_SUITE.erl index e65eba6..c488b3b 100644 --- a/test/classy_SUITE.erl +++ b/test/classy_SUITE.erl @@ -1676,22 +1676,31 @@ make_post_vote(Ref) -> make_vote_on_fail(Ref) -> {?MODULE, vote_on_fail, [Ref]}. -vote_prepare(ForReal, HowToPreVote, HowToVote, Ref) -> +vote_prepare(ForReal, Id, HowToPreVote, HowToVote, Ref) -> Result = case ForReal of true -> HowToVote; false -> HowToPreVote end, - ?tp(classy_test_vote_prep, #{ref => Ref, vote => Result, for_real => ForReal}), + ?tp(classy_test_vote_prep, + #{ ref => Ref + , vote => Result + , for_real => ForReal + , id => Id + }), Result. -vote_commit(Step, Ref) -> - ?tp(classy_test_vote_commit, #{ref => Ref, step => Step}). +vote_commit(Id, Step, Ref) -> + ?tp(classy_test_vote_commit, + #{ ref => Ref + , step => Step + , id => Id + }). -vote_rollback(Ref) -> - ?tp(classy_test_vote_rollback, #{ref => Ref}). +vote_rollback(Id, Ref) -> + ?tp(classy_test_vote_rollback, #{ref => Ref, id => Id}). -post_vote(Result, Ref) -> - ?tp(classy_test_post_vote, #{ref => Ref, result => Result}). +post_vote(Result, Id, Ref) -> + ?tp(classy_test_post_vote, #{ref => Ref, result => Result, id => Id}). vote_on_fail(FailInfo, Ref) -> ?tp(classy_test_vote_on_fail, FailInfo#{test_ref => Ref}). From 1e638077de039258f720816d79dcd22d07b03c12 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 12:27:39 +0200 Subject: [PATCH 4/8] vote: Improve documentation --- src/classy_vote.erl | 221 ++++++++++++++++++++++++++++++-------------- 1 file changed, 154 insertions(+), 67 deletions(-) diff --git a/src/classy_vote.erl b/src/classy_vote.erl index e990d8d..aa73b1a 100644 --- a/src/classy_vote.erl +++ b/src/classy_vote.erl @@ -2,57 +2,62 @@ %% Copyright (c) 2026 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -%% @doc This module implements a variation of 2-phase commit. -%% -%% Note: vote is rather heavy operation. -%% Do not use it when frequent coordination is needed. -%% -%% Each voting action uses the following following callbacks: -%% -%% -%%
  • -%% prepare: Executed on the participant sites. -%% Classy prepends an additional argument indicating whether the prepare action can have side effects. -%% Return value is a boolean indicating the participant's vote (`true' = `yes'). -%%
  • -%% -%%
  • -%% commit: List of actions executed on the participant sites -%% when the coordinator decides to go ahead with the commit. -%% Classy doesn't add additional arguments. -%% Return value is ignored. -%% -%% Un-executed or failed actions are retried after node restart. -%%
  • -%% -%%
  • -%% rollback: Executed on the participant sites -%% when the coordinator decides to abort the commit. -%% Classy doesn't add additional arguments. -%% Return value is ignored. -%% This callback can be retried on node restart. -%%
  • -%% -%%
  • -%% post_vote: Executed on the coordinator. -%% Classy prepends a boolean indicating result of the vote to the argument list. -%% Return value is ignored. -%% It's NOT guaranteed that all commit actions on the participants are finished by this time. -%% This callback can be retried on node restart. -%%
  • -%% -%%
  • -%% on_fail: Executed on both coordinator and participant if commit / rollback / post_commit actions fail. -%% This callback may be used signal failures to the business logic. -%% Classy prepends an argument of type `fail_info'. -%%
  • -%%
    -%% -%% WARNING: when `commit', `rollback' or `post_vote' actions fail, -%% coordinator or participant processes stop, but pending commit action will linger in the DB. -%% Classy will not attempt to recover the action until the next node restart. -%% Recovery of failed commit or rollback actions is left to the API consumers. -module(classy_vote). +-moduledoc """ +This module implements a variation of 2-phase commit. + +Important to note: + +@enumerate +@item All callbacks involved in the operations are persistently stored. + Certain callbacks may be retried after a node restart. + Hence, user must make sure that functions involved in the commit are not removed during upgrade. + +@item Vote is rather heavy operation. + Do not use it when frequent coordination is needed. + +@item Commit flows may hang for an unlimited time if the coordinator node fails +during the decision stage. +@end enumerate + +@section Error Handling + +This API uses both synchronous and asynchronous methods of status and error reporting. +Both methods must be handled in all cases. +Note: when @link{classy_vote:create/1,create/1} API returns @code{@{ok, _@}}, +it doesn't mean the commit has been completed. + +@enumerate +@item +If this function returns an error tuple, +it means the commit followed a "fast abort" path. +"Fast abort" path is synchronous, +and it implies that no persistent changes have been made to the involved sites +(participant and coordinator). + +@item +When the function returns @code{@{ok, _@}}, +it means commit flow entered "persistent" path. +Persistent path continues even after restart of any involved site. +Because of that, it uses asynchronous method of status reporting +via callbacks passed in the options. + +The coordinator is notified of the outcome via @code{post_vote} callback. + +The participants are notified via their respective @code{commit} or @code{rollback} callbacks. + +@item +Additionally, +if @code{post_vote}, @code{commit} or @code{rollback} callbacks throw an exception, +@code{on_fail} callback gets involved. + +Such mechanism is used because classy doesn't automatically retry any actions that failed on the persistent path: +there is a high risk that these actions will just keep failing repeatedly. +Instead, they are abandoned until the next node restart. +@code{on_fail} callback provides a mechanism to signal such failures back to the business logic. + +@end enumerate +""". %% API: -export([ create/1 @@ -101,24 +106,94 @@ %%================================================================================ -%% Lock tag associated with the operation. -%% It allows business logic to quickly enumerate ongoing votes of certain kind. +-doc """ +Arbitrary tag associated with the operation. +It allows business logic to quickly enumerate ongoing votes of certain kind. +""". -type tag() :: term(). -%% Unique ID of the vote. + +-doc "Unique ID of the vote.". -type id() :: classy_uid:cu_tuple(). -%% Strategy used to decide when to commit -%% -%% `all': All participant must vote `true' within the timeout +-doc """ +Strategy used to decide whether to commit. + +@code{all}: All participant must vote yes within the timeout. +""". -type strategy() :: {all, timeout()}. +-doc """ +Per-participant set of commit actions. + +@table @code +@item prepare + Callback that lets the participant decide whether to commit. + + Classy prepends two additional values to the user-specified argument list: + @enumerate + @item A boolean indicating whether the prepare action can have side effects. + It is set to @code{false} during pre-commit fast abort check + and to @code{true} during the persistent flow. + + @item ID of the vote. + @end enumerate + + The return value is a boolean indicating the participant's vote (@code{true} means ``yes''). + +@item commit + List of actions executed on the sites if the coordinator decides to go ahead with the commit. + Classy prepends vote ID to the user-specified argument list. + Return value is ignored. + +@item rollback + Action executed on the participant when the coordinator decides to abort the commit. + Classy prepends vote ID to the user-specified argument list. + Return value is ignored. +@end table +""". -type actions() :: #{ prepare := classy_lib:mfargs() , commit := [classy_lib:mfargs()] , rollback => classy_lib:mfargs() }. -%% Warning: MFA's are persistently stored! +-doc """ +Common vote options. + +@table @code +@item tag + An arbitrary tag identifying the commit action. + Ongoing commit actions can be efficiently filtered by the tag. + +@item actions + A map from @link{t:classy:site/0,site ID} to per-site @ref{t:classy_vote:actions/0, commit actions}. + Each site in the map becomes a vote participant. + Participants' actions may be non-uniform. + +@item post_vote + Callback that is executed on the coordinator after the decision is made. + Classy prepends two arguments to the user-specified argument list: + + @enumerate + @item A boolean indicating the decision + @item Vote ID + @end enumerate + + The return value is ignored. + + Note: it's NOT guaranteed that all commit actions on the participants are finished + by the time when @code{post_vote} is called. + This callback can be retried on node restart. + +@item strategy + @xref{t:classy_vote:strategy/0,strategy()}. + +@item on_fail + Executed on both coordinator and participant if commit / rollback / post_commit actions fail. + This callback may be used signal failures to the business logic. + Classy prepends an argument of type @ref{t:classy_vote:fail_info/0} to the user-specified argument list. +@end table +""". -type options() :: #{ tag := tag() , actions := #{classy:site() => actions()} @@ -148,14 +223,18 @@ %% API functions %%================================================================================ -%% @doc List all ongoing commit actions. +-doc """ +@xref{classy_vote:ls_votes/1}. No filtering. +""". -spec ls_votes() -> [vote_info()]. ls_votes() -> ls_votes('_'). -%% @doc List ongoing commit actions. -%% -%% Argument: match specification for the tag. +-doc """ +List all ongoing 2PC flows where the local site is either a coordinator or a participant. + +The argument is an ETS match expression that allows to filter on the tag. +""". -spec ls_votes(_TagMatch) -> [vote_info()]. ls_votes(TagMatch) -> fold_ongoing( @@ -163,9 +242,17 @@ ls_votes(TagMatch) -> [], TagMatch). -%% @doc Fold over ongoing commit actions. -%% -%% Argument: match specification for the tag. +-doc """ +Fold over ongoing 2PC flows where the local site is either a coordinator or a participant. + +Arguments: + +@enumerate +@item Fold function +@item Initial accumulator +@item ETS match expression for filtering the tag +@end enumerate +""". -spec fold_ongoing(fun((vote_info(), Acc) -> Acc), Acc, _TagMatchPattern) -> Acc. fold_ongoing(Fun, Acc0, TagMatch) -> classy_vote_participant:fold_ongoing( @@ -173,9 +260,9 @@ fold_ongoing(Fun, Acc0, TagMatch) -> classy_vote_coordinator:fold_ongoing(Fun, Acc0, TagMatch), TagMatch). -%% @doc Initiate a new vote. -%% -%% Note: This function returns immediately. +-doc """ +Initiate a new vote, @pxref{t:classy_vote:options/0,options/0}. +""". -spec create(options()) -> {ok, classy_vote:id()} | {error, _}. create(UserOptions) -> maybe From e97761d5c7c1c995e033826eb25d51ba9a749bb5 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 14:10:43 +0200 Subject: [PATCH 5/8] membership: Fix flaky test --- test/classy_SUITE.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/classy_SUITE.erl b/test/classy_SUITE.erl index c488b3b..55ff7e2 100644 --- a/test/classy_SUITE.erl +++ b/test/classy_SUITE.erl @@ -377,6 +377,7 @@ t_071_membership_forget(_) -> S2 = <<"s2">>, S3 = <<"s3">>, S4 = <<"s4">>, + Sites = [S1, S2, S3, S4], ForgetAfterS = 1, WaitForget = ForgetAfterS * 1000 + 10, AppConf = {familiar_app, @@ -394,6 +395,7 @@ t_071_membership_forget(_) -> _N4 = create_start_site(S4, Conf), [ok = ?ON(I, classy:join_node(N1, join)) || I <- [S2, S3, S4]], #{cluster := Cluster} = ?ON(S1, classy_node:hello()), + [wait_site_joined(Sites, Cluster, I) || I <- [S2, S3, S4]], %% Stop S3. Its absence should prevent cleanup from doing anything. stop_site(S3), %% Kick S2, pass time and trigger cleanup at S1: From 463d7ef7cf5db1603006822ede6925ed76a29d26 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 14:39:38 +0200 Subject: [PATCH 6/8] vote: Fix flaky test --- src/classy_internal.hrl | 1 + src/classy_vote.erl | 12 +++++++----- src/classy_vote_participant.erl | 11 +++++++++-- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/classy_internal.hrl b/src/classy_internal.hrl index 12b7554..eecce67 100644 --- a/src/classy_internal.hrl +++ b/src/classy_internal.hrl @@ -54,6 +54,7 @@ -define(classy_vote_part_perform_action, classy_vote_part_perform_action). -define(classy_vote_flow_start, classy_vote_flow_start). -define(classy_vote_part_established, classy_vote_part_established). +-define(classy_vote_part_recv_outcome, classy_vote_part_recv_outcome). -define(classy_vote_alloc_id, classy_vote_alloc_id). -define(classy_vote_part_recv, classy_vote_part_recv). -define(classy_vote_part_send_vote, classy_vote_part_send_vote). diff --git a/src/classy_vote.erl b/src/classy_vote.erl index aa73b1a..2fddc31 100644 --- a/src/classy_vote.erl +++ b/src/classy_vote.erl @@ -97,7 +97,7 @@ Instead, they are abandoned until the next node restart. -export([ test_wait_conclude/1 , trace_props/0 , prop_every_vote_concludes/1 - , prop_coord_receives_votes/1 + , prop_every_participant_receives_outcome/1 ]). -endif. @@ -461,7 +461,7 @@ test_wait_conclude(ID) -> trace_props() -> [ fun ?MODULE:prop_every_vote_concludes/1 - , fun ?MODULE:prop_coord_receives_votes/1 + , fun ?MODULE:prop_every_participant_receives_outcome/1 ]. prop_every_vote_concludes(Trace) -> @@ -471,10 +471,12 @@ prop_every_vote_concludes(Trace) -> K =:= ?classy_vote_coord_early_abort, Trace). -prop_coord_receives_votes(Trace) -> +%% This property should always hold, unless the coordinator is removed +%% from the cluster and the participants auto-abort. +prop_every_participant_receives_outcome(Trace) -> ?strict_causality( - #{?snk_kind := ?classy_vote_part_send_vote, id := _Id, vote := _Vote, from := _From, ?snk_span := start}, - #{?snk_kind := ?classy_vote_coord_recv, id := _Id, vote := _Vote, from := _From}, + #{?snk_kind := ?classy_vote_part_established, id := _Id, site := _Site}, + #{?snk_kind := ?classy_vote_part_recv_outcome, id := _Id, site := _Site}, Trace). -endif. diff --git a/src/classy_vote_participant.erl b/src/classy_vote_participant.erl index 7ad14a8..622320f 100644 --- a/src/classy_vote_participant.erl +++ b/src/classy_vote_participant.erl @@ -208,7 +208,13 @@ terminate(Reason, State, _Data) -> %% Internal functions %%================================================================================ -do_receive_outcome(From, #c_outcome{result = Result}, D0 = #d{vote = MyVote}) -> +do_receive_outcome(From, #c_outcome{result = Result}, D0 = #d{vote = MyVote, prep = Prep}) -> + {ok, Self} = classy_node:the_site(), + ?tp(debug, ?classy_vote_part_recv_outcome, + #{ outcome => Result + , id => Prep#prepare.id + , site => Self + }), NextStage = case Result of true -> ?s_commit; false -> ?s_rollback @@ -348,7 +354,8 @@ db_establish(Stage, Vote, CompletedActions, Prep) -> [ {w, DataKey, Prep} , {w, StateKey, State} ]), - ?tp(debug, ?classy_vote_part_established, #{id => ID, tag => Tag}), + {ok, Site} = classy_node:the_site(), + ?tp(debug, ?classy_vote_part_established, #{id => ID, tag => Tag, site => Site}), {ok, #d{ prep = Prep , vote = Vote , completed_actions = CompletedActions From 091f84cbf18afa616b95517b41b359bf17098e8f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 14:52:26 +0200 Subject: [PATCH 7/8] doc: Improve documentation about the run levels --- doc/classy.texi | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/doc/classy.texi b/doc/classy.texi index 18a5b4f..5d3265c 100644 --- a/doc/classy.texi +++ b/doc/classy.texi @@ -106,6 +106,14 @@ The document was typeset with Site is fully connected and operational. @end enumerate + Unless the node is stopped abruptly + (@code{erlang:halt}, @code{kill}, power loss), + run levels always advance and retard in sequence. + + Business applications can track the run level via @ref{classy:run_level/2} hook. + This hook can, for example, + activate or deactivate some business applications. + @node Network Partition @section Network Partitions From da293a2b2d9d3c60a96d9755a3268c58b89da0d1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Jun 2026 15:49:35 +0200 Subject: [PATCH 8/8] vote: Fix types --- src/classy_vote.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/classy_vote.erl b/src/classy_vote.erl index 2fddc31..0f2865d 100644 --- a/src/classy_vote.erl +++ b/src/classy_vote.erl @@ -154,7 +154,7 @@ Per-participant set of commit actions. -type actions() :: #{ prepare := classy_lib:mfargs() , commit := [classy_lib:mfargs()] - , rollback => classy_lib:mfargs() + , rollback => [classy_lib:mfargs()] }. -doc """ @@ -190,7 +190,7 @@ Common vote options. @item on_fail Executed on both coordinator and participant if commit / rollback / post_commit actions fail. - This callback may be used signal failures to the business logic. + This callback may be used to signal failures to the business logic. Classy prepends an argument of type @ref{t:classy_vote:fail_info/0} to the user-specified argument list. @end table """.