Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi
"""
abstract type ClusterManager end

# cluster_manager is a global constant
const cluster_manager = Ref{ClusterManager}()

function throw_if_cluster_manager_unassigned()
isassigned(cluster_manager) || error("cluster_manager is unassigned")
return nothing
end

"""
WorkerConfig

Expand Down Expand Up @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus

# On workers, the default cluster manager connects via TCP sockets. Custom
# transports will need to call this function with their own manager.
global cluster_manager
cluster_manager = manager
cluster_manager[] = manager

# Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called.
@assert nprocs() <= 1
Expand Down
8 changes: 6 additions & 2 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
end

function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
throw_if_cluster_manager_unassigned()

# register a new peer worker connection
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
send_connection_hdr(w, false)
Expand All @@ -328,6 +330,8 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi
end

function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
throw_if_cluster_manager_unassigned()

LPROC.id = msg.self_pid
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
notify(controller.initialized)
Expand All @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
let rpid=rpid, wconfig=wconfig
if lazy
# The constructor registers the object with a global registry.
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig))
else
@async connect_to_peer(cluster_manager, rpid, wconfig)
@async connect_to_peer(cluster_manager[], rpid, wconfig)
end
end
end
Expand Down
Loading