diff --git a/frontend/src/components/queue/ImportMethods.tsx b/frontend/src/components/queue/ImportMethods.tsx
index 20dda83d..eb8f2eae 100644
--- a/frontend/src/components/queue/ImportMethods.tsx
+++ b/frontend/src/components/queue/ImportMethods.tsx
@@ -607,12 +607,14 @@ function StatusBadge({ status }: { status: string }) {
 function NzbDavImportSection() {
   const [inputMethod, setInputMethod] = useState<"server" | "upload">("server");
   const [selectedDbPath, setSelectedDbPath] = useState("");
+  const [blobsPath, setBlobsPath] = useState("");
   const [selectedFile, setSelectedFile] = useState<File | null>(null);
   const [rootFolder, setRootFolder] = useState("");
   const [isLoading, setIsLoading] = useState(false);
   const [error, setError] = useState<string | null>(null);
   const { showToast } = useToast();
   const [isFileBrowserOpen, setIsFileBrowserOpen] = useState(false);
+  const [isBlobsBrowserOpen, setIsBlobsBrowserOpen] = useState(false);
 
   const { data: importStatus } = useNzbdavImportStatus(2000);
   const cancelImport = useCancelNzbdavImport();
@@ -639,6 +641,9 @@ function NzbDavImportSection() {
     const formData = new FormData();
     formData.append("rootFolder", rootFolder);
+    if (blobsPath) {
+      formData.append("blobsPath", blobsPath);
+    }
 
     if (inputMethod === "server") {
       formData.append("dbPath", selectedDbPath);
@@ -680,6 +685,10 @@ function NzbDavImportSection() {
     setSelectedDbPath(path);
   };
 
+  const handleBlobsSelect = (path: string) => {
+    setBlobsPath(path);
+  };
+
   const handleFileUpload = (e: React.ChangeEvent<HTMLInputElement>) => {
     if (e.target.files && e.target.files.length > 0) {
       setSelectedFile(e.target.files[0]);
@@ -899,7 +908,7 @@ function NzbDavImportSection() {
-
+
       {inputMethod === "server" ? (
@@ -937,6 +946,32 @@ function NzbDavImportSection() {
         />
       )}
+
+
+
+          Blobs Directory (Required for NzbDav alpha)
+
+            setBlobsPath(e.target.value)}
+          />
+
+
+
+          If left empty, it will default to a "blobs" folder in the same directory as the
+          database.
+
+
@@ -963,6 +998,14 @@ function NzbDavImportSection() { onSelect={handleFileSelect} filterExtension=".sqlite" /> + + setIsBlobsBrowserOpen(false)} + onSelect={handleBlobsSelect} + title="Select Blobs Directory" + allowDirectorySelection={true} + />
     );
 }
diff --git a/internal/api/nzbdav_handlers.go b/internal/api/nzbdav_handlers.go
index b4ad0c9f..4fb61c8f 100644
--- a/internal/api/nzbdav_handlers.go
+++ b/internal/api/nzbdav_handlers.go
@@ -41,6 +41,8 @@ func (s *Server) handleImportNzbdav(c *fiber.Ctx) error {
 		})
 	}
 
+	blobsPath := c.FormValue("blobsPath")
+
 	// 2. Handle File Source (Path or Upload)
 	dbPath := c.FormValue("dbPath")
 	var isTempFile bool
@@ -78,8 +80,13 @@ func (s *Server) handleImportNzbdav(c *fiber.Ctx) error {
 		isTempFile = true
 	}
 
+	// Default blobsPath if not provided
+	if blobsPath == "" {
+		blobsPath = filepath.Join(filepath.Dir(dbPath), "blobs")
+	}
+
 	// 3. Start Async Import
-	if err := s.importerService.StartNzbdavImport(dbPath, rootFolder, isTempFile); err != nil {
+	if err := s.importerService.StartNzbdavImport(dbPath, blobsPath, rootFolder, isTempFile); err != nil {
 		if isTempFile {
 			os.Remove(dbPath) // Clean up if start failed
 		}
diff --git a/internal/importer/interfaces.go b/internal/importer/interfaces.go
index 797ddc82..a995b08c 100644
--- a/internal/importer/interfaces.go
+++ b/internal/importer/interfaces.go
@@ -39,7 +39,7 @@ type DirectoryScanner interface {
 // NzbDavImporter handles bulk import from NzbDav databases
 type NzbDavImporter interface {
 	// StartNzbdavImport begins importing from an NzbDav database
-	StartNzbdavImport(dbPath string, rootFolder string, cleanupFile bool) error
+	StartNzbdavImport(dbPath string, blobsPath string, rootFolder string, cleanupFile bool) error
 	// GetImportStatus returns the current import status
 	GetImportStatus() ImportInfo
 	// CancelImport cancels an in-progress import
diff --git a/internal/importer/scanner/nzbdav.go b/internal/importer/scanner/nzbdav.go
index 65771fa8..0a979bf4 100644
--- a/internal/importer/scanner/nzbdav.go
+++ b/internal/importer/scanner/nzbdav.go
@@ -43,7 +43,7 @@ func NewNzbDavImporter(batchAdder BatchQueueAdder) *NzbDavImporter {
 }
 
 // Start starts an asynchronous import from an NZBDav database
-func (n *NzbDavImporter) Start(dbPath string, rootFolder string, cleanupFile bool) error {
+func (n *NzbDavImporter) Start(dbPath string, blobsPath string, rootFolder string, cleanupFile bool) error {
 	n.mu.Lock()
 	defer n.mu.Unlock()
@@ -64,7 +64,7 @@ func (n *NzbDavImporter) Start(dbPath string, rootFolder string, cleanupFile boo
 		Skipped: 0,
 	}
 
-	go n.performImport(importCtx, dbPath, rootFolder, cleanupFile)
+	go n.performImport(importCtx, dbPath, blobsPath, rootFolder, cleanupFile)
 	return nil
 }
@@ -108,9 +108,9 @@ func (n *NzbDavImporter) Reset() {
 }
 
 // performImport performs the actual import work
-func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, rootFolder string, cleanupFile bool) {
+func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, blobsPath string, rootFolder string, cleanupFile bool) {
 	// Parse Database
-	parser := nzbdav.NewParser(dbPath)
+	parser := nzbdav.NewParser(dbPath, blobsPath)
 	nzbChan, errChan := parser.Parse()
 
 	defer func() {
diff --git a/internal/importer/service.go b/internal/importer/service.go
index c9b12080..fb563c9c 100644
--- a/internal/importer/service.go
+++ b/internal/importer/service.go
@@ -504,8 +504,8 @@ func (s *Service) CancelScan() error {
 }
 
 // StartNzbdavImport starts an asynchronous import from an NZBDav database
-func (s *Service) StartNzbdavImport(dbPath string, rootFolder string, cleanupFile bool) error {
-	return s.nzbdavImporter.Start(dbPath, rootFolder, cleanupFile)
+func (s *Service) StartNzbdavImport(dbPath string, blobsPath string, rootFolder string, cleanupFile bool) error {
+	return s.nzbdavImporter.Start(dbPath, blobsPath, rootFolder, cleanupFile)
 }
 
 // GetImportStatus returns the current import status
diff --git a/internal/nzbdav/parser.go b/internal/nzbdav/parser.go
index f742e52c..433859fd 100644
--- a/internal/nzbdav/parser.go
+++ b/internal/nzbdav/parser.go
@@ -7,18 +7,25 @@ import (
 	"fmt"
 	"io"
 	"log/slog"
+	"os"
+	"path/filepath"
 	"strings"
 	"text/template"
 
+	"github.com/klauspost/compress/zstd"
 	_ "github.com/mattn/go-sqlite3"
 )
 
 type Parser struct {
-	dbPath string
+	dbPath    string
+	blobsPath string
 }
 
-func NewParser(dbPath string) *Parser {
-	return &Parser{dbPath: dbPath}
+func NewParser(dbPath, blobsPath string) *Parser {
+	return &Parser{
+		dbPath:    dbPath,
+		blobsPath: blobsPath,
+	}
 }
 
 // Parse streams NZBs from the database
@@ -60,150 +67,243 @@ func (p *Parser) Parse() (<-chan *ParsedNzb, <-chan error) {
 			slog.InfoContext(context.Background(), "NZBDav Database Tables", "tables", tables)
 		}
 
-		// Query ALL files, ordered by ParentId
-		// This groups files belonging to the same release together efficiently
-		rows, err := db.Query(`
-			SELECT
-				COALESCE(p.Id, 'root') as ReleaseId,
-				COALESCE(p.Name, 'root') as ReleaseName,
-				COALESCE(p.Path, '/') as ReleasePath,
-				c.Id as FileId,
-				c.Name as FileName,
-				c.FileSize,
-				n.SegmentIds,
-				r.RarParts,
-				m.Metadata as MultipartMetadata
-			FROM DavItems c
-			LEFT JOIN DavItems p ON c.ParentId = p.Id
-			LEFT JOIN DavNzbFiles n ON n.Id = c.Id
-			LEFT JOIN DavRarFiles r ON r.Id = c.Id
-			LEFT JOIN DavMultipartFiles m ON m.Id = c.Id
-			WHERE (n.Id IS NOT NULL OR r.Id IS NOT NULL OR m.Id IS NOT NULL)
-			ORDER BY c.ParentId, c.Name
-		`)
-		if err != nil {
-			errChan <- fmt.Errorf("failed to query files: %w", err)
-			return
-		}
-		defer rows.Close()
-		slog.DebugContext(context.Background(), "NZBDav file query completed, starting iteration")
-
-		var currentParentId string
-		var currentWriter *io.PipeWriter
-		count := 0
-		var currentExtractedFiles []ExtractedFileInfo
-
-		// cleanupCurrent ensures the current writer is properly closed
-		cleanupCurrent := func() {
-			if currentWriter != nil {
-				// Write NZB Footer
-				if _, err := currentWriter.Write([]byte("</nzb>")); err != nil {
-					slog.ErrorContext(context.Background(), "Failed to write NZB footer", "error", err)
-				}
-				currentWriter.Close()
-				currentWriter = nil
-			}
-		}
-		defer cleanupCurrent()
-
-		for rows.Next() {
-			var releaseId, releaseName, releasePath string
-			var fileId, fileName string
-			var fileSize sql.NullInt64
-			var segmentIdsJSON, rarPartsJSON, multipartMetadataJSON sql.RawBytes
-
-			if err := rows.Scan(&releaseId, &releaseName, &releasePath, &fileId, &fileName, &fileSize, &segmentIdsJSON, &rarPartsJSON, &multipartMetadataJSON); err != nil {
-				slog.ErrorContext(context.Background(), "Failed to scan row", "error", err)
-				continue
-			}
-
-			// Improve release name if it's just "extracted"
-			if strings.EqualFold(releaseName, "extracted") {
-				// Try to get the name from the path
-				pathParts := strings.Split(strings.Trim(releasePath, "/"), "/")
-				if len(pathParts) > 0 {
-					// Use the last part of the path that isn't "extracted"
-					for i := len(pathParts) - 1; i >= 0; i-- {
-						if !strings.EqualFold(pathParts[i], "extracted") {
-							releaseName = pathParts[i]
-							break
-						}
-					}
-				}
-			}
-
-			// Check if this file is inside an "extracted" folder
-			isExtractedFile := strings.Contains(releasePath, "/extracted") || releaseName == "extracted"
-			if isExtractedFile && fileSize.Valid && fileSize.Int64 > 0 {
-				currentExtractedFiles = append(currentExtractedFiles, ExtractedFileInfo{
-					Name: fileName,
-					Size: fileSize.Int64,
-				})
-			}
-
-			count++
-			if count%100 == 0 {
-				slog.InfoContext(context.Background(), "NZBDav import progress", "files_scanned", count)
-			}
-
-			// Check if we switched to a new release
-			if releaseId != currentParentId || currentWriter == nil {
-				cleanupCurrent()
-
-				currentParentId = releaseId
-				currentExtractedFiles = nil // Reset for new release
-				slog.DebugContext(context.Background(), "Processing new release", "path", releasePath, "name", releaseName)
-
-				// Create new pipe for this release
-				pr, pw := io.Pipe()
-				currentWriter = pw
-
-				// Send ParsedNzb to output channel
-				category := p.deriveCategory(releasePath)
-				relPath := p.deriveRelPath(releasePath, category)
-
-				select {
-				case out <- &ParsedNzb{
-					ID:             releaseId,
-					Name:           releaseName,
-					Category:       category,
-					RelPath:        relPath,
-					Content:        pr,
-					ExtractedFiles: currentExtractedFiles,
-				}:
-				case <-errChan: // Context cancelled or error
-					return
-				}
-
-				// Write NZB Header
-				header := `<?xml version="1.0" encoding="UTF-8"?>
-<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
-<head><meta type="name">` + template.HTMLEscapeString(releaseName) + `</meta></head>
-`
-				if _, err := currentWriter.Write([]byte(header)); err != nil {
-					slog.ErrorContext(context.Background(), "Failed to write NZB header", "release", releaseName, "error", err)
-					currentWriter.CloseWithError(err)
-					currentWriter = nil
-					continue
-				}
-			}
-
-			// Write File Entry
-			if err := p.writeFileEntry(currentWriter, fileId, fileName, fileSize, segmentIdsJSON, rarPartsJSON, multipartMetadataJSON); err != nil {
-				slog.ErrorContext(context.Background(), "Failed to write file entry", "file", fileName, "error", err)
-				currentWriter.CloseWithError(err)
-				currentWriter = nil
-			}
-		}
-		slog.InfoContext(context.Background(), "NZBDav import scan completed", "total_files", count)
-	}()
-
-	return out, errChan
-}
+		// Check for blob-based storage (NzbDav alpha)
+		if p.blobsPath != "" {
+			// Check if NzbNames table exists
+			var nzbNamesExists bool
+			err = db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='NzbNames'").Scan(&nzbNamesExists)
+			if err == nil && nzbNamesExists {
+				slog.InfoContext(context.Background(), "Detected blob-based NZBDav storage")
+				p.parseBlobs(db, out, errChan)
+				return
+			}
+		}
+
+		// Fallback to legacy reconstruction approach
+		p.parseLegacy(db, out, errChan)
+	}()
+
+	return out, errChan
+}
+
+func (p *Parser) parseBlobs(db *sql.DB, out chan<- *ParsedNzb, errChan chan<- error) {
+	rows, err := db.Query(`
+		SELECT
+			d.Id,
+			n.FileName,
+			COALESCE(d.Path, '/') as ReleasePath,
+			d.NzbBlobId
+		FROM DavItems d
+		JOIN NzbNames n ON n.Id = d.NzbBlobId
+		WHERE d.NzbBlobId IS NOT NULL
+			AND d.NzbBlobId != ''
+			AND d.NzbBlobId != '00000000-0000-0000-0000-000000000000'
+			AND d.SubType = 203
+	`)
+	if err != nil {
+		errChan <- fmt.Errorf("failed to query blob files: %w", err)
+		return
+	}
+	defer rows.Close()
+
+	count := 0
+	for rows.Next() {
+		var id, fileName, releasePath, blobId string
+		if err := rows.Scan(&id, &fileName, &releasePath, &blobId); err != nil {
+			slog.ErrorContext(context.Background(), "Failed to scan blob row", "error", err)
+			continue
+		}
+
+		// Resolve blob path: blobs/XX/YY/[uuid]
+		if len(blobId) < 4 {
+			slog.WarnContext(context.Background(), "Invalid blob ID", "id", blobId)
+			continue
+		}
+		blobPath := filepath.Join(p.blobsPath, blobId[0:2], blobId[2:4], blobId)
+
+		blobFile, err := os.Open(blobPath)
+		if err != nil {
+			slog.ErrorContext(context.Background(), "Failed to open blob file", "path", blobPath, "error", err)
+			continue
+		}
+
+		// Decompress zstd blob
+		pr, pw := io.Pipe()
+		go func() {
+			defer blobFile.Close()
+
+			zr, err := zstd.NewReader(blobFile)
+			if err != nil {
+				pw.CloseWithError(err)
+				return
+			}
+			defer zr.Close()
+
+			if _, err := io.Copy(pw, zr); err != nil {
+				pw.CloseWithError(err)
+				return
+			}
+			pw.Close()
+		}()
+
+		category := p.deriveCategory(releasePath)
+		relPath := p.deriveRelPath(releasePath, category)
+
+		select {
+		case out <- &ParsedNzb{
+			ID:       id,
+			Name:     strings.TrimSuffix(fileName, ".nzb"),
+			Category: category,
+			RelPath:  relPath,
+			Content:  pr,
+		}:
+			count++
+		}
+	}
+	slog.InfoContext(context.Background(), "NZBDav blob import scan completed", "total_files", count)
+}
+
+func (p *Parser) parseLegacy(db *sql.DB, out chan<- *ParsedNzb, errChan chan<- error) {
+	// Query ALL files, ordered by ParentId
+	// This groups files belonging to the same release together efficiently
+	rows, err := db.Query(`
+		SELECT
+			COALESCE(p.Id, 'root') as ReleaseId,
+			COALESCE(p.Name, 'root') as ReleaseName,
+			COALESCE(p.Path, '/') as ReleasePath,
+			c.Id as FileId,
+			c.Name as FileName,
+			c.FileSize,
+			n.SegmentIds,
+			r.RarParts,
+			m.Metadata as MultipartMetadata
+		FROM DavItems c
+		LEFT JOIN DavItems p ON c.ParentId = p.Id
+		LEFT JOIN DavNzbFiles n ON n.Id = c.Id
+		LEFT JOIN DavRarFiles r ON r.Id = c.Id
+		LEFT JOIN DavMultipartFiles m ON m.Id = c.Id
+		WHERE (n.Id IS NOT NULL OR r.Id IS NOT NULL OR m.Id IS NOT NULL)
+		ORDER BY c.ParentId, c.Name
+	`)
+	if err != nil {
+		errChan <- fmt.Errorf("failed to query files: %w", err)
+		return
+	}
+	defer rows.Close()
+	slog.DebugContext(context.Background(), "NZBDav file query completed, starting iteration")
+
+	var currentParentId string
+	var currentWriter *io.PipeWriter
+	count := 0
+	var currentExtractedFiles []ExtractedFileInfo
+
+	// cleanupCurrent ensures the current writer is properly closed
+	cleanupCurrent := func() {
+		if currentWriter != nil {
+			// Write NZB Footer
+			if _, err := currentWriter.Write([]byte("</nzb>")); err != nil {
+				slog.ErrorContext(context.Background(), "Failed to write NZB footer", "error", err)
+			}
+			currentWriter.Close()
+			currentWriter = nil
+		}
+	}
+	defer cleanupCurrent()
+
+	for rows.Next() {
+		var releaseId, releaseName, releasePath string
+		var fileId, fileName string
+		var fileSize sql.NullInt64
+		var segmentIdsJSON, rarPartsJSON, multipartMetadataJSON sql.RawBytes
+
+		if err := rows.Scan(&releaseId, &releaseName, &releasePath, &fileId, &fileName, &fileSize, &segmentIdsJSON, &rarPartsJSON, &multipartMetadataJSON); err != nil {
+			slog.ErrorContext(context.Background(), "Failed to scan row", "error", err)
+			continue
+		}
+
+		// Improve release name if it's just "extracted"
+		if strings.EqualFold(releaseName, "extracted") {
+			// Try to get the name from the path
+			pathParts := strings.Split(strings.Trim(releasePath, "/"), "/")
+			if len(pathParts) > 0 {
+				// Use the last part of the path that isn't "extracted"
+				for i := len(pathParts) - 1; i >= 0; i-- {
+					if !strings.EqualFold(pathParts[i], "extracted") {
+						releaseName = pathParts[i]
+						break
+					}
+				}
+			}
+		}
+
+		// Check if this file is inside an "extracted" folder
+		isExtractedFile := strings.Contains(releasePath, "/extracted") || releaseName == "extracted"
+		if isExtractedFile && fileSize.Valid && fileSize.Int64 > 0 {
+			currentExtractedFiles = append(currentExtractedFiles, ExtractedFileInfo{
+				Name: fileName,
+				Size: fileSize.Int64,
+			})
+		}
+
+		count++
+		if count%100 == 0 {
+			slog.InfoContext(context.Background(), "NZBDav import progress", "files_scanned", count)
+		}
+
+		// Check if we switched to a new release
+		if releaseId != currentParentId || currentWriter == nil {
+			cleanupCurrent()
+
+			currentParentId = releaseId
+			currentExtractedFiles = nil // Reset for new release
+			slog.DebugContext(context.Background(), "Processing new release", "path", releasePath, "name", releaseName)
+
+			// Create new pipe for this release
+			pr, pw := io.Pipe()
+			currentWriter = pw
+
+			// Send ParsedNzb to output channel
+			category := p.deriveCategory(releasePath)
+			relPath := p.deriveRelPath(releasePath, category)
+
+			select {
+			case out <- &ParsedNzb{
+				ID:             releaseId,
+				Name:           releaseName,
+				Category:       category,
+				RelPath:        relPath,
+				Content:        pr,
+				ExtractedFiles: currentExtractedFiles,
+			}:
+			}
+
+			// Write NZB Header
+			header := `<?xml version="1.0" encoding="UTF-8"?>
+<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
+<head><meta type="name">` + template.HTMLEscapeString(releaseName) + `</meta></head>
+`
+			if _, err := currentWriter.Write([]byte(header)); err != nil {
+				slog.ErrorContext(context.Background(), "Failed to write NZB header", "release", releaseName, "error", err)
+				currentWriter.CloseWithError(err)
+				currentWriter = nil
+				continue
+			}
+		}
+
+		// Write File Entry
+		if err := p.writeFileEntry(currentWriter, fileId, fileName, fileSize, segmentIdsJSON, rarPartsJSON, multipartMetadataJSON); err != nil {
+			slog.ErrorContext(context.Background(), "Failed to write file entry", "file", fileName, "error", err)
+			currentWriter.CloseWithError(err)
+			currentWriter = nil
+		}
+	}
+	slog.InfoContext(context.Background(), "NZBDav import scan completed", "total_files", count)
+}
+
 func (p *Parser) deriveCategory(path string) string {
diff --git a/internal/nzbdav/parser_test.go b/internal/nzbdav/parser_test.go
index ef2368bf..9e10161a 100644
--- a/internal/nzbdav/parser_test.go
+++ b/internal/nzbdav/parser_test.go
@@ -3,9 +3,11 @@ package nzbdav
 import (
 	"database/sql"
 	"io"
+	"os"
 	"path/filepath"
 	"testing"
 
+	"github.com/klauspost/compress/zstd"
 	_ "github.com/mattn/go-sqlite3"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -63,7 +65,7 @@ func TestParser_Parse(t *testing.T) {
 	require.NoError(t, err)
 
 	// Run Parser
-	parser := NewParser(dbPath)
+	parser := NewParser(dbPath, "")
 	out, errChan := parser.Parse()
 
 	// Verify
@@ -117,3 +119,84 @@ func TestParser_Parse(t *testing.T) {
 	_, ok := <-out
 	assert.False(t, ok)
 }
+
+func TestParser_Parse_Blobs(t *testing.T) {
+	// Create temp dir and blobs folder
+	tmpDir := t.TempDir()
+	blobsDir := filepath.Join(tmpDir, "blobs")
+	require.NoError(t, os.MkdirAll(blobsDir, 0755))
+
+	dbPath := filepath.Join(tmpDir, "test_blobs.db")
+	db, err := sql.Open("sqlite3", dbPath)
+	require.NoError(t, err)
+	defer db.Close()
+
+	// Init Schema (alpha version)
+	_, err = db.Exec(`
+		CREATE TABLE DavItems (
+			Id TEXT PRIMARY KEY,
+			Name TEXT,
+			Path TEXT,
+			NzbBlobId TEXT,
+			SubType INTEGER
+		);
+		CREATE TABLE NzbNames (
+			Id TEXT PRIMARY KEY,
+			FileName TEXT
+		);
+	`)
+	require.NoError(t, err)
+
+	// Create a compressed blob
+	blobId := "a1b2c3d4e5f6g7h8"
+	shardedDir := filepath.Join(blobsDir, blobId[0:2], blobId[2:4])
+	require.NoError(t, os.MkdirAll(shardedDir, 0755))
+
+	nzbContent := `<?xml version="1.0" encoding="UTF-8"?>
+<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
+	<file>
+		<groups>
+			<group>alt.binaries.test</group>
+		</groups>
+		<segments>
+			<segment>msgid@test</segment>
+		</segments>
+	</file>
+</nzb>`
+
+	blobPath := filepath.Join(shardedDir, blobId)
+	f, err := os.Create(blobPath)
+	require.NoError(t, err)
+
+	zw, err := zstd.NewWriter(f)
+	require.NoError(t, err)
+	_, err = zw.Write([]byte(nzbContent))
+	require.NoError(t, err)
+	require.NoError(t, zw.Close())
+	require.NoError(t, f.Close())
+
+	// Insert Data
+	_, err = db.Exec(`
+		INSERT INTO NzbNames (Id, FileName) VALUES ('a1b2c3d4e5f6g7h8', 'My Movie.nzb');
+		INSERT INTO DavItems (Id, Name, Path, NzbBlobId, SubType) VALUES
+			('item1', 'My Movie', '/movies/My Movie', 'a1b2c3d4e5f6g7h8', 203);
+	`)
+	require.NoError(t, err)
+
+	// Run Parser
+	parser := NewParser(dbPath, blobsDir)
+	out, errChan := parser.Parse()
+
+	// Verify
+	select {
+	case res, ok := <-out:
+		require.True(t, ok)
+		assert.Equal(t, "item1", res.ID)
+		assert.Equal(t, "My Movie", res.Name)
+		assert.Equal(t, "movies", res.Category)
+		content, _ := io.ReadAll(res.Content)
+		assert.Equal(t, nzbContent, string(content))
+	case err := <-errChan:
+		require.NoError(t, err)
+	}
+
+	// Should be no more items
+	_, ok := <-out
+	assert.False(t, ok)
+}
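
Reviewer note: the core of this patch is the sharded blob layout plus streaming zstd decode in `parseBlobs`. Below is a minimal standalone sketch of that scheme, not code from the PR itself — the `openBlob` helper name, the example paths, and the blob ID are hypothetical; only the sharding rule (`blobs/XX/YY/<id>`) and the `github.com/klauspost/compress/zstd` usage mirror the diff above.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/klauspost/compress/zstd"
)

// openBlob resolves a blob ID against the sharded on-disk layout used by
// this patch (blobs/XX/YY/<id>, where XX and YY are the first two byte
// pairs of the ID) and returns a reader that streams the decompressed NZB.
func openBlob(blobsPath, blobID string) (io.ReadCloser, error) {
	if len(blobID) < 4 {
		return nil, fmt.Errorf("blob id %q too short to shard", blobID)
	}
	path := filepath.Join(blobsPath, blobID[0:2], blobID[2:4], blobID)

	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	zr, err := zstd.NewReader(f)
	if err != nil {
		f.Close()
		return nil, err
	}

	// Decompress into a pipe so the caller consumes a plain io.Reader
	// while the zstd decode runs in the background, as parseBlobs does.
	pr, pw := io.Pipe()
	go func() {
		defer f.Close()
		defer zr.Close()
		_, err := io.Copy(pw, zr)
		pw.CloseWithError(err) // a nil err closes the pipe cleanly
	}()
	return pr, nil
}

func main() {
	r, err := openBlob("/data/blobs", "a1b2c3d4e5f6g7h8") // hypothetical paths
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer r.Close()
	nzb, _ := io.ReadAll(r)
	fmt.Printf("recovered %d bytes of NZB XML\n", len(nzb))
}
```

The pipe mirrors the design choice in `parseBlobs`: consumers downstream of the channel see an ordinary `io.Reader`, so even large NZB blobs are never fully buffered in memory during import.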