Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions .github/workflows/test_template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,26 @@ jobs:
CTEST_START: ${{ inputs.CTEST_START }}
CTEST_END: ${{ inputs.CTEST_END }}
MANTICORE_LOCATOR: ${{ inputs.MANTICORE_LOCATOR }}
# ─── Heap-corruption diagnostics for the auto-embed crashes ───
# MALLOC_CHECK_=3 enables glibc's malloc consistency checks. On glibc
# < 2.34 the env var is read directly. On glibc >= 2.34 the env var
# is ignored for unprivileged binaries and you must use the tunable.
# Setting both makes us version-agnostic.
#
# Effect on next crash: glibc prints a diagnostic line to stderr
# naming the corruption kind (e.g. "double free or corruption",
# "free(): invalid pointer", "malloc(): memory corruption") BEFORE
# aborting. searchd's --log captures stderr, so the line will show
# up in searchd.log just above the crash dump.
#
# MALLOC_PERTURB_=204 fills new mallocs with 0xCC and freed memory
# with 0x33 (~0xCC). Catches use-after-free reads as garbage values
# propagating into downstream logic.
MALLOC_CHECK_: "3"
GLIBC_TUNABLES: "glibc.malloc.check=3"
MALLOC_PERTURB_: "204"
# The following is useful to test a specific test, just uncomment it, no need to disable CTEST_START/END
# CTEST_REGEX: test_234
# CTEST_REGEX: test_481
steps:
- name: Checkout repository
uses: actions/checkout@v4
Expand Down Expand Up @@ -174,10 +192,40 @@ jobs:
continue-on-error: true
run: ${{ inputs.xml_command }}

# ─── Capture embeddings lib diagnostics ──────────────────────────────
# The embeddings lib's GetLibFuncs() installs:
# - a Rust panic hook that writes to /tmp/manticore-embeddings-diag.log
# - an async-signal-safe SIGSEGV/SIGBUS/SIGABRT/SIGILL handler that
# writes a marker line to the same file before re-raising
# so the next time the daemon crashes we get the actual panic site /
# signal that triggered it, NOT just the downstream malloc abort.
#
# Copy the file into the test build dir so the upload step picks it up,
# and also dump it to the step log for inline visibility.
- name: Collect embeddings diag log
if: always()
continue-on-error: true
run: |
mkdir -p build/_deps/manticore-build/test
if [ -f /tmp/manticore-embeddings-diag.log ]; then
cp /tmp/manticore-embeddings-diag.log build/_deps/manticore-build/test/embeddings-diag.log
echo "===== embeddings diag log ====="
cat /tmp/manticore-embeddings-diag.log
echo "===== end embeddings diag log ====="
else
echo "no embeddings diag log at /tmp/manticore-embeddings-diag.log" \
> build/_deps/manticore-build/test/embeddings-diag.log
echo "no embeddings diag log produced"
fi
# also search the test data dir for any diag logs the daemon may
# have produced inside per-test directories (in case /tmp is wiped
# between test runs)
find build/_deps/manticore-build/test -name 'manticore-embeddings-diag.log' -print -exec cat {} \; || true

- name: Upload test artifacts
if: always()
continue-on-error: true
uses: manticoresoftware/upload_artifact_with_retries@v4
with:
name: ${{ inputs.artifact_name }}
path: "build/junit*.xml build/_deps/manticore-build/test/test_*/report.* build/_deps/manticore-build/test/error*.txt build/_deps/manticore-build/test/*log build/status*"
path: "build/junit*.xml build/_deps/manticore-build/test/test_*/report.* build/_deps/manticore-build/test/error*.txt build/_deps/manticore-build/test/*log build/_deps/manticore-build/test/embeddings-diag.log build/status*"
1 change: 1 addition & 0 deletions embeddings/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions embeddings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ candle-core = "0.9.2"
candle-nn = "0.9.2"
candle-transformers = "0.9.2"
ort = { version = "2.0.0-rc.9", default-features = false, features = ["std"] }
libc = "0.2"

[features]
default = []
Expand Down
244 changes: 225 additions & 19 deletions embeddings/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,232 @@ const LIB: EmbedLib = EmbedLib {
free_string: TextModelWrapper::free_string,
};

/// Path the diagnostics hooks write to. The daemon's stderr is not captured
/// by the test harness, so we write to a known file path that CI can upload
/// as an artifact. CI step:
///
/// - name: Capture embeddings diagnostics
/// if: always()
/// run: cat /tmp/manticore-embeddings-diag.log || echo "no diag log"
const DIAG_LOG_PATH: &str = "/tmp/manticore-embeddings-diag.log";

/// Append one line to DIAG_LOG_PATH plus a process-local stderr copy.
/// Errors are intentionally swallowed — this is a diagnostics path, we
/// must not panic from within it.
pub(crate) fn diag_log(line: &str) {
use std::io::Write;
eprintln!("{line}");
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(DIAG_LOG_PATH)
{
let _ = writeln!(f, "{line}");
let _ = f.flush();
}
}

/// Find which mapped VMA in /proc/self/maps contains the given address.
/// Returns (start, end, tag) of that mapping. Works for both pthread stacks
/// AND boost-coroutine stacks. pthread stacks have the `[stack]` tag for
/// the main thread or anonymous for spawned threads; boost-coroutine
/// stacks are always anonymous rw-p mappings invisible to
/// pthread_getattr_np. Either way /proc/self/maps shows them.
///
/// Reading /proc/self/maps is NOT async-signal-safe and may allocate. Do
/// not call from signal handlers. From normal probe sites it's fine.
#[cfg(target_os = "linux")]
fn stack_region_for(addr: usize) -> Option<(usize, usize, String)> {
let maps = std::fs::read_to_string("/proc/self/maps").ok()?;
for line in maps.lines() {
// Format: "7f277d95a000-7f277d97a000 rw-p 00000000 00:00 0 [stack]"
// or just "...rw-p ..." for anonymous (including boost coroutines).
let mut parts = line.splitn(2, ' ');
let range = parts.next()?;
let rest = parts.next().unwrap_or("");
let mut bounds = range.splitn(2, '-');
let start = usize::from_str_radix(bounds.next()?, 16).ok()?;
let end = usize::from_str_radix(bounds.next()?, 16).ok()?;
if addr >= start && addr < end {
// Tag: text after the last whitespace — "[stack]", "[heap]",
// a path, or treat empty as "[anon]" (= likely boost coroutine).
let tag = rest
.split_whitespace()
.last()
.filter(|s| s.starts_with('[') || s.starts_with('/'))
.unwrap_or("[anon]")
.to_string();
return Some((start, end, tag));
}
}
None
}

#[cfg(not(target_os = "linux"))]
fn stack_region_for(_addr: usize) -> Option<(usize, usize, String)> {
None
}

/// Log a stack-usage snapshot at the current point. Captures:
/// - current $sp (estimated via address of a stack-allocated local)
/// - the VMA from /proc/self/maps that currently contains $sp — i.e. the
/// ACTUAL stack region we are on. For pthread workers this is the
/// "[stack]"-tagged mapping; for boost-context coroutines it's an
/// anonymous rw-p mapping. Either way the bounds are real.
/// - bytes used (region_end - sp)
/// - bytes remaining (sp - region_start)
/// - the VMA tag so we can tell "[stack]" vs "[anon]" (= coroutine)
///
/// Call this at suspected stack-pressure choke points (every FFI entry,
/// before each candle op) to map out where exactly the budget is burnt.
/// Safe to call from any thread; no global state mutation.
pub(crate) fn stack_probe(label: &str) {
let probe: u8 = 0;
let sp = &probe as *const u8 as usize;
let tid = unsafe { libc::gettid() };
match stack_region_for(sp) {
Some((start, end, tag)) => {
let size = end - start;
let used = end.saturating_sub(sp);
let remaining = sp.saturating_sub(start);
diag_log(&format!(
"[stack_probe] {label}: tid={tid} sp=0x{sp:016x} \
region=0x{start:016x}-0x{end:016x} tag={tag} \
size={size} used={used} remaining={remaining}"
));
}
None => {
diag_log(&format!(
"[stack_probe] {label}: tid={tid} sp=0x{sp:016x} \
(no /proc/self/maps match — sp may be in a freshly-allocated region)"
));
}
}
}

/// One-shot installation of panic and signal diagnostics. Called from
/// GetLibFuncs() — runs once when the daemon dlopens the lib.
fn install_diagnostics() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
// RUST_BACKTRACE=full so std::backtrace::Backtrace::force_capture()
// produces a full unwind regardless of the daemon's env.
std::env::set_var("RUST_BACKTRACE", "full");

// Rust panic hook: write to file with full backtrace.
std::panic::set_hook(Box::new(|info| {
let loc = info
.location()
.map(|l| format!("{}:{}:{}", l.file(), l.line(), l.column()))
.unwrap_or_else(|| "<unknown>".to_string());
let payload = info
.payload()
.downcast_ref::<&str>()
.copied()
.or_else(|| info.payload().downcast_ref::<String>().map(|s| s.as_str()))
.unwrap_or("<non-string payload>");
let bt = std::backtrace::Backtrace::force_capture();
diag_log(&format!(
"===== manticore-embeddings PANIC =====\n\
location: {loc}\n\
payload: {payload}\n\
backtrace:\n{bt}\n\
====================================="
));
}));

// Native signal handler for SIGSEGV/SIGBUS/SIGILL/SIGABRT. These are
// what a stack overflow or heap-corruption abort looks like at the
// OS level. We write a marker to the diag log so we can correlate
// the OS-level event with the daemon's crash dump.
//
// Inside a signal handler we are *very* restricted (async-signal-safe
// only). We deliberately use the lowest-level write(2) syscall via
// libc and avoid Rust formatting / allocation.
install_signal_diag();

// Stamp on lib load so the file always has at least one line.
diag_log("===== manticore-embeddings loaded =====");
});
}

#[cfg(unix)]
fn install_signal_diag() {
// We install handlers for the signals that wrap up our crash scenarios:
// SIGSEGV / SIGBUS — bad memory access (stack overflow past guard,
// NULL deref, unmapped page, etc.)
// SIGABRT — glibc malloc consistency abort, assertion fail
// SIGILL — undefined-behaviour sanitiser trip on some setups
//
// The handler writes a short prefix to the diag log (via raw write(2),
// async-signal-safe) and then re-raises the signal with the default
// disposition so the daemon's own crash handler still runs.
use std::sync::atomic::{AtomicBool, Ordering};
static INSTALLED: AtomicBool = AtomicBool::new(false);
if INSTALLED.swap(true, Ordering::SeqCst) {
return;
}

extern "C" fn handler(signum: libc::c_int) {
// Async-signal-safe: no allocation, no formatted IO, just raw write.
let prefix: &[u8] = match signum {
libc::SIGSEGV => b"===== manticore-embeddings SIGSEGV =====\n",
libc::SIGBUS => b"===== manticore-embeddings SIGBUS =====\n",
libc::SIGABRT => b"===== manticore-embeddings SIGABRT =====\n",
libc::SIGILL => b"===== manticore-embeddings SIGILL =====\n",
_ => b"===== manticore-embeddings SIGNAL =====\n",
};
// O_WRONLY|O_CREAT|O_APPEND; mode 0644 — async-signal-safe via libc.
unsafe {
let path = b"/tmp/manticore-embeddings-diag.log\0";
let fd = libc::open(
path.as_ptr() as *const libc::c_char,
libc::O_WRONLY | libc::O_CREAT | libc::O_APPEND,
0o644,
);
if fd >= 0 {
let _ = libc::write(fd, prefix.as_ptr() as *const _, prefix.len());
let _ = libc::close(fd);
}
}

// Re-raise with default disposition so the daemon's own handler still
// runs (it produces the existing FATAL CRASH DUMP we already see).
unsafe {
libc::signal(signum, libc::SIG_DFL);
libc::raise(signum);
}
}

// sigaction with SA_ONSTACK so the handler runs even when the original
// stack is overflowed — vital for diagnosing stack overflow specifically.
unsafe {
// 8 KB alternate signal stack — enough for our 1-line write.
const ALT_STACK_SIZE: usize = 16 * 1024;
let stack_mem = Box::leak(Box::new([0u8; ALT_STACK_SIZE]));
let mut altstack: libc::stack_t = std::mem::zeroed();
altstack.ss_sp = stack_mem.as_mut_ptr() as *mut libc::c_void;
altstack.ss_size = ALT_STACK_SIZE;
altstack.ss_flags = 0;
let _ = libc::sigaltstack(&altstack, std::ptr::null_mut());

let mut sa: libc::sigaction = std::mem::zeroed();
sa.sa_sigaction = handler as *const () as usize;
sa.sa_flags = libc::SA_ONSTACK | libc::SA_RESETHAND;
libc::sigemptyset(&mut sa.sa_mask);

for sig in [libc::SIGSEGV, libc::SIGBUS, libc::SIGABRT, libc::SIGILL] {
libc::sigaction(sig, &sa, std::ptr::null_mut());
}
}
}

#[cfg(not(unix))]
fn install_signal_diag() {}

#[no_mangle]
pub extern "C" fn GetLibFuncs() -> *const EmbedLib {
// Log panics to stderr (with location + payload) instead of silently
// discarding them. The previous no-op hook was hiding the root cause of
// FFI-boundary crashes; we still need catch_unwind at every extern "C"
// entry point (see text_model_wrapper.rs) to convert the unwind into a
// clean error return, but the hook here ensures the original panic site
// appears in the daemon's log before we swallow it.
std::panic::set_hook(Box::new(|info| {
let loc = info
.location()
.map(|l| format!("{}:{}:{}", l.file(), l.line(), l.column()))
.unwrap_or_else(|| "<unknown>".to_string());
let payload = info
.payload()
.downcast_ref::<&str>()
.copied()
.or_else(|| info.payload().downcast_ref::<String>().map(|s| s.as_str()))
.unwrap_or("<non-string payload>");
eprintln!("manticore-knn-embeddings: panic at {loc}: {payload}");
}));
install_diagnostics();
&LIB
}
8 changes: 8 additions & 0 deletions embeddings/src/model/local.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::TextModel;
use crate::error::LibError;
use crate::ffi::stack_probe;
use crate::utils::{get_hidden_size, get_max_input_length, normalize, pre_truncate_text};
use candle_core::quantized::gguf_file;
use candle_core::{DType, Device, IndexOp, Tensor};
Expand Down Expand Up @@ -472,12 +473,15 @@ impl BertEmbeddingModel {
// another thread re-enters forward. Holding the lock until the
// f32 data has been copied into an owned Vec eliminates the race.
if batch.len() == 1 {
stack_probe("predict_chunks_b1:entry");
let chunk = &batch[0];
let token_ids = Tensor::new(chunk.as_slice(), &self.device)?.unsqueeze(0)?;
let token_type_ids = token_ids.zeros_like()?;
let mut emb_vec: Vec<f32> = {
let model = self.model.lock().unwrap_or_else(|e| e.into_inner());
stack_probe("predict_chunks_b1:before_forward");
let emb = model.forward(&token_ids, &token_type_ids, None)?;
stack_probe("predict_chunks_b1:after_forward");
let seq_len = token_ids.dims()[1];
let summed = emb.sum(1)?.to_dtype(DType::F32)?;
let divisor = Tensor::new(seq_len as f32, &self.device)?;
Expand Down Expand Up @@ -1231,6 +1235,7 @@ impl TextModel for LocalModel {
// Lock scope covers the full candle pipeline through to_vec1; see
// BertEmbeddingModel::predict_chunks for the concurrency rationale.
if texts.len() == 1 {
stack_probe("bert_predict_single:entry");
let text = pre_truncate_text(texts[0], m.max_input_len);
let enc = m
.tokenizer
Expand All @@ -1241,9 +1246,12 @@ impl TextModel for LocalModel {

let token_ids = Tensor::new(ids, &m.device)?.unsqueeze(0)?;
let token_type_ids = token_ids.zeros_like()?;
stack_probe("bert_predict_single:before_lock");
let mut emb_vec: Vec<f32> = {
let model = m.model.lock().unwrap_or_else(|e| e.into_inner());
stack_probe("bert_predict_single:before_forward");
let emb = model.forward(&token_ids, &token_type_ids, None)?;
stack_probe("bert_predict_single:after_forward");
let seq_len = token_ids.dims()[1];
let summed = emb.sum(1)?.to_dtype(DType::F32)?;
let divisor = Tensor::new(seq_len as f32, &m.device)?;
Expand Down
Loading
Loading