From d6babb981749c20b0506d31906269da3f54b6379 Mon Sep 17 00:00:00 2001 From: Dilum Aluthge Date: Fri, 20 Feb 2026 08:18:19 -0500 Subject: [PATCH] Change `Distributed.cluster_manager` from a global non-constant `ClusterManager` to a global constant `Ref{ClusterManager}` (#177) This is a forward-port of https://github.com/JuliaLang/Distributed.jl/pull/177 (https://github.com/JuliaLang/Distributed.jl/commit/2fe1aa4e267517565e99cd06664550dcd230cfc6). (cherry picked from commit 2fe1aa4e267517565e99cd06664550dcd230cfc6) --- src/cluster.jl | 11 +++++++++-- src/process_messages.jl | 8 ++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index 2c8f2f7..50e4627 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi """ abstract type ClusterManager end +# cluster_manager is a global constant +const cluster_manager = Ref{ClusterManager}() + +function throw_if_cluster_manager_unassigned() + isassigned(cluster_manager) || error("cluster_manager is unassigned") + return nothing +end + """ WorkerConfig @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. - global cluster_manager - cluster_manager = manager + cluster_manager[] = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 diff --git a/src/process_messages.jl b/src/process_messages.jl index a444651..33d6fe3 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -315,6 +315,8 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + # register a new peer worker connection w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version) send_connection_hdr(w, false) @@ -328,6 +330,8 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + LPROC.id = msg.self_pid controller = Worker(1, r_stream, w_stream, cluster_manager; version=version) notify(controller.initialized) @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. - Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) + Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig)) else - @async connect_to_peer(cluster_manager, rpid, wconfig) + @async connect_to_peer(cluster_manager[], rpid, wconfig) end end end