diff --git a/config/altmount.log b/config/altmount.log
deleted file mode 100644
index 085f362b5..000000000
--- a/config/altmount.log
+++ /dev/null
@@ -1,28 +0,0 @@
-time=2025-12-19T20:35:08.095Z level=INFO msg="OK 001_initial_schema.sql (7.34ms)"
-time=2025-12-19T20:35:08.100Z level=INFO msg="OK 002_add_repair_retry_system.sql (4.22ms)"
-time=2025-12-19T20:35:08.101Z level=INFO msg="OK 003_add_file_id_column.sql (741.12µs)"
-time=2025-12-19T20:35:08.103Z level=INFO msg="OK 004_remove_partial_status.sql (1.91ms)"
-time=2025-12-19T20:35:08.103Z level=INFO msg="OK 005_add_health_scheduling_fields.sql (658.17µs)"
-time=2025-12-19T20:35:08.105Z level=INFO msg="OK 006_remove_next_retry_at.sql (1.06ms)"
-time=2025-12-19T20:35:08.105Z level=INFO msg="OK 007_add_library_path.sql (300.95µs)"
-time=2025-12-19T20:35:08.105Z level=INFO msg="OK 008_add_priority_column.sql (323.27µs)"
-time=2025-12-19T20:35:08.105Z level=INFO msg="goose: successfully migrated database to version: 8"
-time=2025-12-19T20:35:08.105Z level=INFO msg="Starting server without NNTP providers - configure via API to enable downloads"
-time=2025-12-19T20:35:08.105Z level=INFO msg="RClone RC notifications disabled"
-time=2025-12-19T20:35:08.105Z level=INFO msg="NZB import service started successfully with 2 workers" component=importer-service
-time=2025-12-19T20:35:08.106Z level=INFO msg="Authentication service initialized"
-time=2025-12-19T20:35:08.106Z level=INFO msg="Health system disabled - no health monitoring or repairs will occur"
-time=2025-12-19T20:35:08.106Z level=INFO msg="Arrs service is disabled in configuration"
-time=2025-12-19T20:35:08.106Z level=INFO msg="AltMount server started" port=8080 webdav_path=/webdav api_path=/api providers=0 download_workers=15 processor_workers=2
-time=2025-12-19T20:35:10.106Z level=INFO msg="RClone mount service is disabled in configuration"
-time=2025-12-19T20:39:35.686Z level=INFO msg="Received shutdown signal" signal=terminated
-time=2025-12-19T20:39:35.686Z level=INFO msg="Starting graceful shutdown sequence"
-time=2025-12-19T20:39:35.686Z level=ERROR msg="Failed to stop health worker" error="health worker not running"
-time=2025-12-19T20:39:35.687Z level=INFO msg="Shutting down server..."
-time=2025-12-19T20:39:35.687Z level=INFO msg="Server shutdown completed"
-time=2025-12-19T20:39:35.687Z level=INFO msg="AltMount server shutdown completed successfully"
-time=2025-12-19T20:39:35.687Z level=INFO msg="Closing importer service"
-time=2025-12-19T20:39:35.687Z level=INFO msg="Queue worker stopped" component=importer-service worker_id=0
-time=2025-12-19T20:39:35.687Z level=INFO msg="Queue worker stopped" component=importer-service worker_id=1
-time=2025-12-19T20:39:35.688Z level=INFO msg="Clearing NNTP pool"
-time=2025-12-19T20:39:35.688Z level=INFO msg="Closing database"
diff --git a/internal/importer/scanner/nzbdav.go b/internal/importer/scanner/nzbdav.go
index 51ddb7de6..934636a3b 100644
--- a/internal/importer/scanner/nzbdav.go
+++ b/internal/importer/scanner/nzbdav.go
@@ -133,7 +133,7 @@ func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, rootF
 	}
 
 	// Create workers
-	numWorkers := 20 // 20 parallel workers for file creation
+	numWorkers := 4 // Use fewer parallel workers for file creation
 	var workerWg sync.WaitGroup
 	batchChan := make(chan *database.ImportQueueItem, 100)
 
@@ -145,6 +145,19 @@ func (n *NzbDavImporter) performImport(ctx context.Context, dbPath string, rootF
 		n.processBatch(ctx, batchChan)
 	}()
 
+	// Monitor error channel in background to catch query/DB failures early
+	go func() {
+		for err := range errChan {
+			if err != nil {
+				n.log.ErrorContext(ctx, "Error during NZBDav parsing", "error", err)
+				n.mu.Lock()
+				msg := err.Error()
+				n.info.LastError = &msg
+				n.mu.Unlock()
+			}
+		}
+	}()
+
 	// Start workers
 	for i := 0; i < numWorkers; i++ {
 		workerWg.Add(1)
diff --git a/internal/nzbdav/parser.go b/internal/nzbdav/parser.go
index 8c07a6bb7..b8033ceb5 100644
--- a/internal/nzbdav/parser.go
+++ b/internal/nzbdav/parser.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"log/slog"
 	"strings"
-	"sync"
 	"text/template"
 
 	_ "github.com/mattn/go-sqlite3"
@@ -27,76 +26,166 @@ func (p *Parser) Parse() (<-chan *ParsedNzb, <-chan error) {
 	errChan := make(chan error, 1)
 
 	go func() {
-		// WaitGroup to track active writers
-		var wg sync.WaitGroup
-
-		db, err := sql.Open("sqlite3", p.dbPath)
+		// Open in read-only mode to avoid locking issues
+		db, err := sql.Open("sqlite3", p.dbPath+"?mode=ro&_journal_mode=WAL")
 		if err != nil {
 			errChan <- fmt.Errorf("failed to open database: %w", err)
 			close(out)
 			close(errChan)
 			return
 		}
 
-		// Ensure DB is closed only after all writers are done
+		// Ensure DB is closed only after processing is done
 		defer func() {
-			wg.Wait()
 			db.Close()
 			close(out)
 			close(errChan)
 		}()
 
-		// Enable WAL mode for better concurrency if possible,
-		// but simple connection reuse is already a big win.
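+		// NB: switching journal modes requires write access, so with mode=ro the
+		// _journal_mode=WAL parameter only matters when the database file is
+		// already in WAL mode (the journal mode is persisted in the file itself).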
+		// Set limits to prevent file descriptor exhaustion
 		db.SetMaxOpenConns(25)
 		db.SetMaxIdleConns(10)
 
-		// Query to find all "Release" folders
-		// A release folder is a parent of any item that has an entry in DavNzbFiles or DavRarFiles
+		// Log available tables for debugging
+		tableRows, err := db.Query("SELECT name FROM sqlite_master WHERE type='table'")
+		if err == nil {
+			var tables []string
+			for tableRows.Next() {
+				var name string
+				tableRows.Scan(&name)
+				tables = append(tables, name)
+			}
+			tableRows.Close()
+			slog.Info("NZBDav Database Tables", "tables", tables)
+		}
+
+		// Query ALL files, ordered by ParentId
+		// This groups files belonging to the same release together efficiently
 		rows, err := db.Query(`
-			SELECT DISTINCT p.Id, p.Name, p.Path
-			FROM DavItems p
-			JOIN DavItems c ON c.ParentId = p.Id
+			SELECT
+				COALESCE(p.Id, 'root') as ReleaseId,
+				COALESCE(p.Name, 'root') as ReleaseName,
+				COALESCE(p.Path, '/') as ReleasePath,
+				c.Id as FileId,
+				c.Name as FileName,
+				c.FileSize,
+				n.SegmentIds,
+				r.RarParts,
+				m.Metadata as MultipartMetadata
+			FROM DavItems c
+			LEFT JOIN DavItems p ON c.ParentId = p.Id
 			LEFT JOIN DavNzbFiles n ON n.Id = c.Id
 			LEFT JOIN DavRarFiles r ON r.Id = c.Id
-			WHERE n.Id IS NOT NULL OR r.Id IS NOT NULL
+			LEFT JOIN DavMultipartFiles m ON m.Id = c.Id
+			WHERE (n.Id IS NOT NULL OR r.Id IS NOT NULL OR m.Id IS NOT NULL)
+			ORDER BY c.ParentId, c.Name
 		`)
 		if err != nil {
-			errChan <- fmt.Errorf("failed to query releases: %w", err)
+			errChan <- fmt.Errorf("failed to query files: %w", err)
 			return
 		}
 		defer rows.Close()
 
+		slog.Debug("NZBDav file query completed, starting iteration")
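+
+		// The result set is sorted by ParentId, so all files of a release arrive
+		// contiguously; remembering the previous ParentId below is enough to
+		// detect release boundaries without issuing a second per-release query.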
+
+		var currentParentId string
+		var currentWriter *io.PipeWriter
+		count := 0
+
+		// cleanupCurrent ensures the current writer is properly closed
+		cleanupCurrent := func() {
+			if currentWriter != nil {
+				// Write NZB Footer
+				if _, err := currentWriter.Write([]byte("</nzb>")); err != nil {
+					slog.Error("Failed to write NZB footer", "error", err)
+				}
+				currentWriter.Close()
+				currentWriter = nil
+			}
+		}
+		defer cleanupCurrent()
 
 		for rows.Next() {
-			var id, name, path string
-			if err := rows.Scan(&id, &name, &path); err != nil {
-				slog.Error("Failed to scan release row", "error", err)
+			var releaseId, releaseName, releasePath string
+			var fileId, fileName string
+			var fileSize sql.NullInt64
+			var segmentIdsJSON, rarPartsJSON, multipartMetadataJSON sql.RawBytes
+
+			if err := rows.Scan(&releaseId, &releaseName, &releasePath, &fileId, &fileName, &fileSize, &segmentIdsJSON, &rarPartsJSON, &multipartMetadataJSON); err != nil {
+				slog.Error("Failed to scan row", "error", err)
 				continue
 			}
 
-			// Create pipe for streaming content
-			pr, pw := io.Pipe()
-
-			wg.Add(1)
-			go func(rid, rname string, writer *io.PipeWriter) {
-				defer wg.Done()
-				p.writeNzb(db, rid, rname, writer)
-			}(id, name, pw)
-
-			category := p.deriveCategory(path)
-			select {
-			case out <- &ParsedNzb{
-				ID:       id,
-				Name:     name,
-				Category: category,
-				RelPath:  p.deriveRelPath(path, category),
-				Content:  pr,
-			}:
-			case <-errChan: // Should not happen given logic, but good practice
-				return
+			// Improve release name if it's just "extracted"
+			if strings.EqualFold(releaseName, "extracted") {
+				// Try to get the name from the path
+				pathParts := strings.Split(strings.Trim(releasePath, "/"), "/")
+				if len(pathParts) > 0 {
+					// Use the last part of the path that isn't "extracted"
+					for i := len(pathParts) - 1; i >= 0; i-- {
+						if !strings.EqualFold(pathParts[i], "extracted") {
+							releaseName = pathParts[i]
+							break
+						}
+					}
+				}
+			}
+
+			count++
+			if count%100 == 0 {
+				slog.Info("NZBDav import progress", "files_scanned", count)
+			}
+
+			// Check if we switched to a new release
+			if releaseId != currentParentId || currentWriter == nil {
+				cleanupCurrent()
+
+				currentParentId = releaseId
+				slog.Debug("Processing new release", "path", releasePath, "name", releaseName)
+
+				// Create new pipe for this release
+				pr, pw := io.Pipe()
+				currentWriter = pw
+
+				// Send ParsedNzb to output channel
+				category := p.deriveCategory(releasePath)
+				relPath := p.deriveRelPath(releasePath, category)
+
+				select {
+				case out <- &ParsedNzb{
+					ID:       releaseId,
+					Name:     releaseName,
+					Category: category,
+					RelPath:  relPath,
+					Content:  pr,
+				}:
+				case <-errChan: // Context cancelled or error
+					return
+				}
+
+				// Write NZB Header
+				header := `<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE nzb PUBLIC "-//newzBin//DTD NZB 1.1//EN" "http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd">
+<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
+	<head>
+		<meta type="name">` + template.HTMLEscapeString(releaseName) + `</meta>
+	</head>
+`
+				if _, err := currentWriter.Write([]byte(header)); err != nil {
+					slog.Error("Failed to write NZB header", "release", releaseName, "error", err)
+					currentWriter.CloseWithError(err)
+					currentWriter = nil
+					continue
+				}
+			}
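+
+			// io.Pipe is unbuffered: every write below blocks until the consumer
+			// reads from ParsedNzb.Content, so this single goroutine advances only
+			// as fast as the importer drains each release's NZB stream.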
+			// Write File Entry
+			if err := p.writeFileEntry(currentWriter, fileId, fileName, fileSize, segmentIdsJSON, rarPartsJSON, multipartMetadataJSON); err != nil {
+				slog.Error("Failed to write file entry", "file", fileName, "error", err)
+				currentWriter.CloseWithError(err)
+				currentWriter = nil
+			}
 		}
+
+		slog.Info("NZBDav import scan completed", "total_files", count)
 	}()
 
 	return out, errChan
@@ -119,8 +208,6 @@ func (p *Parser) deriveRelPath(path, category string) string {
 	path = strings.Trim(path, "/")
 
 	// 2. Identify and remove category prefix
-	// We want to find the category folder in the path and return everything after it
-	// e.g. /content/tv/Show/Season 1 -> Show/Season 1
 	parts := strings.Split(path, "/")
 
 	// Remove the last part (Release Name) as that is handled by the release name itself
@@ -143,197 +230,202 @@ func (p *Parser) deriveRelPath(path, category string) string {
 		return strings.Join(parts[categoryIndex+1:], "/")
 	}
 
-	// If category not found or it's the last folder, return empty
-	// This prevents returning "content/tv" which results in "tv/content/tv"
 	return ""
 }
 
 type rarPart struct {
 	SegmentIds []string `json:"SegmentIds"`
-	PartSize   int64    `json:"PartSize"`
-	Offset     int64    `json:"Offset"`
 	ByteCount  int64    `json:"ByteCount"`
 }
 
-// writeNzb generates the NZB XML and writes it to the pipe
-// It uses the shared DB connection pool
-func (p *Parser) writeNzb(db *sql.DB, releaseId, releaseName string, pw *io.PipeWriter) {
-	defer pw.Close()
-
-	// Fetch files for this release
-	rows, err := db.Query(`
-		SELECT c.Id, c.Name, c.FileSize, n.SegmentIds, r.RarParts
-		FROM DavItems c
-		LEFT JOIN DavNzbFiles n ON n.Id = c.Id
-		LEFT JOIN DavRarFiles r ON r.Id = c.Id
-		WHERE c.ParentId = ? AND (n.Id IS NOT NULL OR r.Id IS NOT NULL)
-	`, releaseId)
-	if err != nil {
-		pw.CloseWithError(fmt.Errorf("failed to query files: %w", err))
-		return
-	}
-	defer rows.Close()
-
-	// Start writing NZB Header
-	header := `<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE nzb PUBLIC "-//newzBin//DTD NZB 1.1//EN" "http://www.newzbin.com/DTD/nzb/nzb-1.1.dtd">
-<nzb xmlns="http://www.newzbin.com/DTD/2003/nzb">
-	<head>
-		<meta type="name">` + template.HTMLEscapeString(releaseName) + `</meta>
-	</head>
-`
-	if _, err := pw.Write([]byte(header)); err != nil {
-		return
-	}
-
-	// Iterate files and write segments
-	for rows.Next() {
-		var fileId, fileName string
-		var fileSize sql.NullInt64
-		var segmentIdsJSON sql.NullString
-		var rarPartsJSON sql.NullString
-
-		if err := rows.Scan(&fileId, &fileName, &fileSize, &segmentIdsJSON, &rarPartsJSON); err != nil {
-			slog.Error("Failed to scan file row", "error", err)
-			continue
-		}
-
-		if segmentIdsJSON.Valid {
-			var segmentIds []string
-			if err := json.Unmarshal([]byte(segmentIdsJSON.String), &segmentIds); err != nil {
-				slog.Error("Failed to unmarshal segment IDs", "file", fileName, "error", err)
-				continue
-			}
-
-			if len(segmentIds) == 0 {
-				continue
-			}
-
-			// Calculate segment size
-			totalBytes := int64(0)
-			if fileSize.Valid {
-				totalBytes = fileSize.Int64
-			}
-
-			// Estimate bytes per segment
-			bytesPerSegment := int64(0)
-			if totalBytes > 0 {
-				bytesPerSegment = totalBytes / int64(len(segmentIds))
-			}
-
-			// Write File Header
-			subject := template.HTMLEscapeString(fileName)
-			if fileId != "" {
-				subject = fmt.Sprintf("NZBDAV_ID:%s %s", template.HTMLEscapeString(fileId), template.HTMLEscapeString(fileName))
-			}
-
-			fileHeader := fmt.Sprintf(`	<file date="%d" subject="%s">
-		<groups>
-			<group>alt.binaries.test</group>
-		</groups>
-		<segments>
-`, 0, subject)
-
-			if _, err := pw.Write([]byte(fileHeader)); err != nil {
-				return
-			}
-
-			// Write Segments
-			for i, msgId := range segmentIds {
-				segBytes := bytesPerSegment
-				// Adjust last segment size
-				if i == len(segmentIds)-1 && totalBytes > 0 {
-					segBytes = totalBytes - (bytesPerSegment * int64(i))
-				}
-				if segBytes <= 0 {
-					segBytes = 1 // Fallback
-				}
-
-				segmentLine := fmt.Sprintf(`			<segment bytes="%d" number="%d">%s</segment>
-`, segBytes, i+1, template.HTMLEscapeString(msgId))
-
-				if _, err := pw.Write([]byte(segmentLine)); err != nil {
-					return
-				}
-			}
-
-			// Write File Footer
-			if _, err := pw.Write([]byte(`		</segments>
-	</file>
-`)); err != nil {
-				return
-			}
-		} else if rarPartsJSON.Valid {
-			var parts []rarPart
-			if err := json.Unmarshal([]byte(rarPartsJSON.String), &parts); err != nil {
-				slog.Error("Failed to unmarshal RAR parts", "file", fileName, "error", err)
-				continue
-			}
-
-			for partIdx, part := range parts {
-				if len(part.SegmentIds) == 0 {
-					continue
-				}
-
-				// If it's a RAR archive, we should name the files accordingly so the importer detects it
-				// If fileName is "movie.mkv", we name parts "movie.mkv.part01.rar", etc.
-				// This ensures the importer processes it as an archive and gets the real file inside.
-				partFileName := fmt.Sprintf("%s.part%02d.rar", fileName, partIdx+1)
-
-				totalBytes := part.ByteCount
-				bytesPerSegment := int64(0)
-				if totalBytes > 0 {
-					bytesPerSegment = totalBytes / int64(len(part.SegmentIds))
-				}
-
-				// Write File Header
-				subject := template.HTMLEscapeString(partFileName)
-				if fileId != "" {
-					subject = fmt.Sprintf("NZBDAV_ID:%s %s", template.HTMLEscapeString(fileId), template.HTMLEscapeString(partFileName))
-				}
-
-				fileHeader := fmt.Sprintf(`	<file date="%d" subject="%s">
-		<groups>
-			<group>alt.binaries.test</group>
-		</groups>
-		<segments>
-`, 0, subject)
-
-				if _, err := pw.Write([]byte(fileHeader)); err != nil {
-					return
-				}
-
-				// Write Segments
-				for i, msgId := range part.SegmentIds {
-					segBytes := bytesPerSegment
-					// Adjust last segment size
-					if i == len(part.SegmentIds)-1 && totalBytes > 0 {
-						segBytes = totalBytes - (bytesPerSegment * int64(i))
-					}
-					if segBytes <= 0 {
-						segBytes = 1 // Fallback
-					}
-
-					segmentLine := fmt.Sprintf(`			<segment bytes="%d" number="%d">%s</segment>
-`, segBytes, i+1, template.HTMLEscapeString(msgId))
-
-					if _, err := pw.Write([]byte(segmentLine)); err != nil {
-						return
-					}
-				}
-
-				// Write File Footer
-				if _, err := pw.Write([]byte(`		</segments>
-	</file>
-`)); err != nil {
-					return
-				}
-			}
-		}
-	}
-
-	// Write NZB Footer
-	if _, err := pw.Write([]byte(`</nzb>`)); err != nil {
-		return
-	}
-}
+type multipartMetadata struct {
+	AesParams *aesParams `json:"AesParams"`
+	FileParts []filePart `json:"FileParts"`
+}
+
+type aesParams struct {
+	DecodedSize int64  `json:"DecodedSize"`
+	Iv          string `json:"Iv"`
+	Key         string `json:"Key"`
+}
+
+type filePart struct {
+	SegmentIds []string `json:"SegmentIds"`
+}
+
+// writeFileEntry writes a single file's segments to the NZB writer
+func (p *Parser) writeFileEntry(w io.Writer, fileId, fileName string, fileSize sql.NullInt64, segmentIdsJSON, rarPartsJSON, multipartMetadataJSON sql.RawBytes) error {
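+	// sql.RawBytes aliases the driver's scan buffer and is only valid until the
+	// caller's next rows.Scan; the JSON below is unmarshaled (copied) before this
+	// function returns, so no reference to the buffer escapes.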
+	if len(segmentIdsJSON) > 0 {
+		var segmentIds []string
+		if err := json.Unmarshal(segmentIdsJSON, &segmentIds); err != nil {
+			return fmt.Errorf("failed to unmarshal segment IDs: %w", err)
+		}
+
+		if len(segmentIds) == 0 {
+			return nil
+		}
+
+		// Calculate segment size
+		totalBytes := int64(0)
+		if fileSize.Valid {
+			totalBytes = fileSize.Int64
+		}
+
+		// Estimate bytes per segment
+		bytesPerSegment := int64(0)
+		if totalBytes > 0 {
+			bytesPerSegment = totalBytes / int64(len(segmentIds))
+		}
+
+		// Write File Header
+		subject := template.HTMLEscapeString(fileName)
+		if fileId != "" {
+			subject = fmt.Sprintf("NZBDAV_ID:%s %s", template.HTMLEscapeString(fileId), template.HTMLEscapeString(fileName))
+		}
+
+		fileHeader := fmt.Sprintf(`	<file date="%d" subject="%s">
+		<groups>
+			<group>alt.binaries.test</group>
+		</groups>
+		<segments>
+`, 0, subject)
+
+		if _, err := w.Write([]byte(fileHeader)); err != nil {
+			return err
+		}
+
+		// Write Segments
+		for i, msgId := range segmentIds {
+			segBytes := bytesPerSegment
+			// Adjust last segment size
+			if i == len(segmentIds)-1 && totalBytes > 0 {
+				segBytes = totalBytes - (bytesPerSegment * int64(i))
+			}
+			if segBytes <= 0 {
+				segBytes = 1 // Fallback
+			}
+
+			segmentLine := fmt.Sprintf(`			<segment bytes="%d" number="%d">%s</segment>
+`, segBytes, i+1, template.HTMLEscapeString(msgId))
+
+			if _, err := w.Write([]byte(segmentLine)); err != nil {
+				return err
+			}
+		}
+
+		if _, err := w.Write([]byte("\t\t</segments>\n\t</file>\n")); err != nil {
+			return err
+		}
+	} else if len(rarPartsJSON) > 0 {
+		var parts []rarPart
+		if err := json.Unmarshal(rarPartsJSON, &parts); err != nil {
+			return fmt.Errorf("failed to unmarshal RAR parts: %w", err)
+		}
+
+		for partIdx, part := range parts {
+			if len(part.SegmentIds) == 0 {
+				continue
+			}
+
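+			// Name the parts "<name>.partNN.rar" so the importer detects a RAR
+			// archive and surfaces the real file inside.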
+			partFileName := fmt.Sprintf("%s.part%02d.rar", fileName, partIdx+1)
+
+			totalBytes := part.ByteCount
+			bytesPerSegment := int64(0)
+			if totalBytes > 0 {
+				bytesPerSegment = totalBytes / int64(len(part.SegmentIds))
+			}
+
+			// Write File Header
+			subject := template.HTMLEscapeString(partFileName)
+			if fileId != "" {
+				subject = fmt.Sprintf("NZBDAV_ID:%s %s", template.HTMLEscapeString(fileId), template.HTMLEscapeString(partFileName))
+			}
+
+			fileHeader := fmt.Sprintf(`	<file date="%d" subject="%s">
+		<groups>
+			<group>alt.binaries.test</group>
+		</groups>
+		<segments>
+`, 0, subject)
+
+			if _, err := w.Write([]byte(fileHeader)); err != nil {
+				return err
+			}
+
+			// Write Segments
+			for i, msgId := range part.SegmentIds {
+				segBytes := bytesPerSegment
+				if i == len(part.SegmentIds)-1 && totalBytes > 0 {
+					segBytes = totalBytes - (bytesPerSegment * int64(i))
+				}
+				if segBytes <= 0 {
+					segBytes = 1
+				}
+
+				segmentLine := fmt.Sprintf(`			<segment bytes="%d" number="%d">%s</segment>
+`, segBytes, i+1, template.HTMLEscapeString(msgId))
+
+				if _, err := w.Write([]byte(segmentLine)); err != nil {
+					return err
+				}
+			}
+
+			if _, err := w.Write([]byte("\t\t</segments>\n\t</file>\n")); err != nil {
+				return err
+			}
+		}
+	} else if len(multipartMetadataJSON) > 0 {
+		var meta multipartMetadata
+		if err := json.Unmarshal(multipartMetadataJSON, &meta); err != nil {
+			return fmt.Errorf("failed to unmarshal multipart metadata: %w", err)
+		}
+
+		for partIdx, part := range meta.FileParts {
+			if len(part.SegmentIds) == 0 {
+				continue
+			}
+
+			partFileName := fmt.Sprintf("%s.part%02d", fileName, partIdx+1)
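+			// AES parameters ride along in the subject line for the importer to
+			// pick up. Per-segment sizes are not stored for multipart entries, so
+			// the segment writer below assumes a typical ~750 KB article payload.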
+			extraMeta := ""
+			if meta.AesParams != nil {
+				extraMeta = fmt.Sprintf("AES_KEY:%s AES_IV:%s DECODED_SIZE:%d ",
+					meta.AesParams.Key, meta.AesParams.Iv, meta.AesParams.DecodedSize)
+			}
+
+			subject := fmt.Sprintf("NZBDAV_ID:%s %s%s",
+				template.HTMLEscapeString(fileId), extraMeta, template.HTMLEscapeString(partFileName))
+
+			fileHeader := fmt.Sprintf(`	<file date="%d" subject="%s">
+		<groups>
+			<group>alt.binaries.test</group>
+		</groups>
+		<segments>
+`, 0, subject)
+
+			if _, err := w.Write([]byte(fileHeader)); err != nil {
+				return err
+			}
+
+			for i, msgId := range part.SegmentIds {
+				segmentLine := fmt.Sprintf(`			<segment bytes="%d" number="%d">%s</segment>
+`, 750000, i+1, template.HTMLEscapeString(msgId))
+
+				if _, err := w.Write([]byte(segmentLine)); err != nil {
+					return err
+				}
+			}
+
+			if _, err := w.Write([]byte("\t\t</segments>\n\t</file>\n")); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
\ No newline at end of file
diff --git a/internal/nzbdav/parser_test.go b/internal/nzbdav/parser_test.go
index c5b925fb5..2bdf4133b 100644
--- a/internal/nzbdav/parser_test.go
+++ b/internal/nzbdav/parser_test.go
@@ -37,6 +37,10 @@ func TestParser_Parse(t *testing.T) {
 			Id TEXT PRIMARY KEY,
 			RarParts TEXT
 		);
+		CREATE TABLE DavMultipartFiles (
+			Id TEXT PRIMARY KEY,
+			Metadata TEXT
+		);
 	`)
 	require.NoError(t, err)
 
@@ -47,10 +51,14 @@ func TestParser_Parse(t *testing.T) {
 		('root', NULL, '/', 1, '/'),
 		('movies', 'root', 'movies', 1, '/movies'),
 		('rel1', 'movies', 'My.Release.1080p', 1, '/movies/My.Release.1080p'),
-		('file1', 'rel1', 'movie.mkv', 0, '/movies/My.Release.1080p/movie.mkv');
+		('file1', 'rel1', 'movie.mkv', 0, '/movies/My.Release.1080p/movie.mkv'),
+		('rel2', 'movies', 'Actual.Movie.Name', 1, '/movies/Actual.Movie.Name'),
+		('ext', 'rel2', 'extracted', 1, '/movies/Actual.Movie.Name/extracted'),
+		('file2', 'ext', 'movie2.mkv', 0, '/movies/Actual.Movie.Name/extracted/movie2.mkv');
 
 	INSERT INTO DavNzbFiles (Id, SegmentIds) VALUES
-		('file1', '["msg1@test", "msg2@test"]');
+		('file1', '["msg1@test", "msg2@test"]'),
+		('file2', '["msg3@test"]');
 	`)
 	require.NoError(t, err)
 
@@ -59,23 +67,48 @@ func TestParser_Parse(t *testing.T) {
 	out, errChan := parser.Parse()
 
 	// Verify
+	// Note: rows are ordered by (ParentId, Name); file1's parent is 'rel1' and
+	// file2's parent is 'ext', so either release may be emitted first.
+
+	// Item 1
 	select {
 	case res, ok := <-out:
 		require.True(t, ok)
-		assert.Equal(t, "My.Release.1080p", res.Name)
-		assert.Equal(t, "movies", res.Category)
+		// Either release may arrive first; branch on the name we received.
+		if res.Name == "Actual.Movie.Name" {
+			assert.Equal(t, "movies", res.Category)
+			content, _ := io.ReadAll(res.Content)
+			assert.Contains(t, string(content), `<meta type="name">Actual.Movie.Name</meta>`)
+		} else {
+			assert.Equal(t, "My.Release.1080p", res.Name)
+			assert.Equal(t, "movies", res.Category)
+			content, _ := io.ReadAll(res.Content)
+			assert.Contains(t, string(content), `<meta type="name">My.Release.1080p</meta>`)
+			assert.Contains(t, string(content), `subject="NZBDAV_ID:file1 movie.mkv">`)
+		}
-
-		// Read content
-		content, err := io.ReadAll(res.Content)
+	case err := <-errChan:
 		require.NoError(t, err)
-		xmlStr := string(content)
-
-		assert.Contains(t, xmlStr, `<meta type="name">My.Release.1080p</meta>`)
-		assert.Contains(t, xmlStr, `<group>alt.binaries.test</group>`)
-		assert.Contains(t, xmlStr, `>msg1@test</segment>`)
-		assert.Contains(t, xmlStr, `>msg2@test</segment>`)
+	}
 
+	// Item 2
+	select {
+	case res, ok := <-out:
+		require.True(t, ok)
+		if res.Name == "Actual.Movie.Name" {
+			assert.Equal(t, "movies", res.Category)
+			content, _ := io.ReadAll(res.Content)
+			assert.Contains(t, string(content), `<meta type="name">Actual.Movie.Name</meta>`)
+		} else {
+			assert.Equal(t, "My.Release.1080p", res.Name)
+			assert.Equal(t, "movies", res.Category)
+			content, _ := io.ReadAll(res.Content)
+			assert.Contains(t, string(content), `<meta type="name">My.Release.1080p</meta>`)
+		}
 	case err := <-errChan:
 		require.NoError(t, err)
 	}
@@ -83,4 +116,4 @@ func TestParser_Parse(t *testing.T) {
 	// Should be no more items
 	_, ok := <-out
 	assert.False(t, ok)
-}
+}
\ No newline at end of file