Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions src/RustyIceberg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,63 @@ end
const _INIT_LOCK::ReentrantLock = ReentrantLock()
_PANIC_HOOK::Function = default_panic_hook

# ---------------------------------------------------------------------------
# Profiler signal masking
# ---------------------------------------------------------------------------
# Julia's CPU profiler delivers SIGUSR2 to every adopted Julia thread to
# collect backtraces. Tokio worker threads become adopted Julia threads via
# the @ccallable preamble in notify_result_iceberg, but while they execute
# Rust code between Julia callbacks their Julia task context is incomplete.
# Receiving SIGUSR2 in that state causes UB in Julia's signal handler,
# manifesting as a hang when Profile.@profile is active.
#
# Fix: block SIGUSR2 (and SIGPROF, used by older Julia / macOS profilers)
# on the calling thread before starting the Tokio runtime. Tokio worker
# threads inherit the mask and are never targeted by the profiler.
# The mask is restored on the calling thread after init_runtime returns.

@static if Sys.islinux()
const _SIGSET_T_BYTES = 128 # sizeof(sigset_t) on Linux
const _SIG_BLOCK = Cint(0)
const _SIG_SETMASK = Cint(2)
const _SIGUSR2_NUM = 12
const _SIGPROF_NUM = 27

function _profiler_sigset()
buf = zeros(UInt8, _SIGSET_T_BYTES)
for sig in (_SIGUSR2_NUM, _SIGPROF_NUM)
buf[div(sig - 1, 8) + 1] |= UInt8(1 << ((sig - 1) % 8))
end
buf
end
const _PROFILER_SIGSET = _profiler_sigset()

function _block_profiler_signals()
old = zeros(UInt8, _SIGSET_T_BYTES)
GC.@preserve old _PROFILER_SIGSET begin
@ccall pthread_sigmask(
_SIG_BLOCK::Cint,
pointer(_PROFILER_SIGSET)::Ptr{Cvoid},
pointer(old)::Ptr{Cvoid},
)::Cint
end
return old
end

function _restore_signal_mask(old::Vector{UInt8})
GC.@preserve old begin
@ccall pthread_sigmask(
_SIG_SETMASK::Cint,
pointer(old)::Ptr{Cvoid},
C_NULL::Ptr{Cvoid},
)::Cint
end
end
else
_block_profiler_signals() = nothing
_restore_signal_mask(::Nothing) = nothing
end

Base.@ccallable function iceberg_panic_hook_wrapper()::Cint
global _PANIC_HOOK
_PANIC_HOOK()
Expand Down Expand Up @@ -144,15 +201,22 @@ function init_runtime(
_PANIC_HOOK = on_rust_panic
panic_fn_ptr = @cfunction(iceberg_panic_hook_wrapper, Cint, ())
fn_ptr = @cfunction(notify_result_iceberg, Cint, (Ptr{Nothing},))
res = @ccall rust_lib.iceberg_init_runtime(config::StaticConfig, panic_fn_ptr::Ptr{Nothing}, fn_ptr::Ptr{Nothing})::Cint
if res != 0
throw(IcebergException(
INTERNAL,
"Failed to initialize Iceberg runtime",
"iceberg_init_runtime returned $res",
))
# Block profiler signals before starting Tokio so its worker threads
# inherit a mask that excludes SIGUSR2/SIGPROF (see _block_profiler_signals).
old_mask = _block_profiler_signals()
try
res = @ccall rust_lib.iceberg_init_runtime(config::StaticConfig, panic_fn_ptr::Ptr{Nothing}, fn_ptr::Ptr{Nothing})::Cint
if res != 0
throw(IcebergException(
INTERNAL,
"Failed to initialize Iceberg runtime",
"iceberg_init_runtime returned $res",
))
end
_ICEBERG_STARTED[] = true
finally
_restore_signal_mask(old_mask)
end
_ICEBERG_STARTED[] = true
end
return nothing
end
Expand Down
Loading