Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@ concurrency:

jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
name: julia -t${{ matrix.threads}} - ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
Comment thread
JamesWrigley marked this conversation as resolved.
runs-on: ${{ matrix.os }}
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
version:
- 'nightly'
os:
- ubuntu-latest
- macOS-latest
- windows-latest
arch:
- x64
- x86
threads:
# - '1'
- '4,4'
version: [nightly]
os: [ubuntu-latest, windows-latest, macOS-latest]
arch: [x64, x86, aarch64]
exclude:
- os: ubuntu-latest
arch: aarch64
- os: windows-latest
arch: aarch64
- os: macOS-latest
arch: x64
- os: macOS-latest
arch: x86
steps:
Expand All @@ -44,6 +48,7 @@ jobs:
- uses: julia-actions/julia-runtest@v1
env:
JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
JULIA_NUM_THREADS: '${{ matrix.threads}}'
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v5
with:
Expand Down
64 changes: 30 additions & 34 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ function set_worker_state(w, state)
end

function check_worker_state(w::Worker)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
Expand All @@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
t = @async exec_conn_func(w)
else
# route request via node 1
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
Expand All @@ -190,20 +190,14 @@ function exec_conn_func(w::Worker)
end

function wait_for_conn(w)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

T = Threads.@spawn Threads.threadpool() begin
sleep($timeout)
lock(w.c_state) do
notify(w.c_state; all=true)
end
end
errormonitor(T)
lock(w.c_state) do
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
if timedwait(() -> (@atomic w.state) === W_CONNECTED, timeout) === :timed_out
# Notify any waiters on the state and throw
@lock w.c_state notify(w.c_state)
error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
end
nothing
Expand Down Expand Up @@ -258,7 +252,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
else
sock = listen(interface, LPROC.bind_port)
end
errormonitor(Threads.@spawn while isopen(sock)
errormonitor(@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end)
Expand Down Expand Up @@ -290,7 +284,7 @@ end


function redirect_worker_output(ident, stream)
t = Threads.@spawn while !eof(stream)
t = @async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand Down Expand Up @@ -329,7 +323,7 @@ function read_worker_host_port(io::IO)
leader = String[]
try
while ntries > 0
readtask = Threads.@spawn Threads.threadpool() readline(io)
readtask = @async readline(io)
yield()
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
sleep(0.05)
Expand Down Expand Up @@ -430,7 +424,7 @@ if launching workers programmatically, execute `addprocs` in its own task.

```julia
# On busy clusters, call `addprocs` asynchronously
t = Threads.@spawn addprocs(...)
t = @async addprocs(...)
```

```julia
Expand Down Expand Up @@ -496,13 +490,14 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# call manager's `launch` in a separate task. This allows the master
# process to initiate the connection setup process as and when workers
# come online
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
# NOTE: Must be `@async`. See FIXME above
t_launch = @async launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
Threads.@spawn Threads.threadpool() begin
@async begin # NOTE: Must be `@async`. See FIXME above
sleep(1)
notify(launch_ntfy)
end
Expand All @@ -512,7 +507,8 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
# NOTE: Must be `@async`. See FIXME above
@async setup_launched_worker(manager, wconfig, launched_q)
end
end
end
Expand Down Expand Up @@ -592,7 +588,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
wconfig.port = port

let wconfig=wconfig
Threads.@spawn Threads.threadpool() begin
@async begin
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
Expand Down Expand Up @@ -660,7 +656,7 @@ function create_worker(manager, wconfig)
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
# wait for wl to join
if jw.state === W_CREATED
if (@atomic jw.state) === W_CREATED
lock(jw.c_state) do
wait(jw.c_state)
end
Expand Down Expand Up @@ -688,7 +684,7 @@ function create_worker(manager, wconfig)

for wl in wlist
lock(wl.c_state) do
if wl.state === W_CREATED
if (@atomic wl.state) === W_CREATED
# wait for wl to join
wait(wl.c_state)
end
Expand Down Expand Up @@ -758,7 +754,7 @@ function check_master_connect()
end

errormonitor(
Threads.@spawn begin
@async begin
timeout = worker_timeout()
if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
Expand Down Expand Up @@ -890,7 +886,7 @@ function nprocs()
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
n = n - 1
end
end
Expand Down Expand Up @@ -941,7 +937,7 @@ julia> procs()
function procs()
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
else
return Int[x.id for x in PGRP.workers]
end
Expand All @@ -950,7 +946,7 @@ end
function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
return true
end
end
Expand All @@ -972,7 +968,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
"""
function procs(pid::Integer)
if myid() == 1
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
else
Expand Down Expand Up @@ -1050,13 +1046,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
t = @async _rmprocs(pids, typemax(Int))
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return Threads.@spawn Threads.threadpool() nothing
return @async nothing
end
end

Expand All @@ -1079,11 +1075,11 @@ function _rmprocs(pids, waitfor)

start = time_ns()
while (time_ns() - start) < waitfor*1e9
all(w -> w.state === W_TERMINATED, rmprocset) && break
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
end

unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
if length(unremoved) > 0
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
throw(ErrorException(estr))
Expand Down Expand Up @@ -1239,7 +1235,7 @@ function interrupt(pids::AbstractVector=workers())
@assert myid() == 1
@sync begin
for pid in pids
Threads.@spawn Threads.threadpool() interrupt(pid)
@async interrupt(pid)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
for _ in 1:run_locally
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
@async Core.eval(m, ex)
end
end
nothing
Expand Down Expand Up @@ -275,7 +275,7 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
Expand Down
18 changes: 13 additions & 5 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ addprocs([

* `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
holding one flag, or a collection of strings, with one element per flag.
E.g. `\`--threads=auto --project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
E.g. `\`--threads=auto --project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.

* `topology`: Specifies how the workers connect to each other. Sending a message between
unconnected workers results in an error.
Expand Down Expand Up @@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
Threads.@spawn Threads.threadpool() try
@async try
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
Expand Down Expand Up @@ -740,16 +740,24 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
nothing
end

function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
# profile_wait = 6 is 1s for profile, 5s for the report to show
# First, try sending `exit()` to the remote over the usual control channels
remote_do(exit, pid)

timer_task = Threads.@spawn Threads.threadpool() begin
timer_task = @async begin
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
if !process_exited(config.process)
@warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
@warn "Failed to gracefully kill worker $(pid)"
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
if profile_sig !== nothing
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
kill(config.process, profile_sig[2])
sleep(profile_wait)
end
@warn("Sending SIGQUIT to worker $(pid)")
kill(config.process, Base.SIGQUIT)

sleep(term_timeout)
Expand Down
4 changes: 2 additions & 2 deletions src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ end
function flush_gc_msgs()
try
for w in (PGRP::ProcessGroup).workers
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
flush_gc_msgs(w)
end
end
catch e
bt = catch_backtrace()
Threads.@spawn showerror(stderr, e, bt)
@async showerror(stderr, e, bt)
end
end

Expand Down
Loading