Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
*.out # Dependency directories (remove the comment below to include it)
# vendor/

# Go workspace file
Expand All @@ -35,3 +33,6 @@ bin/
/tmp
/models
/.vscode

# Dynarag binary
dynarag
Binary file removed dynarag
Binary file not shown.
85 changes: 85 additions & 0 deletions internal/store/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package store

import (
"context"
"log"
"sync"
)

// Chunk is one unit of work for the batch embedding pipeline. Every field is
// forwarded unchanged to AddEmbedding by processSingleChunk.
type Chunk struct {
// FilePath identifies the file the chunk belongs to. It is also the value
// recorded in BatchProgress.Failed when processing the chunk fails.
FilePath string
// ChunkText is the chunk's text content.
ChunkText string
// EmbeddingText is passed to AddEmbedding as-is, including when nil.
// NOTE(review): presumably an optional override of the text that gets
// embedded; confirm against AddEmbedding.
EmbeddingText *string
// Metadata is free-form key/value data passed through to AddEmbedding.
Metadata map[string]interface{}
}

// BatchProgress tracks the outcome of one batch embedding run.
//
// It contains a mutex and must therefore be shared by pointer, never copied.
type BatchProgress struct {
// Total is the number of chunks submitted to the batch.
Total int
// Completed counts chunks that were processed without error. Updated
// while holding mu.
Completed int
// Failed lists the FilePath of every chunk whose processing returned an
// error. Appended to while holding mu.
Failed []string
// mu guards Completed and Failed, which are written from worker goroutines.
mu sync.RWMutex
// ctx is created by InitializeBatchProgress from context.Background and is
// polled by BatchProgressEmbeddings before each chunk is dispatched.
ctx context.Context
// cancel cancels ctx.
// NOTE(review): nothing in this file calls it; confirm who is expected to.
cancel context.CancelFunc
}

// InitializeBatchProgress returns a fresh tracker for a batch of total items.
//
// The tracker starts with Completed at zero and Failed as an empty, non-nil
// slice. It owns a cancellable context derived from context.Background; the
// matching cancel function is stored on the tracker.
func InitializeBatchProgress(total int) *BatchProgress {
	batchCtx, stop := context.WithCancel(context.Background())

	tracker := new(BatchProgress)
	tracker.Total = total
	tracker.Failed = make([]string, 0) // empty but non-nil, like a []string{} literal
	tracker.ctx = batchCtx
	tracker.cancel = stop
	return tracker
}

// NOTE: userId is only a placeholder argument for now.

// BatchProgressEmbeddings processes every chunk concurrently, one goroutine
// per chunk, and returns a BatchProgress describing the outcome.
//
// Parameters:
//   - ctx: forwarded to processSingleChunk for every chunk. It is also polled
//     before each dispatch; once it is done, no further chunks are started. A
//     nil ctx is tolerated and simply never reports cancellation.
//   - userId: forwarded unchanged to processSingleChunk.
//   - chunks: the chunks to process; len(chunks) becomes Total.
//
// Dispatch also stops when the tracker's own context is cancelled. In either
// case the function waits for every goroutine it already started before
// returning, so the returned BatchProgress is no longer being written to.
// Chunks that were never dispatched are counted in neither Completed nor
// Failed.
func BatchProgressEmbeddings(ctx context.Context, userId string, chunks []Chunk) *BatchProgress {
	progress := InitializeBatchProgress(len(chunks))

	// Receiving from a nil channel blocks forever, so a nil ctx just disables
	// this select case instead of panicking on ctx.Done().
	var callerDone <-chan struct{}
	if ctx != nil {
		callerDone = ctx.Done()
	}

	var wg sync.WaitGroup

dispatch:
	for _, chunk := range chunks {
		// Non-blocking cancellation check before starting more work.
		select {
		case <-callerDone:
			log.Println("Batch process canceled")
			break dispatch
		case <-progress.ctx.Done():
			log.Println("Batch process canceled")
			break dispatch
		default:
		}

		wg.Add(1)
		// chunk is passed as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(chunk Chunk) {
			defer wg.Done()
			if err := processSingleChunk(ctx, userId, chunk, progress); err != nil {
				progress.mu.Lock()
				progress.Failed = append(progress.Failed, chunk.FilePath)
				progress.mu.Unlock()
			}
		}(chunk)
	}

	// Always wait, including after cancellation: returning early would hand
	// the caller a tracker that in-flight goroutines are still mutating.
	wg.Wait()
	return progress
}

// processSingleChunk calls AddEmbedding for one chunk and records the result
// on progress.
//
// On success it increments progress.Completed and returns nil. On failure it
// logs the chunk's file path together with the error and returns that error
// unchanged; recording the path in progress.Failed is left to the caller.
// AddEmbedding runs before progress.mu is taken; the counter update and the
// log call both happen while the lock is held.
func processSingleChunk(
	ctx context.Context,
	userId string,
	chunk Chunk,
	progress *BatchProgress,
) error {
	_, addErr := AddEmbedding(ctx, userId, chunk.FilePath, chunk.ChunkText, chunk.EmbeddingText, chunk.Metadata)

	progress.mu.Lock()
	defer progress.mu.Unlock()

	if addErr == nil {
		progress.Completed++
		return nil
	}

	log.Printf("Failed to process chunk for file: %s, error: %v", chunk.FilePath, addErr)
	return addErr
}
Loading