Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ msgvault can search your archive semantically using vector embeddings in additio

A separate MCP tool, `find_similar_messages`, returns nearest neighbors for a seed message. See the [Vector Search guide](https://msgvault.io/usage/vector-search/) for setup, backfill, and troubleshooting.

> **Run only one embedding process at a time.** Don't run `msgvault embeddings build`/`resume` or `repair-encoding` concurrently with a `msgvault serve` daemon — they write the same embedding state, and concurrent writers are not coordinated across processes.

## Importing from MBOX or Apple Mail

Import email from providers that offer MBOX exports or from a local Apple Mail data directory:
Expand Down
22 changes: 15 additions & 7 deletions cmd/msgvault/cmd/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
var (
embedFullRebuild bool
embedYes bool
embedBackstop bool
embeddingsRetireYes bool
embeddingsRetireForceActive bool
embeddingsActivateForce bool
Expand All @@ -25,9 +26,11 @@ var embeddingsResumeCmd = &cobra.Command{
Use: "resume",
Short: "Resume or top up the current vector embedding generation",
Long: `Resume or top up the current vector embedding generation.
If a matching generation is building, this drains its pending queue and
activates it when complete. Otherwise it embeds pending rows for the
active generation.`,
If a matching generation is building, this embeds any messages still
needing embedding for it and activates it when complete. Otherwise it
embeds any messages still needing embedding for the active generation.
Pass --backstop for a full-scan pass that ignores the per-generation
watermark, catching any straggler messages the incremental scan skipped.`,
RunE: runEmbeddingsResume,
}
var embeddingsListCmd = &cobra.Command{
Expand Down Expand Up @@ -55,16 +58,19 @@ func newEmbeddingsBuildCmd(use string) *cobra.Command {
Short: "Build or update the vector embedding index (incremental by default; --full-rebuild for a new generation)",
Long: `Build or update the vector embedding index for hybrid search.
Writes vectors to the co-located vectors.db. In the default incremental
mode, the command drains any pending rows in the active generation. With
--full-rebuild, it creates a new building generation, embeds the entire
corpus, and (on a clean completion) atomically activates it.
mode, the command embeds any messages still needing embedding for the
active generation. With --full-rebuild, it creates a new building
generation, embeds the entire corpus, and (on a clean completion)
atomically activates it.

Requires [vector] to be enabled in config.toml and [vector.embeddings]
to point at a running OpenAI-compatible endpoint.`,
RunE: runEmbeddingsBuild,
}
cmd.Flags().BoolVar(&embedFullRebuild, "full-rebuild", false, "Create a new generation and rebuild from scratch")
cmd.Flags().BoolVar(&embedYes, "yes", false, "Skip confirmation prompts")
cmd.Flags().BoolVar(&embedBackstop, "backstop", false,
"Full-scan pass that ignores the per-generation watermark, catching any straggler messages the incremental scan skipped (idempotent)")
return cmd
}

Expand Down Expand Up @@ -92,10 +98,12 @@ func runEmbeddingsResume(cmd *cobra.Command, args []string) error {

func init() {
embedCmd.Deprecated = "use 'msgvault embeddings build' instead"
embeddingsResumeCmd.Flags().BoolVar(&embedBackstop, "backstop", false,
"Full-scan pass that ignores the per-generation watermark, catching any straggler messages the incremental scan skipped (idempotent)")
embeddingsRetireCmd.Flags().BoolVar(&embeddingsRetireYes, "yes", false, "Skip confirmation prompt")
embeddingsRetireCmd.Flags().BoolVar(&embeddingsRetireForceActive, "force-active", false, "Allow retiring the active generation")
embeddingsActivateCmd.Flags().BoolVar(&embeddingsActivateYes, "yes", false, "Skip confirmation prompt")
embeddingsActivateCmd.Flags().BoolVar(&embeddingsActivateForce, "force", false, "Allow activation with pending rows or a fingerprint mismatch")
embeddingsActivateCmd.Flags().BoolVar(&embeddingsActivateForce, "force", false, "Allow activation while messages still need embedding, or with a fingerprint mismatch")
embeddingsCmd.AddCommand(embeddingsBuildCmd)
embeddingsCmd.AddCommand(embeddingsResumeCmd)
embeddingsCmd.AddCommand(embeddingsListCmd)
Expand Down
42 changes: 22 additions & 20 deletions cmd/msgvault/cmd/embed_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ func countEmbeddingRowsPG(t *testing.T, db *sql.DB, gen vector.GenerationID) int
return n
}

// seedGenWithEmbeddingsPG creates a building generation, upserts one chunk per
// supplied message id (dim 4), and clears its pending queue so the management
// commands treat it as a finished generation. Returns the generation id.
// seedGenWithEmbeddingsPG creates a building generation and upserts one chunk
// per supplied message id (dim 4). The consuming tests force-activate/retire
// (force=true), bypassing the coverage gate, so no embed_gen stamping is
// needed to make the management commands treat it as finished. Returns the
// generation id.
func seedGenWithEmbeddingsPG(t *testing.T, pgb *pgvector.Backend, ids ...int64) vector.GenerationID {
t.Helper()
ctx := context.Background()
for _, id := range ids {
_, err := pgb.DB().ExecContext(ctx,
`INSERT INTO messages (id) VALUES ($1) ON CONFLICT DO NOTHING`, id)
require.NoErrorf(t, err, "seed message %d", id)
}
// No messages rows are inserted: embeddings.message_id has no FK to
// messages, and the consuming tests force-activate/retire (force=true),
// bypassing the coverage gate, so no live messages are needed. (The full
// schema's messages.id is GENERATED ALWAYS AS IDENTITY, so an explicit-id
// insert would be rejected anyway.)
gen, err := pgb.CreateGeneration(ctx, "test-model", 4, "test-model:4")
require.NoError(t, err, "CreateGeneration")
chunks := make([]vector.Chunk, 0, len(ids))
Expand All @@ -54,9 +56,8 @@ func seedGenWithEmbeddingsPG(t *testing.T, pgb *pgvector.Backend, ids ...int64)
chunks = append(chunks, vector.Chunk{MessageID: id, ChunkIndex: 0, Vector: v})
}
require.NoError(t, pgb.Upsert(ctx, gen, chunks), "Upsert")
_, err = pgb.DB().ExecContext(ctx,
`DELETE FROM pending_embeddings WHERE generation_id = $1`, int64(gen))
require.NoError(t, err, "clear pending")
// The consuming tests force-activate/retire (force=true), bypassing the
// coverage gate, so no embed_gen stamping is needed here.
return gen
}

Expand Down Expand Up @@ -157,22 +158,23 @@ func openEmbedManagePGDB(t *testing.T) (*pgvector.Backend, func(string) string,
require.NoError(t, err, "store.Open")
t.Cleanup(func() { _ = st.Close() })

// Mirror production: the CLI embeddings commands (runEmbed,
// runEmbeddingsRetire/Activate/List) all run store.InitSchema BEFORE
// opening the pgvector backend. InitSchema creates the full messages table
// (with embed_gen) and applied_migrations, so the backend's open-time reset/
// backfill have the columns they touch. A minimal hand-rolled messages
// table omitting embed_gen diverges from production and breaks both the
// open-time reset and the activate/retire coverage gates (which reference
// embed_gen even under --force).
require.NoError(t, st.InitSchema(), "InitSchema")

pgb, err := pgvector.Open(ctx, pgvector.Options{
DB: st.DB(),
Dimension: 4,
})
require.NoError(t, err, "pgvector.Open")
t.Cleanup(func() { _ = pgb.Close() })

// Create a minimal messages table so CreateGeneration's seed query works.
_, err = st.DB().ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS messages (
id BIGINT PRIMARY KEY,
deleted_at TIMESTAMPTZ,
deleted_from_source_at TIMESTAMPTZ
)`)
require.NoError(t, err, "create messages scaffold")

return pgb, (&store.PostgreSQLDialect{}).Rebind, dsn
}

Expand Down
108 changes: 92 additions & 16 deletions cmd/msgvault/cmd/embed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ func TestEmbeddingsCommandRegistration(t *testing.T) {
require.Equal("build", buildCmd.Name())
require.NotNil(buildCmd.Flags().Lookup("full-rebuild"))
require.NotNil(buildCmd.Flags().Lookup("yes"))
require.NotNil(buildCmd.Flags().Lookup("backstop"))

resumeCmd, _, err := rootCmd.Find([]string{"embeddings", "resume"})
require.NoError(err)
require.Equal("resume", resumeCmd.Name())
require.Nil(resumeCmd.Flags().Lookup("full-rebuild"))
// --backstop is also available on resume, so operators
// can do a watermark-ignoring straggler sweep without --full-rebuild.
require.NotNil(resumeCmd.Flags().Lookup("backstop"))

listCmd, _, err := rootCmd.Find([]string{"embeddings", "list"})
require.NoError(err)
Expand All @@ -52,30 +56,74 @@ func TestEmbeddingsCommandRegistration(t *testing.T) {
require.NotNil(legacyCmd.Flags().Lookup("yes"))
}

// TestRunEmbeddingsResume_PreservesBackstopFlag pins the resume behavior:
// resume forces incremental mode (saves/restores embedFullRebuild + embedYes) but
// must leave embedBackstop exactly as the operator set it, so
// `embeddings resume --backstop` actually runs a backstop pass.
func TestRunEmbeddingsResume_PreservesBackstopFlag(t *testing.T) {
assert := assertpkg.New(t)

// Save and restore all three globals so the test is hermetic.
oldFull, oldYes, oldBackstop := embedFullRebuild, embedYes, embedBackstop
t.Cleanup(func() { embedFullRebuild, embedYes, embedBackstop = oldFull, oldYes, oldBackstop })

// Operator state: full-rebuild on (resume must clear it), backstop on
// (resume must NOT touch it). Point at an empty config so the run errors
// out early (vector disabled) without needing a real backend.
embedFullRebuild = true
embedYes = false
embedBackstop = true
oldCfg := cfg
cfg = &config.Config{}
t.Cleanup(func() { cfg = oldCfg })

cmd := embeddingsResumeCmd
oldCtx := cmd.Context()
cmd.SetContext(context.Background())
t.Cleanup(func() { cmd.SetContext(oldCtx) })

// Errors because vector is not enabled — that's fine; we only assert the
// flag-preservation contract of runEmbeddingsResume.
_ = runEmbeddingsResume(cmd, nil)

assert.True(embedBackstop, "resume must NOT clobber embedBackstop")
assert.True(embedFullRebuild, "resume must restore embedFullRebuild to its prior value")
assert.False(embedYes, "resume must restore embedYes to its prior value")
}

func TestListEmbeddingGenerationsIncludesActiveAndBuilding(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
db := newEmbeddingMetadataTestDB(t)

// listEmbeddingGenerations reads only the generation metadata now;
// coverage (missing count) is filled separately from the main DB via
// fillCoverage, so it is not asserted here.
rows, err := listEmbeddingGenerations(t.Context(), db, sqliteRebind)
require.NoError(err)
require.Len(rows, 2)

assert.Equal(vector.GenerationID(1), rows[0].ID)
assert.Equal(vector.GenerationActive, rows[0].State)
assert.Equal(int64(2), rows[0].MessageCount)
assert.Equal(int64(0), rows[0].PendingCount)

assert.Equal(vector.GenerationID(2), rows[1].ID)
assert.Equal(vector.GenerationBuilding, rows[1].State)
assert.Equal(int64(1), rows[1].PendingCount)
}

func TestRunEmbeddingsActivateRefusesPendingWithoutForce(t *testing.T) {
// TestRunEmbeddingsActivateRefusesMissingWithoutForce verifies the CLI
// pre-flight coverage gate: activating a building generation that still
// has live messages needing embedding (embed_gen <> gen in the main DB)
// must fail without --force.
func TestRunEmbeddingsActivateRefusesMissingWithoutForce(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)
dbPath := newEmbeddingMetadataTestDBFile(t)
withEmbeddingCommandConfig(t, dbPath)
dataDir := t.TempDir()
dbPath := newEmbeddingMetadataTestDBFileAt(t, filepath.Join(dataDir, "vectors.db"))
// Main DB with one live, unembedded message -> coverage reports
// missing=1 for generation 2.
seedMainDBWithLiveMessage(t, dataDir)
withEmbeddingCommandConfigDataDir(t, dbPath, dataDir)

oldYes := embeddingsActivateYes
embeddingsActivateYes = true
Expand All @@ -87,7 +135,8 @@ func TestRunEmbeddingsActivateRefusesPendingWithoutForce(t *testing.T) {
err := runEmbeddingsActivate(cmd, []string{"2"})

require.Error(err)
assert.Contains(err.Error(), "pending embedding rows")
assert.Contains(err.Error(), "needing embedding")
assert.Contains(err.Error(), "msgvault embeddings resume --backstop")
}

// TestRetireEmbeddingGenerationRefusesActiveWithoutForce_PreCheck pins the
Expand Down Expand Up @@ -134,7 +183,14 @@ func newEmbeddingMetadataTestDB(t *testing.T) *sql.DB {

func newEmbeddingMetadataTestDBFile(t *testing.T) string {
t.Helper()
path := filepath.Join(t.TempDir(), "vectors.db")
return newEmbeddingMetadataTestDBFileAt(t, filepath.Join(t.TempDir(), "vectors.db"))
}

// newEmbeddingMetadataTestDBFileAt creates a vectors.db with just the
// index_generations metadata (no pending_embeddings — coverage now lives
// in the main DB) at the given path.
func newEmbeddingMetadataTestDBFileAt(t *testing.T, path string) string {
t.Helper()
db, err := sql.Open("sqlite3", path)
requirepkg.NoError(t, err)
defer func() { requirepkg.NoError(t, db.Close()) }()
Expand All @@ -152,14 +208,6 @@ CREATE TABLE index_generations (
state TEXT NOT NULL,
message_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE pending_embeddings (
generation_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
enqueued_at INTEGER NOT NULL,
claimed_at INTEGER,
claim_token TEXT,
PRIMARY KEY (generation_id, message_id)
);
`)
requirepkg.NoError(t, err)

Expand All @@ -170,19 +218,47 @@ INSERT INTO index_generations
VALUES
(1, 'model', 4, ?, 100, 101, 110, 111, 'active', 2),
(2, 'model', 4, ?, 120, 121, NULL, NULL, 'building', 1);
INSERT INTO pending_embeddings (generation_id, message_id, enqueued_at) VALUES (2, 42, 120);
`, fp, fp)
requirepkg.NoError(t, err)
return path
}

// seedMainDBWithLiveMessage creates a main msgvault.db in dataDir with one
// live message whose embed_gen is NULL — i.e. it reads as "missing" for
// every generation, so the coverage gate refuses activation.
func seedMainDBWithLiveMessage(t *testing.T, dataDir string) {
t.Helper()
s, err := store.Open(filepath.Join(dataDir, "msgvault.db"))
requirepkg.NoError(t, err)
defer func() { requirepkg.NoError(t, s.Close()) }()
requirepkg.NoError(t, s.InitSchema())
_, err = s.DB().Exec(`
INSERT INTO sources (id, source_type, identifier) VALUES (1, 'gmail', 'me@example.com');
INSERT INTO conversations (id, source_id, conversation_type) VALUES (1, 1, 'email_thread');
INSERT INTO messages (id, conversation_id, source_id, source_message_id, message_type, embed_gen) VALUES (1, 1, 1, 'm1', 'email', NULL);
`)
requirepkg.NoError(t, err)
}

func withEmbeddingCommandConfig(t *testing.T, vecPath string) {
t.Helper()
oldCfg := cfg
cfg = newTestConfigForFingerprint(vecPath)
t.Cleanup(func() { cfg = oldCfg })
}

// withEmbeddingCommandConfigDataDir is like withEmbeddingCommandConfig but
// also sets Data.DataDir so DatabaseDSN() resolves to a real main DB (used
// by the coverage gate).
func withEmbeddingCommandConfigDataDir(t *testing.T, vecPath, dataDir string) {
t.Helper()
oldCfg := cfg
c := newTestConfigForFingerprint(vecPath)
c.Data.DataDir = dataDir
cfg = c
t.Cleanup(func() { cfg = oldCfg })
}

func newTestConfigForFingerprint(vecPath string) *config.Config {
return &config.Config{
Vector: vector.Config{
Expand Down
Loading