diff --git a/go.mod b/go.mod index a81abb787..f9515e30c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/bits-and-blooms/bitset v1.24.2 github.com/blevesearch/bleve_index_api v1.3.11 github.com/blevesearch/geo v0.2.5 - github.com/blevesearch/go-faiss v1.0.34 + github.com/blevesearch/go-faiss v1.0.36 github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/blevesearch/go-porterstemmer v1.0.3 github.com/blevesearch/goleveldb v1.0.1 @@ -25,7 +25,7 @@ require ( github.com/blevesearch/zapx/v14 v14.4.3 github.com/blevesearch/zapx/v15 v15.4.3 github.com/blevesearch/zapx/v16 v16.3.4 - github.com/blevesearch/zapx/v17 v17.0.12-0.20260421145725-c120519962c1 + github.com/blevesearch/zapx/v17 v17.1.1 github.com/couchbase/moss v0.2.0 github.com/spf13/cobra v1.10.2 go.etcd.io/bbolt v1.4.0 diff --git a/go.sum b/go.sum index 32e149993..9a57a9f50 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/blevesearch/bleve_index_api v1.3.11 h1:x29vbV8OjWfLcrDVd7Lr1q+BkLNS0J github.com/blevesearch/bleve_index_api v1.3.11/go.mod h1:xvd48t5XMeeioWQ5/jZvgLrV98flT2rdvEJ3l/ki4Ko= github.com/blevesearch/geo v0.2.5 h1:yJg9FX1oRwLnjXSXF+ECHfXFTF4diF02Ca/qUGVjJhE= github.com/blevesearch/geo v0.2.5/go.mod h1:Jhq7WE2K6mJTx1xS44M2pUO6Io+wjCSHh1+co3YOgH4= -github.com/blevesearch/go-faiss v1.0.34 h1:cFE1jRkjJfk7qMMsqXBqGEivbYQz/tjSf5yyoH50xbY= -github.com/blevesearch/go-faiss v1.0.34/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk= +github.com/blevesearch/go-faiss v1.0.36 h1:qrP6LZX7xrQQ3pOF2B+t+5E+brlOzwQUzZrGLHz4IeU= +github.com/blevesearch/go-faiss v1.0.36/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk= github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:kDy+zgJFJJoJYBvdfBSiZYBbdsUL0XcjHYWezpQBGPA= github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:9eJDeqxJ3E7WnLebQUlPD7ZjSce7AnDb9vjGmMCbD0A= github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo= @@ 
-45,8 +45,8 @@ github.com/blevesearch/zapx/v15 v15.4.3 h1:iJiMJOHrz216jyO6lS0m9RTCEkprUnzvqAI2l github.com/blevesearch/zapx/v15 v15.4.3/go.mod h1:1pssev/59FsuWcgSnTa0OeEpOzmhtmr/0/11H0Z8+Nw= github.com/blevesearch/zapx/v16 v16.3.4 h1:hDAqA8qusZTNbPEL7//w5P65UZ2de6yhSeUaTbp0Po0= github.com/blevesearch/zapx/v16 v16.3.4/go.mod h1:zqkPPqs9GS9FzVWzCO3Wf1X044yWAV17+4zb+FTiEHg= -github.com/blevesearch/zapx/v17 v17.0.12-0.20260421145725-c120519962c1 h1:1qM+d5vKedxmdL7rIldvQfgh68NevZXMNE7aQwkj5cU= -github.com/blevesearch/zapx/v17 v17.0.12-0.20260421145725-c120519962c1/go.mod h1:be77zp3wB5sTGTWo/6KwCEHnPRyOZYkIeQEr3YIO55E= +github.com/blevesearch/zapx/v17 v17.1.1 h1:Ltal7LsjzRerUg4hqVgMruKj3BAse+rrrDTe+9epJ2k= +github.com/blevesearch/zapx/v17 v17.1.1/go.mod h1:AfYxjApHf7JpQdW4yzFGisSKIrdkPesFn4yJ3vKKPQE= github.com/couchbase/ghistogram v0.1.0 h1:b95QcQTCzjTUocDXp/uMgSNQi8oj1tGwnJ4bODWZnps= github.com/couchbase/ghistogram v0.1.0/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k= github.com/couchbase/moss v0.2.0 h1:VCYrMzFwEryyhRSeI+/b3tRBSeTpi/8gn5Kf6dxqn+o= diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 0f656b38e..4f35483d8 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -1106,7 +1106,7 @@ func (s *Scorch) SetPathInBolt(key []byte, value []byte) error { return err } - // currently this is specific to centroid index file update + // currently this is specific to trained index file update err = s.trainer.updateBolt(snapshotsBucket, key, value) if err != nil { return err diff --git a/index/scorch/train_vector.go b/index/scorch/train_vector.go index cff7a2851..4c3b15ca8 100644 --- a/index/scorch/train_vector.go +++ b/index/scorch/train_vector.go @@ -51,9 +51,9 @@ type vectorTrainer struct { m sync.RWMutex // not a searchable segment in the sense that it won't return - // the data vectors. 
can return centroid vectors - centroidIndex *SegmentSnapshot - trainCh chan *trainRequest + // the data vectors, returns trained centroid layout + trainedIndex *SegmentSnapshot + trainCh chan *trainRequest } const IndexTrainedWithFastMerge = "vector_index_fast_merge" @@ -67,8 +67,8 @@ func initTrainer(s *Scorch, config map[string]interface{}) *vectorTrainer { config: maps.Clone(s.config), trainCh: make(chan *trainRequest, 1), } - // update the parent scorch config with the trainer's callback to fetch the centroid index - s.segmentConfig[index.TrainedIndexCallback] = index.TrainedIndexCallbackFn(trainer.getCentroidIndex) + // update the parent scorch config with the trainer's callback to fetch the trained index + s.segmentConfig[index.TrainedIndexCallback] = index.TrainedIndexCallbackFn(trainer.getTrainedIndex) return &trainer } } @@ -98,11 +98,11 @@ func (t *vectorTrainer) persistToBolt(trainReq *trainRequest) error { trainerBucket, err := snapshotsBucket.CreateBucketIfNotExists(util.BoltTrainerKey) if err != nil { - return fmt.Errorf("error creating centroid bucket: %v", err) + return fmt.Errorf("error creating trained index bucket: %v", err) } err = trainerBucket.Put(util.BoltPathKey, []byte(index.TrainedIndexFileName), nil) if err != nil { - return fmt.Errorf("error updating centroid bucket: %v", err) + return fmt.Errorf("error updating trained index bucket: %v", err) } t.trainingComplete.Store(trainReq.finalSample) @@ -128,13 +128,17 @@ func (t *vectorTrainer) persistToBolt(trainReq *trainRequest) error { // this is not a routine that will be running throughout the lifetime of the index. It's purpose // is to only train the vector index before the data ingestion starts. 
func (t *vectorTrainer) trainLoop() { - defer func() { - t.parent.asyncTasks.Done() - }() + defer t.parent.asyncTasks.Done() trainLoopStartTime := time.Now() path := filepath.Join(t.parent.path, index.TrainedIndexFileName) for { + // exit once the final sample set has been ingested and training is complete. + if t.trainingComplete.Load() { + atomic.StoreUint64(&t.parent.stats.TotTrainedSamples, t.trainedSamples) + atomic.StoreUint64(&t.parent.stats.TotTrainTime, uint64(time.Since(trainLoopStartTime).Milliseconds())) + return + } select { case <-t.parent.closeCh: select { @@ -146,85 +150,75 @@ func (t *vectorTrainer) trainLoop() { return case trainReq := <-t.trainCh: sampleSeg := trainReq.sample - if t.centroidIndex == nil { + // no sample segment: just persist state if this is the final sample and move on. + if sampleSeg == nil { + if trainReq.finalSample { + if err := t.persistToBolt(trainReq); err != nil { + trainReq.ackCh <- fmt.Errorf("error persisting to bolt: %v", err) + close(trainReq.ackCh) + return + } + } + close(trainReq.ackCh) + continue + } + + if t.trainedIndex == nil { switch seg := sampleSeg.(type) { case segment.UnpersistedSegment: - err := persistToDirectory(seg, nil, path) - if err != nil { + if err := persistToDirectory(seg, nil, path); err != nil { trainReq.ackCh <- fmt.Errorf("error persisting segment: %v", err) close(trainReq.ackCh) continue } - default: } } else { - // merge the new segment with the existing one, to create a new - // .tmp centroid index file and then move it to the actual - // centroid index file path (during the merge, Os.Open(centroidIndexPath) - // won't be safe since its still being used for merge) + // merge the new segment with the existing one into a .tmp file, then + // atomically rename it into place (os.Open on the live path is unsafe + // during the merge).
t.config[index.TrainingKey] = true - _, _, err := t.parent.segPlugin.MergeUsing([]segment.Segment{t.centroidIndex.segment, sampleSeg}, + _, _, err := t.parent.segPlugin.MergeUsing([]segment.Segment{t.trainedIndex.segment, sampleSeg}, []*roaring.Bitmap{nil, nil}, path+".tmp", t.parent.closeCh, nil, t.config) + t.config[index.TrainingKey] = false if err != nil { - trainReq.ackCh <- fmt.Errorf("error merging centroid index: %v", err) + trainReq.ackCh <- fmt.Errorf("error merging trained index: %v", err) close(trainReq.ackCh) return } - // reset the training flag once completed - t.config[index.TrainingKey] = false - // close the existing centroid segment - it's supposed to be gc'd at this point - t.centroidIndex.segment.Close() - err = moveFile(path+".tmp", path) - if err != nil { - trainReq.ackCh <- fmt.Errorf("error renaming centroid index: %v", err) + t.trainedIndex.segment.Close() + if err = moveFile(path+".tmp", path); err != nil { + trainReq.ackCh <- fmt.Errorf("error renaming trained index: %v", err) close(trainReq.ackCh) return } } - // a bolt transaction is necessary for failover-recovery scenario and also serves as a checkpoint - // where we can be sure that the centroid index is available for the indexing operations downstream - // - // note: when the scale increases massively especially with real world dimensions of 1536+, this API - // will have to be refactored to persist in a more resource efficient way. so having this bolt related - // code will help in tracking the progress a lot better and avoid any redudant data streaming operations. - // + // bolt write acts as a checkpoint for failover-recovery: callers downstream + // can rely on the trained index being available once this completes. 
// todo: rethink the frequency of bolt writes - err := t.persistToBolt(trainReq) - if err != nil { + if err := t.persistToBolt(trainReq); err != nil { trainReq.ackCh <- fmt.Errorf("error persisting to bolt: %v", err) close(trainReq.ackCh) return } - // update the centroid index pointer - centroidIndex, err := t.parent.segPlugin.OpenUsing(path, t.parent.segmentConfig) + trainedIndex, err := t.parent.segPlugin.OpenUsing(path, t.parent.segmentConfig) if err != nil { - trainReq.ackCh <- fmt.Errorf("error opening centroid index: %v", err) + trainReq.ackCh <- fmt.Errorf("error opening trained index: %v", err) close(trainReq.ackCh) return } t.m.Lock() - t.centroidIndex = &SegmentSnapshot{ - segment: centroidIndex, - } + t.trainedIndex = &SegmentSnapshot{segment: trainedIndex} t.m.Unlock() close(trainReq.ackCh) - - // exit the trainer loop we've ingested the final sample set and training - // is assumed to be complete. - if t.trainingComplete.Load() { - atomic.StoreUint64(&t.parent.stats.TotTrainedSamples, t.trainedSamples) - atomic.StoreUint64(&t.parent.stats.TotTrainTime, uint64(time.Since(trainLoopStartTime).Milliseconds())) - return - } } } } -// loads the metadata specific to the centroid index from boltdb, happens during init +// loads the metadata specific to the trained index from boltdb, happens during init // no lock needed func (t *vectorTrainer) loadTrainedData(bucket *util.BoltBucketImpl) error { if bucket == nil { @@ -262,7 +256,7 @@ func (t *vectorTrainer) loadTrainedData(bucket *util.BoltBucketImpl) error { t.m.Lock() defer t.m.Unlock() - t.centroidIndex = segmentSnapshot + t.trainedIndex = segmentSnapshot return nil } @@ -290,6 +284,11 @@ func (t *vectorTrainer) train(batch *index.Batch) error { return fmt.Errorf("error parsing train complete: %v", err) } + trainReq := &trainRequest{ + finalSample: fin, + sampleSize: len(trainData), + ackCh: make(chan error), + } // just builds a new vector index out of the train data provided // this is not necessarily 
the final train data since this is submitted // as a request to the trainer component to be merged. once the training @@ -298,16 +297,11 @@ func (t *vectorTrainer) train(batch *index.Batch) error { // // note: this might index text data too, how to handle this? s.segmentConfig? // todo: updates/deletes -> data drift detection - seg, _, err := t.parent.segPlugin.NewUsing(trainData, t.parent.segmentConfig) - if err != nil { - return err - } - - trainReq := &trainRequest{ - finalSample: fin, - sampleSize: len(trainData), - ackCh: make(chan error), - sample: seg, + if len(trainData) > 0 { + trainReq.sample, _, err = t.parent.segPlugin.NewUsing(trainData, t.parent.segmentConfig) + if err != nil { + return err + } } t.trainCh <- trainReq @@ -327,15 +321,15 @@ func (t *vectorTrainer) getInternal(key []byte) ([]byte, error) { return nil, nil } -func (t *vectorTrainer) getCentroidIndex(field string) (interface{}, error) { +func (t *vectorTrainer) getTrainedIndex(field string) (interface{}, error) { // return the coarse quantizer of the trained faiss index belonging to the field // if its not available then zap performs naive merge t.m.RLock() defer t.m.RUnlock() - if t.centroidIndex != nil { - trainedSegment, ok := t.centroidIndex.segment.(segment.TrainedSegment) + if t.trainedIndex != nil { + trainedSegment, ok := t.trainedIndex.segment.(segment.TrainedSegment) if !ok { - return nil, fmt.Errorf("segment is not a centroid index segment") + return nil, fmt.Errorf("segment is not a trained index segment") } coarseQuantizer, err := trainedSegment.GetCoarseQuantizer(field) @@ -349,7 +343,7 @@ func (t *vectorTrainer) getCentroidIndex(field string) (interface{}, error) { func (t *vectorTrainer) copyFileLOCKED(file string, d index.IndexDirectory) error { if strings.HasSuffix(file, index.TrainedIndexFileName) { - // centroid index file - this is outside the snapshots domain so the bolt update is different + // trained index file - this is outside the snapshots domain so the bolt 
update is different err := d.SetPathInBolt(util.BoltTrainerKey, []byte(file)) if err != nil { return fmt.Errorf("error updating dest index bolt: %w", err) @@ -393,7 +387,7 @@ func (t *vectorTrainer) updateBolt(snapshotsBucket *util.BoltBucketImpl, key []b } - // update the centroid index pointer - t.centroidIndex, err = t.parent.loadSegment(trainerBucket, reader) + // update the trained index pointer + t.trainedIndex, err = t.parent.loadSegment(trainerBucket, reader) if err != nil { return err }