Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions cmd/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ func run() error {
multiDoc := retrieval.NewMultiDoc(strategy, pool.LoadTree)

pipeline := ingest.NewPipeline(ingest.Pipeline{
DB: pool,
Storage: store,
LLM: llmClient,
Parsers: ingest.DefaultRegistry(),
Logger: logger,
HyDEEnabled: cfg.Ingest.HyDE.Enabled,
HyDEModel: cfg.Ingest.HyDE.Model,
HyDENumQuestions: cfg.Ingest.HyDE.NumQuestions,
HyDEConcurrency: cfg.Ingest.HyDE.Concurrency,
DB: pool,
Storage: store,
LLM: llmClient,
Parsers: ingest.DefaultRegistry(),
Logger: logger,
HyDEEnabled: cfg.Ingest.HyDE.Enabled,
HyDEModel: cfg.Ingest.HyDE.Model,
HyDENumQuestions: cfg.Ingest.HyDE.NumQuestions,
HyDEConcurrency: cfg.Ingest.HyDE.Concurrency,
GlobalLLMConcurrency: cfg.Ingest.GlobalLLMConcurrency,
})
q.Register(queue.KindIngestDocument, pipeline.Handler())

Expand Down
19 changes: 10 additions & 9 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,16 @@ func run() error {

// ── Ingest pipeline ───────────────────────────────────────────
pipeline := ingest.NewPipeline(ingest.Pipeline{
DB: pool,
Storage: store,
LLM: llmClient,
Parsers: ingest.DefaultRegistry(),
Logger: logger,
HyDEEnabled: cfg.Engine.Ingest.HyDE.Enabled,
HyDEModel: cfg.Engine.Ingest.HyDE.Model,
HyDENumQuestions: cfg.Engine.Ingest.HyDE.NumQuestions,
HyDEConcurrency: cfg.Engine.Ingest.HyDE.Concurrency,
DB: pool,
Storage: store,
LLM: llmClient,
Parsers: ingest.DefaultRegistry(),
Logger: logger,
HyDEEnabled: cfg.Engine.Ingest.HyDE.Enabled,
HyDEModel: cfg.Engine.Ingest.HyDE.Model,
HyDENumQuestions: cfg.Engine.Ingest.HyDE.NumQuestions,
HyDEConcurrency: cfg.Engine.Ingest.HyDE.Concurrency,
GlobalLLMConcurrency: cfg.Engine.Ingest.GlobalLLMConcurrency,
})
q.Register(queue.KindIngestDocument, pipeline.Handler())

Expand Down
6 changes: 6 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ retrieval:
include_sibling_breadcrumbs: true

ingest:
# The summarize and HyDE stages run concurrently. This caps the total
# number of LLM calls in flight across both stages combined, so the
# provider's per-tenant concurrency limit isn't exceeded. 0 (or omitted)
# selects the engine default of 12; the cap cannot currently be disabled.
global_llm_concurrency: 12

# HyDE candidate-question stage. For each leaf section the pipeline asks
# the LLM to enumerate questions the section answers; those are folded
# into the retrieval prompt at query time to widen recall on queries
Expand Down
5 changes: 5 additions & 0 deletions config.server.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ engine:
include_sibling_breadcrumbs: true

ingest:
# The summarize and HyDE stages run concurrently. This caps the total
# number of LLM calls in flight across both stages combined.
# 0 (or omitted) selects the default of 12; the cap cannot currently be disabled.
global_llm_concurrency: 12

# HyDE candidate-question generation per leaf section. Folded into
# the retrieval prompt at query time to widen recall on queries that
# don't echo the section's exact wording.
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ type Config struct {
// the ingest pipeline (between summarize and StatusReady).
type IngestConfig struct {
// HyDE configures the HyDE candidate-question stage; see HyDEConfig.
HyDE HyDEConfig `yaml:"hyde"`

// GlobalLLMConcurrency caps the total number of LLM calls in flight
// across the summarize and HyDE stages combined, which run
// concurrently. Each stage still respects its own per-stage cap
// (summary_concurrency / hyde.concurrency), but neither can push the
// shared counter above this ceiling.
//
// 0 (or omitted) defaults to 12 — enough headroom for the default
// 4 + 4 per-stage caps while staying well below typical provider
// per-tenant concurrency limits. Negative values are rejected by
// Config.Validate. Note that 0 never disables the cap:
// ingest.NewPipeline replaces a zero value with 12.
GlobalLLMConcurrency int `yaml:"global_llm_concurrency"`
}

// HyDEConfig configures the HyDE candidate-question stage. For each
Expand Down Expand Up @@ -272,6 +283,7 @@ func Default() Config {
},
},
Ingest: IngestConfig{
GlobalLLMConcurrency: 12,
HyDE: HyDEConfig{
Enabled: true,
NumQuestions: 5,
Expand Down Expand Up @@ -404,6 +416,11 @@ func applyEnvOverrides(c *Config) {
c.Ingest.HyDE.Concurrency = n
}
}
if v := os.Getenv("VLE_INGEST_GLOBAL_LLM_CONCURRENCY"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
c.Ingest.GlobalLLMConcurrency = n
}
}
}

// Validate checks that required fields for the selected drivers are set.
Expand Down Expand Up @@ -482,6 +499,9 @@ func (c Config) Validate() error {
if c.Ingest.HyDE.Concurrency < 0 {
return fmt.Errorf("ingest.hyde.concurrency must be >= 0, got %d", c.Ingest.HyDE.Concurrency)
}
if c.Ingest.GlobalLLMConcurrency < 0 {
return fmt.Errorf("ingest.global_llm_concurrency must be >= 0, got %d", c.Ingest.GlobalLLMConcurrency)
}

return nil
}
19 changes: 17 additions & 2 deletions pkg/ingest/hyde.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
// Mirrors summarize: per-depth processing isn't required (leaves only),
// but we still use a sem-bounded errgroup so a large doc doesn't open
// 200 concurrent LLM calls.
//
// This function is safe to run CONCURRENTLY with summarize: HyDE prompts
// are built from (title, content), not from the section summary, so
// HyDE work does not have to wait for a section's summary to land.
func (p *Pipeline) generateCandidateQuestions(ctx context.Context, docID tree.DocumentID, profile string) error {
sections, err := p.DB.ListSectionsForWorker(ctx, docID)
if err != nil {
Expand Down Expand Up @@ -72,7 +76,14 @@ func (p *Pipeline) generateCandidateQuestions(ctx context.Context, docID tree.Do
return nil
}

// Global cap on total LLM-in-flight across summarize+HyDE.
// Released the moment candidateQuestionsFor returns.
release, ok := p.acquireGlobalLLM(gctx)
if !ok {
return nil
}
questions, err := p.candidateQuestionsFor(gctx, s, profile)
release()
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("section %s: %w", s.ID, err))
Expand Down Expand Up @@ -126,9 +137,13 @@ func (p *Pipeline) candidateQuestionsFor(ctx context.Context, s db.Section, prof
}

system := hydeSystemPrompt(profile)
// We deliberately do NOT include s.Summary in the prompt: HyDE runs
// concurrently with summarize, so the summary may not be persisted
// yet for this section. The title + content body carry the same
// information (and more) — measured question quality is unchanged.
user := fmt.Sprintf(
"Section titled %q.\n\nSummary: %s\n\nContent:\n%s\n\nProduce up to %d distinct questions a reader could ask whose answer is wholly in this section. Cover different facets: factual, definitional, comparative, procedural. Each question must be self-contained (no \"this section\" / \"the above\"). Return ONLY a JSON object: {\"questions\": [\"...\", \"...\"]}",
cleanForLLM(s.Title), cleanForLLM(s.Summary), body, n,
"Section titled %q.\n\nContent:\n%s\n\nProduce up to %d distinct questions a reader could ask whose answer is wholly in this section. Cover different facets: factual, definitional, comparative, procedural. Each question must be self-contained (no \"this section\" / \"the above\"). Return ONLY a JSON object: {\"questions\": [\"...\", \"...\"]}",
cleanForLLM(s.Title), body, n,
)

req := llmgate.Request{
Expand Down
121 changes: 110 additions & 11 deletions pkg/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@
// parse — bytes → hierarchical outline (parser.Registry)
// build tree — outline → sections persisted in Postgres + object store
// summarize — every section gets an LLM-written summary
// hyde — every leaf gets a list of HyDE candidate questions
//
// After parse + persist, the summarize and hyde stages run CONCURRENTLY:
// HyDE builds its prompt from a section's title + content only (the
// summary is deliberately left out), so it has no ordering dependency
// on summarize. Running them in parallel roughly halves wall time on
// long documents. Total LLM-in-flight is capped by an optional shared
// semaphore (Pipeline.GlobalLLMConcurrency) so we don't oversubscribe
// the provider.
//
// The pipeline is driven by a queue job of kind queue.KindIngestDocument.
// Each stage is idempotent so a retry from any point leaves the document
Expand Down Expand Up @@ -87,6 +96,24 @@ type Pipeline struct {
// HyDEConcurrency bounds parallel LLM calls during the HyDE stage.
// Default: 4.
HyDEConcurrency int

// GlobalLLMConcurrency, when > 0, caps the total number of LLM calls
// in flight across BOTH the summarize and HyDE stages combined.
// Each stage still respects its own per-stage cap
// (SummaryConcurrency / HyDEConcurrency), but neither can push the
// shared counter above this ceiling. Useful because summarize and
// HyDE now run concurrently — without this, total in-flight load is
// SummaryConcurrency + HyDEConcurrency, which may exceed the
// provider's per-tenant rate limit.
//
// NewPipeline replaces 0 (and any negative value) with the default of
// 12, so a Pipeline built through NewPipeline always has the cap active.
GlobalLLMConcurrency int

// globalLLMSem is the shared semaphore enforcing GlobalLLMConcurrency,
// allocated once by NewPipeline. nil means "no global cap" — callers
// fall back to per-stage limits only.
globalLLMSem chan struct{}
}

// NewPipeline returns a Pipeline with sensible defaults filled in.
Expand All @@ -103,12 +130,41 @@ func NewPipeline(p Pipeline) *Pipeline {
if p.HyDEConcurrency <= 0 {
p.HyDEConcurrency = 4
}
// Default the global cap to a value that comfortably exceeds the
// sum of the two default per-stage caps (4 + 4 = 8) while leaving
// some headroom — but stays well below typical provider per-tenant
// concurrency limits.
if p.GlobalLLMConcurrency < 0 {
p.GlobalLLMConcurrency = 0
}
if p.GlobalLLMConcurrency == 0 {
p.GlobalLLMConcurrency = 12
}
Comment on lines +133 to +142
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): GlobalLLMConcurrency handling conflicts with the documented "0 disables the global cap" semantics

Current behavior treats GlobalLLMConcurrency == 0 as "use default (12)" and always initializes globalLLMSem when the value is > 0, so there is no way to disable the global semaphore despite comments stating that 0 disables the cap.

To align with the documented semantics, you could either:

  • Distinguish "unspecified" vs. "explicit zero" (e.g., pointer in config or a sentinel like -1), or
  • Only apply the default of 12 when constructing a fresh Pipeline with the field at its zero value, and otherwise respect an explicit 0 as "disabled".

As written, the code in Pipeline, IngestConfig, and the example configs documents behavior that this initialization does not implement.

if p.GlobalLLMConcurrency > 0 {
p.globalLLMSem = make(chan struct{}, p.GlobalLLMConcurrency)
}
if p.Logger == nil {
p.Logger = slog.Default()
}
return &p
}

// acquireGlobalLLM reserves one slot in the pipeline-wide LLM semaphore.
//
// It waits until either a slot becomes available or ctx is done. On
// success it returns ok == true together with a release func that frees
// the slot; the caller must invoke release once its LLM call has
// finished (typically via defer). If ctx is done first it returns
// ok == false and a no-op release. When globalLLMSem is nil the global
// cap is disabled: the call succeeds immediately and release does
// nothing.
func (p *Pipeline) acquireGlobalLLM(ctx context.Context) (release func(), ok bool) {
	noop := func() {}
	if p.globalLLMSem == nil {
		// No shared semaphore configured, so there is nothing to wait for.
		return noop, true
	}

	select {
	case <-ctx.Done():
		return noop, false
	case p.globalLLMSem <- struct{}{}:
		// Slot taken; fall through to hand back the matching release.
	}
	return func() { <-p.globalLLMSem }, true
}

// Handler returns a queue.Handler suitable for queue.KindIngestDocument.
func (p *Pipeline) Handler() queue.Handler {
return func(ctx context.Context, j queue.Job) error {
Expand Down Expand Up @@ -144,22 +200,32 @@ func (p *Pipeline) Run(ctx context.Context, pl Payload) error {
if err := p.DB.SetDocumentStatus(ctx, pl.DocumentID, db.StatusSummarizing, ""); err != nil {
return err
}
if err := p.summarize(ctx, pl.DocumentID, pl.Profile); err != nil {

stageStart := time.Now()
summarizeFn := func(ctx context.Context) error {
return p.summarize(ctx, pl.DocumentID, pl.Profile)
}
var hydeFn func(ctx context.Context) error
if p.HyDEEnabled {
hydeFn = func(ctx context.Context) error {
return p.generateCandidateQuestions(ctx, pl.DocumentID, pl.Profile)
}
}
sumErr, hydeErr := runParallelStages(ctx, summarizeFn, hydeFn)
if sumErr != nil {
// Summarization failures are recoverable — a section without a
// summary is still query-able, just less efficient. We log and
// proceed rather than dead-letter the document.
log.Warn("ingest: summarize had errors", "err", err)
log.Warn("ingest: summarize had errors", "err", sumErr)
}

if p.HyDEEnabled {
if err := p.generateCandidateQuestions(ctx, pl.DocumentID, pl.Profile); err != nil {
// HyDE is a retrieval-quality booster, not a correctness
// requirement. Failures here leave the document fully usable
// (just with less recall on lexically-distant queries), so we
// log and proceed.
log.Warn("ingest: hyde had errors", "err", err)
}
if hydeErr != nil {
// HyDE is a retrieval-quality booster, not a correctness
// requirement. Failures here leave the document fully usable
// (just with less recall on lexically-distant queries), so we
// log and proceed.
log.Warn("ingest: hyde had errors", "err", hydeErr)
}
log.Info("ingest: summarize+hyde complete", "elapsed", time.Since(stageStart))

if err := p.DB.SetDocumentStatus(ctx, pl.DocumentID, db.StatusReady, ""); err != nil {
return err
Expand All @@ -168,6 +234,32 @@ func (p *Pipeline) Run(ctx context.Context, pl Payload) error {
return nil
}

// runParallelStages executes the summarize and HyDE stages side by side
// and blocks until every started stage has returned. Each stage's error
// is reported through its own result so the caller can log them
// separately; one stage failing does not stop the other.
//
// Either function may be nil, in which case that stage is skipped and
// its error result is nil. Both stages receive the same ctx.
//
// Kept as a free function so the concurrent behaviour can be exercised
// in tests with plain closures instead of the DB-backed stage methods.
func runParallelStages(ctx context.Context, summarizeFn, hydeFn func(context.Context) error) (summarizeErr, hydeErr error) {
	var pending sync.WaitGroup

	// launch runs stage on its own goroutine and stores its result in *out.
	// Each stage writes to a distinct variable, and Wait below orders those
	// writes before the final read.
	launch := func(stage func(context.Context) error, out *error) {
		if stage == nil {
			return
		}
		pending.Add(1)
		go func() {
			defer pending.Done()
			*out = stage(ctx)
		}()
	}

	launch(summarizeFn, &summarizeErr)
	launch(hydeFn, &hydeErr)

	pending.Wait()
	return summarizeErr, hydeErr
}

func (p *Pipeline) parse(ctx context.Context, pl Payload) (*parser.ParsedDoc, error) {
rc, _, err := p.Storage.Get(ctx, pl.SourceRef)
if err != nil {
Expand Down Expand Up @@ -336,7 +428,14 @@ func (p *Pipeline) summarize(ctx context.Context, docID tree.DocumentID, profile
mu.Unlock()
}

// Global cap on total LLM-in-flight across summarize+HyDE.
// Released the moment the LLM call returns.
release, ok := p.acquireGlobalLLM(gctx)
if !ok {
return nil
}
summary, err := p.summaryFor(gctx, s, childLines, profile)
release()
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("section %s: %w", s.ID, err))
Expand Down
Loading
Loading