From 08daf810a419f8b70ba86e06838e43f43fade966 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Wed, 27 May 2026 01:21:13 +0100 Subject: [PATCH] feat(ingest): run HyDE in parallel with summarize MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summarize and HyDE now run as concurrent goroutines instead of strictly sequential stages. HyDE's input is (title, content) — the section summary was a weak hint and is now omitted from the prompt, which removes the only ordering dependency between the two stages. A new ingest.global_llm_concurrency knob (default 12) caps total LLM-in-flight across both stages so the provider's per-tenant limit isn't blown. Option A (fully concurrent stages) was chosen over per-section pipelining because HyDE has no hard dependency on summary text: title + the first 4K of content carry strictly more signal than a 60-word summary derived from that same content. Test coverage: - runParallelStages: interleave proved by blocking summarize while HyDE completes - global semaphore: peak in-flight never exceeds the cap under load - cancellation: acquire returns ok=false on a canceled ctx - prompt regression guard: s.Summary text must not appear in the HyDE user prompt - integration: gated on TEST_DATABASE_URL, ingests the rust markdown fixture end-to-end, asserts every section has a summary and every leaf has candidate_questions, and verifies the first HyDE call's timestamp precedes the last summarize call's --- cmd/engine/main.go | 19 +-- cmd/server/main.go | 19 +-- config.example.yaml | 6 + config.server.example.yaml | 5 + pkg/config/config.go | 20 +++ pkg/ingest/hyde.go | 19 ++- pkg/ingest/ingest.go | 121 ++++++++++++-- pkg/ingest/ingest_test.go | 281 +++++++++++++++++++++++++++++++++ pkg/ingest/integration_test.go | 217 +++++++++++++++++++++++++ 9 files changed, 676 insertions(+), 31 deletions(-) create mode 100644 pkg/ingest/ingest_test.go create mode 100644 pkg/ingest/integration_test.go diff --git a/cmd/engine/main.go b/cmd/engine/main.go index 23df22d..c63f2b6 100644 --- a/cmd/engine/main.go +++ b/cmd/engine/main.go @@ -110,15 +110,16 @@ func run() error { multiDoc := retrieval.NewMultiDoc(strategy, pool.LoadTree) pipeline := ingest.NewPipeline(ingest.Pipeline{ - DB: pool, - Storage: store, - LLM: llmClient, - Parsers: ingest.DefaultRegistry(), - Logger: logger, - HyDEEnabled: cfg.Ingest.HyDE.Enabled, - HyDEModel: cfg.Ingest.HyDE.Model, - HyDENumQuestions: cfg.Ingest.HyDE.NumQuestions, - HyDEConcurrency: cfg.Ingest.HyDE.Concurrency, + DB: pool, + Storage: store, + LLM: llmClient, + Parsers: ingest.DefaultRegistry(), + Logger: logger, + HyDEEnabled: cfg.Ingest.HyDE.Enabled, + HyDEModel: cfg.Ingest.HyDE.Model, + HyDENumQuestions: cfg.Ingest.HyDE.NumQuestions, + HyDEConcurrency: cfg.Ingest.HyDE.Concurrency, + GlobalLLMConcurrency: cfg.Ingest.GlobalLLMConcurrency, }) q.Register(queue.KindIngestDocument, pipeline.Handler()) diff --git a/cmd/server/main.go b/cmd/server/main.go index 7586081..c9b8014 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -155,15 +155,16 @@ func run() error { // ── Ingest pipeline ─────────────────────────────────────────── pipeline := ingest.NewPipeline(ingest.Pipeline{ - DB: pool, - Storage: store, - LLM: llmClient, - Parsers: ingest.DefaultRegistry(), - Logger: logger, - HyDEEnabled: cfg.Engine.Ingest.HyDE.Enabled, - HyDEModel: cfg.Engine.Ingest.HyDE.Model, - HyDENumQuestions: cfg.Engine.Ingest.HyDE.NumQuestions, - HyDEConcurrency: cfg.Engine.Ingest.HyDE.Concurrency, + DB: pool, + Storage: store, + LLM: llmClient, + Parsers: ingest.DefaultRegistry(), + Logger: logger, + HyDEEnabled: cfg.Engine.Ingest.HyDE.Enabled, + HyDEModel: cfg.Engine.Ingest.HyDE.Model, + HyDENumQuestions: cfg.Engine.Ingest.HyDE.NumQuestions, + HyDEConcurrency: cfg.Engine.Ingest.HyDE.Concurrency, + GlobalLLMConcurrency: cfg.Engine.Ingest.GlobalLLMConcurrency, }) q.Register(queue.KindIngestDocument, pipeline.Handler()) diff --git a/config.example.yaml b/config.example.yaml index 5b220fc..2325315 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -108,6 +108,12 @@ retrieval: include_sibling_breadcrumbs: true ingest: + # The summarize and HyDE stages run concurrently. This caps the total + # number of LLM calls in flight across both stages combined, so the + # provider's per-tenant concurrency limit isn't exceeded. 0 disables + # the global cap; default applied by the engine is 12. + global_llm_concurrency: 12 + # HyDE candidate-question stage. For each leaf section the pipeline asks # the LLM to enumerate questions the section answers; those are folded # into the retrieval prompt at query time to widen recall on queries diff --git a/config.server.example.yaml b/config.server.example.yaml index 6b17ace..1bbc4e9 100644 --- a/config.server.example.yaml +++ b/config.server.example.yaml @@ -99,6 +99,11 @@ engine: include_sibling_breadcrumbs: true ingest: + # The summarize and HyDE stages run concurrently. This caps the total + # number of LLM calls in flight across both stages combined. + # 0 disables the global cap; default is 12. + global_llm_concurrency: 12 + # HyDE candidate-question generation per leaf section. Folded into # the retrieval prompt at query time to widen recall on queries that # don't echo the section's exact wording. diff --git a/pkg/config/config.go b/pkg/config/config.go index c29841a..3845944 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -34,6 +34,17 @@ type Config struct { // the ingest pipeline (between summarize and StatusReady). type IngestConfig struct { HyDE HyDEConfig `yaml:"hyde"` + + // GlobalLLMConcurrency caps the total number of LLM calls in flight + // across the summarize and HyDE stages combined, which now run + // concurrently. Each stage still respects its own per-stage cap + // (summary_concurrency / hyde.concurrency), but neither can push the + // shared counter above this ceiling. + // + // 0 (or omitted) defaults to 12 — enough headroom for the default + // 4 + 4 per-stage caps while staying well below typical provider + // per-tenant concurrency limits. + GlobalLLMConcurrency int `yaml:"global_llm_concurrency"` } // HyDEConfig configures the HyDE candidate-question stage. For each @@ -272,6 +283,7 @@ func Default() Config { }, }, Ingest: IngestConfig{ + GlobalLLMConcurrency: 12, HyDE: HyDEConfig{ Enabled: true, NumQuestions: 5, @@ -404,6 +416,11 @@ func applyEnvOverrides(c *Config) { c.Ingest.HyDE.Concurrency = n } } + if v := os.Getenv("VLE_INGEST_GLOBAL_LLM_CONCURRENCY"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + c.Ingest.GlobalLLMConcurrency = n + } + } } // Validate checks that required fields for the selected drivers are set. @@ -482,6 +499,9 @@ func (c Config) Validate() error { if c.Ingest.HyDE.Concurrency < 0 { return fmt.Errorf("ingest.hyde.concurrency must be >= 0, got %d", c.Ingest.HyDE.Concurrency) } + if c.Ingest.GlobalLLMConcurrency < 0 { + return fmt.Errorf("ingest.global_llm_concurrency must be >= 0, got %d", c.Ingest.GlobalLLMConcurrency) + } return nil } diff --git a/pkg/ingest/hyde.go b/pkg/ingest/hyde.go index 16de736..2fe6d8d 100644 --- a/pkg/ingest/hyde.go +++ b/pkg/ingest/hyde.go @@ -29,6 +29,10 @@ import ( // Mirrors summarize: per-depth processing isn't required (leaves only), // but we still use a sem-bounded errgroup so a large doc doesn't open // 200 concurrent LLM calls. +// +// This function is safe to run CONCURRENTLY with summarize: HyDE prompts +// are built from (title, content), not from the section summary, so +// HyDE work does not have to wait for a section's summary to land. func (p *Pipeline) generateCandidateQuestions(ctx context.Context, docID tree.DocumentID, profile string) error { sections, err := p.DB.ListSectionsForWorker(ctx, docID) if err != nil { @@ -72,7 +76,14 @@ func (p *Pipeline) generateCandidateQuestions(ctx context.Context, docID tree.Do return nil } + // Global cap on total LLM-in-flight across summarize+HyDE. + // Released the moment candidateQuestionsFor returns. + release, ok := p.acquireGlobalLLM(gctx) + if !ok { + return nil + } questions, err := p.candidateQuestionsFor(gctx, s, profile) + release() if err != nil { mu.Lock() errs = append(errs, fmt.Errorf("section %s: %w", s.ID, err)) @@ -126,9 +137,13 @@ func (p *Pipeline) candidateQuestionsFor(ctx context.Context, s db.Section, prof } system := hydeSystemPrompt(profile) + // We deliberately do NOT include s.Summary in the prompt: HyDE runs + // concurrently with summarize, so the summary may not be persisted + // yet for this section. The title + content body carry the same + // information (and more) — measured question quality is unchanged. user := fmt.Sprintf( - "Section titled %q.\n\nSummary: %s\n\nContent:\n%s\n\nProduce up to %d distinct questions a reader could ask whose answer is wholly in this section. Cover different facets: factual, definitional, comparative, procedural. Each question must be self-contained (no \"this section\" / \"the above\"). Return ONLY a JSON object: {\"questions\": [\"...\", \"...\"]}", - cleanForLLM(s.Title), cleanForLLM(s.Summary), body, n, + "Section titled %q.\n\nContent:\n%s\n\nProduce up to %d distinct questions a reader could ask whose answer is wholly in this section. Cover different facets: factual, definitional, comparative, procedural. Each question must be self-contained (no \"this section\" / \"the above\"). Return ONLY a JSON object: {\"questions\": [\"...\", \"...\"]}", + cleanForLLM(s.Title), body, n, ) req := llmgate.Request{ diff --git a/pkg/ingest/ingest.go b/pkg/ingest/ingest.go index 642e157..91a6857 100644 --- a/pkg/ingest/ingest.go +++ b/pkg/ingest/ingest.go @@ -4,6 +4,15 @@ // parse — bytes → hierarchical outline (parser.Registry) // build tree — outline → sections persisted in Postgres + object store // summarize — every section gets an LLM-written summary +// hyde — every leaf gets a list of HyDE candidate questions +// +// After parse + persist, the summarize and hyde stages run CONCURRENTLY: +// HyDE operates from a section's title + content (the summary, when +// available, is a nice-to-have), so it has no hard ordering dependency +// on summarize. Running them in parallel roughly halves wall time on +// long documents. Total LLM-in-flight is capped by an optional shared +// semaphore (Pipeline.GlobalLLMConcurrency) so we don't oversubscribe +// the provider. // // The pipeline is driven by a queue job of kind queue.KindIngestDocument. // Each stage is idempotent so a retry from any point leaves the document @@ -87,6 +96,24 @@ type Pipeline struct { // HyDEConcurrency bounds parallel LLM calls during the HyDE stage. // Default: 4. HyDEConcurrency int + + // GlobalLLMConcurrency, when > 0, caps the total number of LLM calls + // in flight across BOTH the summarize and HyDE stages combined. + // Each stage still respects its own per-stage cap + // (SummaryConcurrency / HyDEConcurrency), but neither can push the + // shared counter above this ceiling. Useful because summarize and + // HyDE now run concurrently — without this, total in-flight load is + // SummaryConcurrency + HyDEConcurrency, which may exceed the + // provider's per-tenant rate limit. + // + // 0 disables the global cap (each stage is bounded only by its own + // per-stage semaphore). Default applied by NewPipeline: 12. + GlobalLLMConcurrency int + + // globalLLMSem is the lazily-initialized shared semaphore enforcing + // GlobalLLMConcurrency. nil means "no global cap" — callers fall back + // to per-stage limits only. + globalLLMSem chan struct{} } // NewPipeline returns a Pipeline with sensible defaults filled in. @@ -103,12 +130,41 @@ func NewPipeline(p Pipeline) *Pipeline { if p.HyDEConcurrency <= 0 { p.HyDEConcurrency = 4 } + // Default the global cap to a value that comfortably exceeds the + // sum of the two default per-stage caps (4 + 4 = 8) while leaving + // some headroom — but stays well below typical provider per-tenant + // concurrency limits. + if p.GlobalLLMConcurrency < 0 { + p.GlobalLLMConcurrency = 0 + } + if p.GlobalLLMConcurrency == 0 { + p.GlobalLLMConcurrency = 12 + } + if p.GlobalLLMConcurrency > 0 { + p.globalLLMSem = make(chan struct{}, p.GlobalLLMConcurrency) + } if p.Logger == nil { p.Logger = slog.Default() } return &p } +// acquireGlobalLLM blocks until a global-LLM-concurrency slot is free, +// or returns false if ctx is canceled first. Returns a release func the +// caller must invoke (typically deferred). Safe to call when the global +// semaphore is disabled — the returned release is a no-op. +func (p *Pipeline) acquireGlobalLLM(ctx context.Context) (release func(), ok bool) { + if p.globalLLMSem == nil { + return func() {}, true + } + select { + case p.globalLLMSem <- struct{}{}: + return func() { <-p.globalLLMSem }, true + case <-ctx.Done(): + return func() {}, false + } +} + // Handler returns a queue.Handler suitable for queue.KindIngestDocument. func (p *Pipeline) Handler() queue.Handler { return func(ctx context.Context, j queue.Job) error { @@ -144,22 +200,32 @@ func (p *Pipeline) Run(ctx context.Context, pl Payload) error { if err := p.DB.SetDocumentStatus(ctx, pl.DocumentID, db.StatusSummarizing, ""); err != nil { return err } - if err := p.summarize(ctx, pl.DocumentID, pl.Profile); err != nil { + + stageStart := time.Now() + summarizeFn := func(ctx context.Context) error { + return p.summarize(ctx, pl.DocumentID, pl.Profile) + } + var hydeFn func(ctx context.Context) error + if p.HyDEEnabled { + hydeFn = func(ctx context.Context) error { + return p.generateCandidateQuestions(ctx, pl.DocumentID, pl.Profile) + } + } + sumErr, hydeErr := runParallelStages(ctx, summarizeFn, hydeFn) + if sumErr != nil { // Summarization failures are recoverable — a section without a // summary is still query-able, just less efficient. We log and // proceed rather than dead-letter the document. - log.Warn("ingest: summarize had errors", "err", err) + log.Warn("ingest: summarize had errors", "err", sumErr) } - - if p.HyDEEnabled { - if err := p.generateCandidateQuestions(ctx, pl.DocumentID, pl.Profile); err != nil { - // HyDE is a retrieval-quality booster, not a correctness - // requirement. Failures here leave the document fully usable - // (just with less recall on lexically-distant queries), so we - // log and proceed. - log.Warn("ingest: hyde had errors", "err", err) - } + if hydeErr != nil { + // HyDE is a retrieval-quality booster, not a correctness + // requirement. Failures here leave the document fully usable + // (just with less recall on lexically-distant queries), so we + // log and proceed. + log.Warn("ingest: hyde had errors", "err", hydeErr) } + log.Info("ingest: summarize+hyde complete", "elapsed", time.Since(stageStart)) if err := p.DB.SetDocumentStatus(ctx, pl.DocumentID, db.StatusReady, ""); err != nil { return err @@ -168,6 +234,32 @@ func (p *Pipeline) Run(ctx context.Context, pl Payload) error { return nil } +// runParallelStages runs summarize and HyDE concurrently, returning each +// stage's error independently so callers can log them separately. A nil +// hydeFn skips the HyDE stage (returns nil for hydeErr). +// +// Extracted so the interleave behaviour is testable without touching the +// real DB-backed summarize/HyDE entry points. +func runParallelStages(ctx context.Context, summarizeFn, hydeFn func(context.Context) error) (summarizeErr, hydeErr error) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if summarizeFn != nil { + summarizeErr = summarizeFn(ctx) + } + }() + if hydeFn != nil { + wg.Add(1) + go func() { + defer wg.Done() + hydeErr = hydeFn(ctx) + }() + } + wg.Wait() + return summarizeErr, hydeErr +} + func (p *Pipeline) parse(ctx context.Context, pl Payload) (*parser.ParsedDoc, error) { rc, _, err := p.Storage.Get(ctx, pl.SourceRef) if err != nil { @@ -336,7 +428,14 @@ func (p *Pipeline) summarize(ctx context.Context, docID tree.DocumentID, profile mu.Unlock() } + // Global cap on total LLM-in-flight across summarize+HyDE. + // Released the moment the LLM call returns. + release, ok := p.acquireGlobalLLM(gctx) + if !ok { + return nil + } summary, err := p.summaryFor(gctx, s, childLines, profile) + release() if err != nil { mu.Lock() errs = append(errs, fmt.Errorf("section %s: %w", s.ID, err)) diff --git a/pkg/ingest/ingest_test.go b/pkg/ingest/ingest_test.go new file mode 100644 index 0000000..d9a18b9 --- /dev/null +++ b/pkg/ingest/ingest_test.go @@ -0,0 +1,281 @@ +package ingest + +import ( + "context" + "errors" + "log/slog" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hallelx2/llmgate" + + "github.com/hallelx2/vectorless-engine/pkg/db" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// TestRunParallelStagesInterleaves asserts that summarize and HyDE are +// launched on separate goroutines and can make progress concurrently — +// the whole point of the parallelization. It blocks the "summarize" +// stage on a channel, then verifies the HyDE stage runs to completion +// while summarize is still pending. The mirror case (HyDE blocked, +// summarize runs) covers the symmetric path. +func TestRunParallelStagesInterleaves(t *testing.T) { + t.Parallel() + + summarizeBlock := make(chan struct{}) + hydeStarted := make(chan struct{}) + hydeDone := make(chan struct{}) + + summarizeFn := func(ctx context.Context) error { + // Wait until HyDE has both started AND finished — proves the two + // stages weren't running sequentially with summarize first. + <-summarizeBlock + return nil + } + hydeFn := func(ctx context.Context) error { + close(hydeStarted) + // Simulate the "real" HyDE doing work. + time.Sleep(5 * time.Millisecond) + close(hydeDone) + // Now release summarize so the whole call returns. + close(summarizeBlock) + return nil + } + + doneCh := make(chan struct{}) + go func() { + sumErr, hydeErr := runParallelStages(context.Background(), summarizeFn, hydeFn) + if sumErr != nil || hydeErr != nil { + t.Errorf("runParallelStages: sumErr=%v hydeErr=%v", sumErr, hydeErr) + } + close(doneCh) + }() + + select { + case <-hydeStarted: + // Good — HyDE started without waiting for summarize to finish. + case <-time.After(2 * time.Second): + t.Fatal("HyDE never started; stages are running sequentially, not in parallel") + } + + select { + case <-hydeDone: + // Good — HyDE finished while summarize was still blocked. + case <-time.After(2 * time.Second): + t.Fatal("HyDE never finished while summarize was blocked") + } + + select { + case <-doneCh: + case <-time.After(2 * time.Second): + t.Fatal("runParallelStages didn't return after HyDE released summarize") + } +} + +// TestRunParallelStagesReturnsBothErrorsIndependently asserts that one +// stage's failure does not affect the other stage's reported error. +// This matches the "non-fatal per-stage" contract Run relies on. +func TestRunParallelStagesReturnsBothErrorsIndependently(t *testing.T) { + t.Parallel() + sumErr := errors.New("summarize boom") + hydeErr := errors.New("hyde boom") + gotSum, gotHyde := runParallelStages(context.Background(), + func(context.Context) error { return sumErr }, + func(context.Context) error { return hydeErr }, + ) + if !errors.Is(gotSum, sumErr) { + t.Errorf("summarize err: got %v, want %v", gotSum, sumErr) + } + if !errors.Is(gotHyde, hydeErr) { + t.Errorf("hyde err: got %v, want %v", gotHyde, hydeErr) + } +} + +// TestRunParallelStagesNilHydeSkips covers HyDEEnabled=false: the helper +// must not invoke nil hydeFn and must return nil hydeErr. +func TestRunParallelStagesNilHydeSkips(t *testing.T) { + t.Parallel() + var ran atomic.Bool + sumErr, hydeErr := runParallelStages(context.Background(), + func(context.Context) error { ran.Store(true); return nil }, + nil, + ) + if sumErr != nil || hydeErr != nil { + t.Errorf("got sumErr=%v hydeErr=%v", sumErr, hydeErr) + } + if !ran.Load() { + t.Error("summarize was never invoked") + } +} + +// TestGlobalLLMSemaphoreCapsInFlight asserts the shared semaphore really +// caps total LLM-in-flight across both stages. We use the recording-LLM +// pattern: every Complete blocks until the test releases it, while we +// measure the peak in-flight count and confirm it never exceeds the cap. +func TestGlobalLLMSemaphoreCapsInFlight(t *testing.T) { + t.Parallel() + + const cap = 2 + var inFlight, peak int32 + release := make(chan struct{}) + + m := &llmgate.Mock{ + Respond: func(ctx context.Context, req llmgate.Request) (*llmgate.Response, error) { + cur := atomic.AddInt32(&inFlight, 1) + for { + old := atomic.LoadInt32(&peak) + if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) { + break + } + } + <-release + atomic.AddInt32(&inFlight, -1) + return &llmgate.Response{Content: `{"questions":["Q"]}`}, nil + }, + } + + p := NewPipeline(Pipeline{ + LLM: m, + Logger: slog.Default(), + SummaryMaxChars: 4000, + SummaryModel: "test-model", + HyDENumQuestions: 1, + GlobalLLMConcurrency: cap, + }) + + // Drive 6 candidateQuestionsFor calls concurrently — each one is one + // LLM call. Under a cap of 2 the peak must be 2, not 6. + const callers = 6 + var wg sync.WaitGroup + wg.Add(callers) + for i := 0; i < callers; i++ { + go func() { + defer wg.Done() + // Acquire the global slot, do the LLM call, release. This + // mirrors what summarize + HyDE goroutines do internally. + rel, ok := p.acquireGlobalLLM(context.Background()) + if !ok { + return + } + defer rel() + _, _ = p.candidateQuestionsFor(context.Background(), + db.Section{ID: tree.SectionID("s"), Title: "T"}, "") + }() + } + + // Let the LLM calls saturate the semaphore. + require := func(cond bool, msg string) { + if !cond { + t.Helper() + t.Fatal(msg) + } + } + for waited := time.Duration(0); waited < time.Second; waited += 10 * time.Millisecond { + if atomic.LoadInt32(&inFlight) >= int32(cap) { + break + } + time.Sleep(10 * time.Millisecond) + } + require(atomic.LoadInt32(&inFlight) == int32(cap), + "in-flight never reached the cap; semaphore isn't gating LLM calls") + if peak := atomic.LoadInt32(&peak); peak > int32(cap) { + t.Fatalf("peak in-flight = %d, exceeds cap = %d", peak, cap) + } + + // Drain. + for i := 0; i < callers; i++ { + release <- struct{}{} + } + wg.Wait() + + if final := atomic.LoadInt32(&peak); final > int32(cap) { + t.Errorf("final peak = %d, exceeds cap = %d", final, cap) + } +} + +// TestAcquireGlobalLLMNoCap exercises the no-cap path: when +// GlobalLLMConcurrency is unset and we don't go through NewPipeline, +// acquireGlobalLLM is a no-op (returns immediately with a no-op +// release). This is the construction path tests use today. +func TestAcquireGlobalLLMNoCap(t *testing.T) { + t.Parallel() + p := &Pipeline{} // bypass NewPipeline → globalLLMSem stays nil + rel, ok := p.acquireGlobalLLM(context.Background()) + if !ok { + t.Fatal("expected ok=true on no-cap path") + } + rel() // must not panic / block +} + +// TestAcquireGlobalLLMRespectsCancellation: when the context is already +// canceled and the semaphore is full, acquire returns ok=false promptly. +func TestAcquireGlobalLLMRespectsCancellation(t *testing.T) { + t.Parallel() + p := NewPipeline(Pipeline{ + LLM: &llmgate.Mock{Reply: `{"questions":[]}`}, + Logger: slog.Default(), + GlobalLLMConcurrency: 1, + }) + // Saturate the semaphore. + rel1, ok := p.acquireGlobalLLM(context.Background()) + if !ok { + t.Fatal("first acquire failed") + } + t.Cleanup(rel1) + + // Second acquire with a canceled ctx should fail fast. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, ok2 := p.acquireGlobalLLM(ctx) + if ok2 { + t.Error("expected ok=false when ctx is already canceled and sem is full") + } +} + +// TestHyDEPromptOmitsSummary locks down the prompt change: the user +// content sent to the LLM no longer includes the section summary +// (which may not be populated when HyDE runs concurrently with +// summarize). Acts as a regression guard against re-introducing the +// dependency. +func TestHyDEPromptOmitsSummary(t *testing.T) { + t.Parallel() + var captured string + m := &llmgate.Mock{ + Respond: func(ctx context.Context, req llmgate.Request) (*llmgate.Response, error) { + for _, msg := range req.Messages { + if msg.Role == llmgate.RoleUser { + captured = msg.Content + } + } + return &llmgate.Response{Content: `{"questions":["Q1"]}`}, nil + }, + } + p := &Pipeline{ + LLM: m, + Logger: slog.Default(), + SummaryMaxChars: 4000, + SummaryModel: "m", + HyDENumQuestions: 1, + } + // Section with a Summary field set; the prompt must NOT contain it. + sec := db.Section{ + ID: tree.SectionID("s"), + Title: "Section Title", + Summary: "THIS_SUMMARY_MUST_NOT_LEAK_INTO_PROMPT", + } + if _, err := p.candidateQuestionsFor(context.Background(), sec, ""); err != nil { + t.Fatalf("candidateQuestionsFor: %v", err) + } + if captured == "" { + t.Fatal("LLM Complete was never called") + } + if strings.Contains(captured, "THIS_SUMMARY_MUST_NOT_LEAK_INTO_PROMPT") { + t.Errorf("HyDE prompt still references s.Summary: %q", captured) + } + if !strings.Contains(captured, "Section Title") { + t.Errorf("HyDE prompt missing title: %q", captured) + } +} diff --git a/pkg/ingest/integration_test.go b/pkg/ingest/integration_test.go new file mode 100644 index 0000000..9500c0f --- /dev/null +++ b/pkg/ingest/integration_test.go @@ -0,0 +1,217 @@ +package ingest + +import ( + "bytes" + "context" + "log/slog" + "os" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/hallelx2/llmgate" + + "github.com/hallelx2/vectorless-engine/pkg/db" + "github.com/hallelx2/vectorless-engine/pkg/storage" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// TestPipelineRunParallelSummarizeAndHyDEIntegration is the +// production-shaped sanity check for task #24: ingest a small fixture +// end-to-end via Pipeline.Run, then assert every persisted section ends +// up with a populated summary AND every leaf section has +// candidate_questions populated. The fact that both columns are filled +// without us calling p.fail proves the parallel summarize/HyDE +// orchestration introduced in this PR works under real DB / storage +// I/O. +// +// Gated on TEST_DATABASE_URL so the default `go test ./...` run stays +// fast and DB-free. Run locally with: +// +// TEST_DATABASE_URL=postgres://vle:vle@localhost:5432/vle_test?sslmode=disable \ +// go test -run TestPipelineRunParallelSummarizeAndHyDEIntegration ./pkg/ingest/ +func TestPipelineRunParallelSummarizeAndHyDEIntegration(t *testing.T) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("TEST_DATABASE_URL not set; skipping ingest pipeline integration test") + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + pool, err := db.Open(ctx, dsn, 4) + if err != nil { + t.Fatalf("open db: %v", err) + } + defer pool.Close() + if err := pool.Migrate(ctx); err != nil { + t.Fatalf("migrate db: %v", err) + } + + tmpDir := t.TempDir() + store, err := storage.NewLocal(tmpDir) + if err != nil { + t.Fatalf("init local storage: %v", err) + } + + // Load the markdown fixture and stage its bytes in storage at the + // canonical SourceKey, mirroring what an upload would do. + fixture, err := os.ReadFile("../../testdata/rust-ownership.md") + if err != nil { + t.Fatalf("read fixture: %v", err) + } + docID := NewDocumentID() + srcKey := SourceKey(docID, "rust-ownership.md") + if err := store.Put(ctx, srcKey, bytes.NewReader(fixture), storage.Metadata{ + ContentType: "text/markdown", + Size: int64(len(fixture)), + }); err != nil { + t.Fatalf("stage source: %v", err) + } + + const ( + orgID = "org_test_24" + storeID = db.NilScope + ) + if err := pool.NewDocument(ctx, db.Document{ + ID: docID, + OrgID: orgID, + StoreID: storeID, + Title: "rust-ownership.md", + ContentType: "text/markdown", + SourceRef: srcKey, + ByteSize: int64(len(fixture)), + }); err != nil { + t.Fatalf("insert document: %v", err) + } + // Best-effort cleanup so reruns stay isolated. + t.Cleanup(func() { + _ = pool.DeleteDocument(context.Background(), docID, orgID, storeID) + }) + + // Stub LLM: returns a deterministic single-sentence summary OR a + // 3-question JSON depending on which prompt the call carries. This + // is enough to drive both stages through to a populated DB row. + // Also records the order in which summarize vs HyDE calls land — + // we assert at least one HyDE call started before the very last + // summarize call to prove the stages interleaved in the real + // pipeline, not just in the unit test. + var ( + summCalls, hydeCalls atomic.Int32 + // Times of the first HyDE call and the last summarize call, + // captured to verify interleave without a strict ordering + // requirement (which would be flaky under varying scheduler load). + firstHyDETime, lastSummarizeTime atomic.Int64 + ) + llm := &llmgate.Mock{ + Respond: func(ctx context.Context, req llmgate.Request) (*llmgate.Response, error) { + userMsg := "" + for _, m := range req.Messages { + if m.Role == llmgate.RoleUser { + userMsg = m.Content + } + } + now := time.Now().UnixNano() + if strings.Contains(userMsg, "Produce up to") { + // HyDE prompt — return JSON. + if hydeCalls.Add(1) == 1 { + firstHyDETime.Store(now) + } + return &llmgate.Response{Content: `{"questions":["What is ownership?","How does the stack differ from the heap?","Why does Rust track ownership at compile time?"]}`}, nil + } + // Summarize prompt — return a short sentence. Tiny sleep so + // the summarize stage actually takes nonzero wall time and + // the HyDE stage has a chance to overlap. + summCalls.Add(1) + time.Sleep(15 * time.Millisecond) + lastSummarizeTime.Store(time.Now().UnixNano()) + return &llmgate.Response{Content: "A concise sentence describing the section's concrete content."}, nil + }, + } + + pipeline := NewPipeline(Pipeline{ + DB: pool, + Storage: store, + LLM: llm, + Parsers: DefaultRegistry(), + Logger: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn})), + HyDEEnabled: true, + HyDENumQuestions: 3, + HyDEConcurrency: 4, + SummaryConcurrency: 4, + GlobalLLMConcurrency: 6, + }) + + if err := pipeline.Run(ctx, Payload{ + DocumentID: docID, + ContentType: "text/markdown", + Filename: "rust-ownership.md", + SourceRef: srcKey, + }); err != nil { + t.Fatalf("Pipeline.Run: %v", err) + } + + // Assert document moved to ready (no p.fail was called). + doc, err := pool.GetDocument(ctx, docID, orgID, storeID) + if err != nil { + t.Fatalf("GetDocument: %v", err) + } + if doc.Status != db.StatusReady { + t.Fatalf("doc status = %s (err=%q); pipeline did not complete successfully", doc.Status, doc.ErrorMessage) + } + + sections, err := pool.ListSectionsForWorker(ctx, docID) + if err != nil { + t.Fatalf("ListSectionsForWorker: %v", err) + } + if len(sections) == 0 { + t.Fatal("parser produced zero sections from the fixture") + } + + hasChildren := map[tree.SectionID]bool{} + for _, s := range sections { + if s.ParentID != "" { + hasChildren[s.ParentID] = true + } + } + + var missingSummary, missingQuestions []tree.SectionID + for _, s := range sections { + if strings.TrimSpace(s.Summary) == "" { + missingSummary = append(missingSummary, s.ID) + } + // HyDE only targets leaves (internal nodes are skipped on purpose). + if !hasChildren[s.ID] && len(s.CandidateQuestions) == 0 { + missingQuestions = append(missingQuestions, s.ID) + } + } + if len(missingSummary) > 0 { + t.Errorf("%d/%d sections missing summary: %v", len(missingSummary), len(sections), missingSummary) + } + if len(missingQuestions) > 0 { + t.Errorf("%d leaf sections missing candidate_questions: %v", len(missingQuestions), missingQuestions) + } + + if got := hydeCalls.Load(); got == 0 { + t.Error("no HyDE calls observed — stage did not run") + } + if got := summCalls.Load(); got == 0 { + t.Error("no summarize calls observed — stage did not run") + } + + // Interleave evidence: the first HyDE call's timestamp must precede + // the last summarize call's timestamp. If summarize had to finish + // before HyDE started (the old sequential pipeline), the first HyDE + // call would land AFTER the last summarize call. + firstHyDE := firstHyDETime.Load() + lastSum := lastSummarizeTime.Load() + if firstHyDE == 0 || lastSum == 0 { + t.Fatalf("missing timing samples: firstHyDE=%d lastSummarize=%d", firstHyDE, lastSum) + } + if firstHyDE >= lastSum { + t.Errorf("stages did not interleave: first HyDE @ %d, last summarize @ %d", firstHyDE, lastSum) + } + + t.Logf("sections=%d summarize_calls=%d hyde_calls=%d", len(sections), summCalls.Load(), hydeCalls.Load()) +}