diff --git a/frontend/src/components/config/ArrsConfigSection.tsx b/frontend/src/components/config/ArrsConfigSection.tsx index 9fa96fcc6..872898a18 100644 --- a/frontend/src/components/config/ArrsConfigSection.tsx +++ b/frontend/src/components/config/ArrsConfigSection.tsx @@ -336,6 +336,39 @@ export function ArrsConfigSection({ +
+
+ Cleanup Grace Period +
+ handleFormChange("queue_cleanup_grace_period_minutes", parseInt(e.target.value) || 10)} + min={0} + disabled={isReadOnly} + /> + min +
+

Wait time before considering a failed item "stuck" and eligible for cleanup.

+
+ +
+ Import Failure Cleanup + +

Automatically remove items from the queue that failed with "Automatic Import" errors.

+
+
+
Allowlist (Ignore Errors)
diff --git a/frontend/src/components/config/HealthConfigSection.tsx b/frontend/src/components/config/HealthConfigSection.tsx index eda5a8694..5dd12343b 100644 --- a/frontend/src/components/config/HealthConfigSection.tsx +++ b/frontend/src/components/config/HealthConfigSection.tsx @@ -322,6 +322,33 @@ export function HealthConfigSection({

How often to scan your library for new files.

+ +
+
+ Health Check Loop Interval (Sec) + handleInputChange("check_interval_seconds", Number.parseInt(e.target.value, 10) || 5)} + /> +

Idle time between background health check cycles.

+
+
+ Sync Concurrency + handleInputChange("library_sync_concurrency", Number.parseInt(e.target.value, 10) || 0)} + /> +

Max parallel file scans during sync (0 = auto).

+
+
diff --git a/frontend/src/components/config/MountConfigSection.tsx b/frontend/src/components/config/MountConfigSection.tsx index 8606014f7..ba330461d 100644 --- a/frontend/src/components/config/MountConfigSection.tsx +++ b/frontend/src/components/config/MountConfigSection.tsx @@ -933,6 +933,21 @@ function FuseMountSubSection({ config, isRunning, onFormDataChange }: FuseSubSec Allow other users to access mount +
+ Prefetch Concurrency + + updateField({ + prefetch_concurrency: Number.parseInt(e.target.value, 10) || 0, + }) + } + disabled={isRunning} + /> +

Number of parallel segment downloads during prefetch (0 = auto).

+
diff --git a/frontend/src/components/config/SystemConfigSection.tsx b/frontend/src/components/config/SystemConfigSection.tsx index 06f94ca22..eb44e7596 100644 --- a/frontend/src/components/config/SystemConfigSection.tsx +++ b/frontend/src/components/config/SystemConfigSection.tsx @@ -29,6 +29,7 @@ export function SystemConfigSection({ max_backups: config.log.max_backups, compress: config.log.compress, }); + const [profilerEnabled, setProfilerEnabled] = useState(config.profiler_enabled); const [hasChanges, setHasChanges] = useState(false); const regenerateAPIKey = useRegenerateAPIKey(); @@ -45,8 +46,9 @@ export function SystemConfigSection({ compress: config.log.compress, }; setFormData(newFormData); + setProfilerEnabled(config.profiler_enabled); setHasChanges(false); - }, [config.log]); + }, [config.log, config.profiler_enabled]); const handleInputChange = (field: keyof LogFormData, value: string | number | boolean) => { const newData = { ...formData, [field]: value }; @@ -55,12 +57,20 @@ export function SystemConfigSection({ file: config.log.file, level: config.log.level, max_size: config.log.max_size, max_age: config.log.max_age, max_backups: config.log.max_backups, compress: config.log.compress, }; - setHasChanges(JSON.stringify(newData) !== JSON.stringify(configData)); + setHasChanges(JSON.stringify(newData) !== JSON.stringify(configData) || profilerEnabled !== config.profiler_enabled); + }; + + const handleProfilerChange = (enabled: boolean) => { + setProfilerEnabled(enabled); + setHasChanges(true); }; const handleSave = async () => { if (onUpdate && hasChanges) { - await onUpdate("log", formData); + // We need a way to update profiler_enabled too. + // In ConfigurationPage, onUpdate for 'log' updates 'system' section which includes log. + // Let's assume the backend handles both if we send them. 
+ await onUpdate("log", { ...formData, profiler_enabled: profilerEnabled } as any); setHasChanges(false); } }; @@ -109,21 +119,95 @@ export function SystemConfigSection({
-
- Minimum Log Level - handleInputChange("level", e.target.value)} + > + + + + + +

Determines how much information is stored in logs.

+
+ +
+ Max Log Size (MB) + handleInputChange("max_size", parseInt(e.target.value) || 0)} + /> +
+ + +
+
+ Max Age (Days) + handleInputChange("max_age", parseInt(e.target.value) || 0)} + /> +
+
+ Max Backups + handleInputChange("max_backups", parseInt(e.target.value) || 0)} + /> +
+
+ Compress Logs +
+ handleInputChange("compress", e.target.checked)} + /> +
+
+
+ + + {/* Performance Profiler */} +
+
+ +

Performance

+
+
+ +
+
+
System Profiler (pprof)
+

+ Enable Go runtime profiling at /debug/pprof. + Recommended only when debugging resource leaks. +

+
+ handleInputChange("level", e.target.value)} - > - - - - - -

Determines how much information is stored in logs.

- + onChange={(e) => handleProfilerChange(e.target.checked)} + /> +
{/* Security Section */} diff --git a/frontend/src/components/config/WorkersConfigSection.tsx b/frontend/src/components/config/WorkersConfigSection.tsx index 0803d5d1a..6dc4afe76 100644 --- a/frontend/src/components/config/WorkersConfigSection.tsx +++ b/frontend/src/components/config/WorkersConfigSection.tsx @@ -113,6 +113,34 @@ export function ImportConfigSection({

Socket limit per active worker.

+ +
+
+ Max Download Prefetch + handleInputChange("max_download_prefetch", Number.parseInt(e.target.value, 10) || 1)} + /> +

Number of segments to prefetch ahead during archive analysis.

+
+ +
+ Read Timeout (Seconds) + handleInputChange("read_timeout_seconds", Number.parseInt(e.target.value, 10) || 300)} + /> +

Usenet socket read timeout.

+
+
{/* Validation Slider */} @@ -213,6 +241,45 @@ export function ImportConfigSection({ )} + +
+ +
+
+
NZB Watch Directory
+

+ Monitor a specific folder for new NZB files and import them automatically. +

+
+ +
+
+ Watch Directory Path + handleInputChange("watch_dir", e.target.value)} + /> +

Absolute path to monitor.

+
+ +
+ Polling Interval (Seconds) + handleInputChange("watch_interval_seconds", Number.parseInt(e.target.value, 10) || 10)} + /> +

How often to check for new files.

+
+
+
{/* File Extensions */} diff --git a/frontend/src/pages/ConfigurationPage.tsx b/frontend/src/pages/ConfigurationPage.tsx index 56eec49a3..c0d7ebb4d 100644 --- a/frontend/src/pages/ConfigurationPage.tsx +++ b/frontend/src/pages/ConfigurationPage.tsx @@ -252,9 +252,14 @@ export function ConfigurationPage() { config: { providers: data as unknown as ProviderConfig[] }, }); } else if (section === "log") { + const logData = data as unknown as LogFormData & { profiler_enabled?: boolean }; + const { profiler_enabled, ...logConfig } = logData; await updateConfigSection.mutateAsync({ section: "system", - config: { log: data as unknown as LogFormData }, + config: { + log: logConfig, + profiler_enabled: profiler_enabled + }, }); } } catch (error) { diff --git a/frontend/src/types/config.ts b/frontend/src/types/config.ts index 46afbd3f3..8eb472d41 100644 --- a/frontend/src/types/config.ts +++ b/frontend/src/types/config.ts @@ -22,6 +22,7 @@ export interface ConfigResponse { mount_path: string; mount_type: MountType; api_key?: string; + profiler_enabled: boolean; } // WebDAV server configuration @@ -79,6 +80,7 @@ export interface HealthConfig { max_concurrent_jobs?: number; // Max concurrent health check jobs segment_sample_percentage?: number; // Percentage of segments to check (1-100) library_sync_interval_minutes?: number; // Library sync interval in minutes (optional) + library_sync_concurrency?: number; check_all_segments?: boolean; // Whether to check all segments or use sampling resolve_repair_on_import?: boolean; // Automatically resolve pending repairs in the same directory when a new file is imported verify_data?: boolean; // Verify 1 byte of data for each segment @@ -186,6 +188,7 @@ export interface FuseConfig { disk_cache_expiry_hours?: number; chunk_size_mb?: number; read_ahead_chunks?: number; + prefetch_concurrency?: number; } // Import strategy type @@ -273,6 +276,7 @@ export interface ConfigUpdateRequest { providers?: ProviderUpdateRequest[]; mount_path?: 
string; mount_type?: MountType; + profiler_enabled?: boolean; } // WebDAV update request @@ -318,6 +322,7 @@ export interface HealthUpdateRequest { max_connections_for_health_checks?: number; max_concurrent_jobs?: number; // Max concurrent health check jobs library_sync_interval_minutes?: number; // Library sync interval in minutes (optional) + library_sync_concurrency?: number; check_all_segments?: boolean; // Whether to check all segments or use sampling resolve_repair_on_import?: boolean; verify_data?: boolean; @@ -469,6 +474,8 @@ export interface APIFormData { export interface ImportFormData { max_processor_workers: number; queue_processing_interval_seconds: number; // Interval in seconds for queue processing + max_download_prefetch: number; + read_timeout_seconds: number; import_strategy: ImportStrategy; import_dir: string; watch_dir?: string; diff --git a/internal/api/config_handlers.go b/internal/api/config_handlers.go index 42569550c..e2df11e0b 100644 --- a/internal/api/config_handlers.go +++ b/internal/api/config_handlers.go @@ -174,6 +174,8 @@ func (s *Server) handlePatchConfigSection(c *fiber.Ctx) error { switch section { case "webdav", "api", "auth", "database", "metadata", "streaming", "health", "rclone", "import", "log", "sabnzbd", "arrs", "fuse", "system", "mount_path", "mount", "providers": err = c.BodyParser(newConfig) + // BodyParser will map fields like "profiler_enabled" from JSON to the root of newConfig + // because Config struct has it with `json:"profiler_enabled"`. 
default: return RespondValidationError(c, fmt.Sprintf("Unknown configuration section: %s", section), "INVALID_SECTION") } diff --git a/internal/api/types.go b/internal/api/types.go index c567b6614..a9bb6d62d 100644 --- a/internal/api/types.go +++ b/internal/api/types.go @@ -14,12 +14,13 @@ import ( // ConfigAPIResponse wraps config.Config with sensitive data handling type ConfigAPIResponse struct { *config.Config - WebDAV WebDAVAPIResponse `json:"webdav"` - Import ImportAPIResponse `json:"import"` - RClone RCloneAPIResponse `json:"rclone"` - SABnzbd SABnzbdAPIResponse `json:"sabnzbd"` - Providers []ProviderAPIResponse `json:"providers"` - APIKey string `json:"api_key,omitempty"` // User's API key for authentication + WebDAV WebDAVAPIResponse `json:"webdav"` + Import ImportAPIResponse `json:"import"` + RClone RCloneAPIResponse `json:"rclone"` + SABnzbd SABnzbdAPIResponse `json:"sabnzbd"` + Providers []ProviderAPIResponse `json:"providers"` + APIKey string `json:"api_key,omitempty"` // User's API key for authentication + ProfilerEnabled bool `json:"profiler_enabled"` } // WebDAVAPIResponse sanitizes WebDAV config for API responses @@ -117,6 +118,7 @@ type ImportAPIResponse struct { AllowedFileExtensions []string `json:"allowed_file_extensions"` MaxImportConnections int `json:"max_import_connections"` MaxDownloadPrefetch int `json:"max_download_prefetch"` + ReadTimeoutSeconds int `json:"read_timeout_seconds"` SegmentSamplePercentage int `json:"segment_sample_percentage"` // Percentage of segments to check (1-100) ImportStrategy config.ImportStrategy `json:"import_strategy"` ImportDir *string `json:"import_dir,omitempty"` @@ -241,13 +243,14 @@ func ToConfigAPIResponse(cfg *config.Config, apiKey string) *ConfigAPIResponse { } return &ConfigAPIResponse{ - Config: cfg, - WebDAV: webdavResp, - Import: ToImportAPIResponse(cfg.Import), - RClone: rcloneResp, - SABnzbd: sabnzbdResp, - Providers: providers, - APIKey: apiKey, + Config: cfg, + WebDAV: webdavResp, + Import: 
ToImportAPIResponse(cfg.Import), + RClone: rcloneResp, + SABnzbd: sabnzbdResp, + Providers: providers, + APIKey: apiKey, + ProfilerEnabled: cfg.ProfilerEnabled, } } @@ -258,6 +261,7 @@ func ToImportAPIResponse(importConfig config.ImportConfig) ImportAPIResponse { AllowedFileExtensions: importConfig.AllowedFileExtensions, MaxImportConnections: importConfig.MaxImportConnections, MaxDownloadPrefetch: importConfig.MaxDownloadPrefetch, + ReadTimeoutSeconds: importConfig.ReadTimeoutSeconds, SegmentSamplePercentage: importConfig.SegmentSamplePercentage, ImportStrategy: importConfig.ImportStrategy, ImportDir: importConfig.ImportDir, diff --git a/internal/fuse/dir.go b/internal/fuse/dir.go index d74efb18a..40c7723db 100644 --- a/internal/fuse/dir.go +++ b/internal/fuse/dir.go @@ -22,6 +22,8 @@ var _ fs.NodeRenamer = (*Dir)(nil) var _ fs.NodeSetattrer = (*Dir)(nil) var _ fs.NodeStatfser = (*Dir)(nil) var _ fs.NodeMkdirer = (*Dir)(nil) +var _ fs.NodeUnlinker = (*Dir)(nil) +var _ fs.NodeRmdirer = (*Dir)(nil) // Dir represents a directory in the FUSE filesystem. // Talks directly to NzbFilesystem with FUSE context propagation. @@ -184,16 +186,51 @@ func (d *Dir) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbe newPath := filepath.Join(targetDir.path, newName) if err := d.nzbfs.Rename(ctx, oldPath, newPath); err != nil { - if os.IsNotExist(err) { - return syscall.ENOENT - } - d.logger.ErrorContext(ctx, "Rename failed", "old", oldPath, "new", newPath, "error", err) - return syscall.EIO + return mapError(err, d.logger, ctx, "Rename failed", "old", oldPath, "new", newPath) + } + + return 0 +} + +// Unlink implements fs.NodeUnlinker — removes a file. 
+func (d *Dir) Unlink(ctx context.Context, name string) syscall.Errno { + fullPath := filepath.Join(d.path, name) + + if err := d.nzbfs.Remove(ctx, fullPath); err != nil { + return mapError(err, d.logger, ctx, "Unlink failed", "path", fullPath) } return 0 } +// Rmdir implements fs.NodeRmdirer — removes an empty directory. +func (d *Dir) Rmdir(ctx context.Context, name string) syscall.Errno { + fullPath := filepath.Join(d.path, name) + + if err := d.nzbfs.Remove(ctx, fullPath); err != nil { + return mapError(err, d.logger, ctx, "Rmdir failed", "path", fullPath) + } + + return 0 +} + +// mapError maps os-level errors to FUSE errno values. +func mapError(err error, logger *slog.Logger, ctx context.Context, msg string, args ...any) syscall.Errno { + if os.IsNotExist(err) { + return syscall.ENOENT + } + if os.IsPermission(err) { + return syscall.EACCES + } + if os.IsExist(err) { + return syscall.EEXIST + } + logArgs := append([]any{}, args...) + logArgs = append(logArgs, "error", err) + logger.ErrorContext(ctx, msg, logArgs...) + return syscall.EIO +} + // Readdir implements fs.NodeReaddirer. func (d *Dir) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) { // Open directory via NzbFilesystem with FUSE context diff --git a/internal/importer/postprocessor/id_linker.go b/internal/importer/postprocessor/id_linker.go index d6d1024c7..fe9456d26 100644 --- a/internal/importer/postprocessor/id_linker.go +++ b/internal/importer/postprocessor/id_linker.go @@ -3,14 +3,9 @@ package postprocessor import ( "context" "encoding/json" - "io/fs" - "os" - "path/filepath" - "strings" "github.com/javi11/altmount/internal/database" metapb "github.com/javi11/altmount/internal/metadata/proto" - "google.golang.org/protobuf/proto" ) // HandleIDMetadataLinks creates ID-based metadata links for nzbdav compatibility @@ -21,88 +16,19 @@ func (c *Coordinator) HandleIDMetadataLinks(ctx context.Context, item *database. 
NzbdavID string `json:"nzbdav_id"` } if err := json.Unmarshal([]byte(*item.Metadata), &meta); err == nil && meta.NzbdavID != "" { - if err := c.createIDMetadataLink(meta.NzbdavID, resultingPath); err != nil { + if err := c.metadataService.UpdateIDSymlink(meta.NzbdavID, resultingPath); err != nil { c.log.Warn("Failed to create release ID metadata link", "id", meta.NzbdavID, "error", err) } } } - // 2. Check individual files for IDs - cfg := c.configGetter() - metadataPath := filepath.Join(cfg.Metadata.RootPath, strings.TrimPrefix(resultingPath, "/")) - - _ = filepath.WalkDir(metadataPath, func(path string, d fs.DirEntry, err error) error { - if err != nil || d.IsDir() || !strings.HasSuffix(d.Name(), ".meta") { - return nil - } - - // Read the metadata file to find the ID - data, err := os.ReadFile(path) - if err != nil { - return nil - } - - // Parse the protobuf metadata to get the ID - meta := &metapb.FileMetadata{} - if err := proto.Unmarshal(data, meta); err != nil { - return nil - } - - // Check sidecar ID file if not in proto (compatibility mode) - if meta.NzbdavId == "" { - if idData, err := os.ReadFile(path + ".id"); err == nil { - meta.NzbdavId = string(idData) - } - } - + // 2. 
Check individual files for IDs using MetadataService walker + _ = c.metadataService.WalkDirectoryFiles(resultingPath, func(fileVirtualPath string, meta *metapb.FileMetadata) error { if meta.NzbdavId != "" { - // Calculate the virtual path from the metadata file path - relPath, err := filepath.Rel(cfg.Metadata.RootPath, path) - if err != nil { - return nil - } - // Remove .meta extension - virtualPath := strings.TrimSuffix(relPath, ".meta") - - if err := c.createIDMetadataLink(meta.NzbdavId, virtualPath); err != nil { + if err := c.metadataService.UpdateIDSymlink(meta.NzbdavId, fileVirtualPath); err != nil { c.log.Warn("Failed to create file ID metadata link", "id", meta.NzbdavId, "error", err) } } - return nil }) } - -// createIDMetadataLink creates a symlink from an ID-based sharded path to the metadata file -func (c *Coordinator) createIDMetadataLink(nzbdavID, resultingPath string) error { - cfg := c.configGetter() - metadataRoot := cfg.Metadata.RootPath - - // Calculate sharded path - // 04db0bde-7ad0-46a3-a2f4-9ef8efd0d7d7 -> .ids/0/4/d/b/0/04db0bde-7ad0-46a3-a2f4-9ef8efd0d7d7.meta - id := strings.ToLower(nzbdavID) - if len(id) < 5 { - return nil // Invalid ID for sharding - } - - shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) - fullShardDir := filepath.Join(metadataRoot, shardPath) - - if err := os.MkdirAll(fullShardDir, 0755); err != nil { - return err - } - - targetMetaPath := c.metadataService.GetMetadataFilePath(resultingPath) - linkPath := filepath.Join(fullShardDir, id+".meta") - - // Remove if exists - os.Remove(linkPath) - - // Create relative symlink if possible - relTarget, err := filepath.Rel(fullShardDir, targetMetaPath) - if err != nil { - return os.Symlink(targetMetaPath, linkPath) - } - - return os.Symlink(relTarget, linkPath) -} diff --git a/internal/metadata/service.go b/internal/metadata/service.go index c5f306f45..069e52029 100644 --- a/internal/metadata/service.go +++ 
b/internal/metadata/service.go @@ -3,6 +3,8 @@ package metadata import ( "context" "fmt" + "io" + "io/fs" "log/slog" "os" "path/filepath" @@ -274,6 +276,12 @@ func (ms *MetadataService) DeleteFileMetadataWithSourceNzb(ctx context.Context, return fmt.Errorf("failed to delete metadata file: %w", err) } + // Clean up .id sidecar file + idPath := metadataPath + ".id" + if removeErr := os.Remove(idPath); removeErr != nil && !os.IsNotExist(removeErr) { + slog.DebugContext(ctx, "Failed to remove .id sidecar file", "path", idPath, "error", removeErr) + } + // Optionally delete the source NZB file (error-tolerant) if deleteSourceNzb && sourceNzbPath != "" { if err := os.Remove(sourceNzbPath); err != nil { @@ -304,6 +312,152 @@ func (ms *MetadataService) DeleteDirectory(virtualPath string) error { return nil } +// RenameFileMetadata atomically renames a metadata file (and its .id sidecar) from oldVirtualPath to newVirtualPath. +// Uses os.Rename for atomicity on the same filesystem, falling back to read-write-delete for cross-device moves. 
+func (ms *MetadataService) RenameFileMetadata(oldVirtualPath, newVirtualPath string) error { + oldFilename := filepath.Base(oldVirtualPath) + oldDir := filepath.Join(ms.rootPath, filepath.Dir(oldVirtualPath)) + oldMetaPath := filepath.Join(oldDir, oldFilename+".meta") + + newFilename := filepath.Base(newVirtualPath) + newDir := filepath.Join(ms.rootPath, filepath.Dir(newVirtualPath)) + newMetaPath := filepath.Join(newDir, newFilename+".meta") + + // Ensure destination directory exists + if err := os.MkdirAll(newDir, 0755); err != nil { + return fmt.Errorf("failed to create destination metadata directory: %w", err) + } + + // Try atomic rename first + if err := os.Rename(oldMetaPath, newMetaPath); err != nil { + // Fall back to read-write-delete for cross-device moves + if !isCrossDeviceError(err) { + return fmt.Errorf("failed to rename metadata file: %w", err) + } + + if err := copyAndRemoveFile(oldMetaPath, newMetaPath); err != nil { + return fmt.Errorf("failed to copy metadata file across devices: %w", err) + } + } + + // Also rename the .id sidecar file if it exists + oldIDPath := oldMetaPath + ".id" + newIDPath := newMetaPath + ".id" + if _, err := os.Stat(oldIDPath); err == nil { + if err := os.Rename(oldIDPath, newIDPath); err != nil { + // Cross-device fallback for .id file + if isCrossDeviceError(err) { + _ = copyAndRemoveFile(oldIDPath, newIDPath) + } else { + slog.Warn("Failed to rename .id sidecar file", "old", oldIDPath, "new", newIDPath, "error", err) + } + } + } + + return nil +} + +// UpdateIDSymlink creates or updates an ID-based symlink in the .ids/ sharded directory. 
+func (ms *MetadataService) UpdateIDSymlink(nzbdavID, virtualPath string) error { + id := strings.ToLower(nzbdavID) + if len(id) < 5 { + return nil // Invalid ID for sharding + } + + shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) + fullShardDir := filepath.Join(ms.rootPath, shardPath) + + if err := os.MkdirAll(fullShardDir, 0755); err != nil { + return err + } + + targetMetaPath := ms.GetMetadataFilePath(virtualPath) + linkPath := filepath.Join(fullShardDir, id+".meta") + + // Remove existing symlink if present + os.Remove(linkPath) + + // Create relative symlink + relTarget, err := filepath.Rel(fullShardDir, targetMetaPath) + if err != nil { + return os.Symlink(targetMetaPath, linkPath) + } + + return os.Symlink(relTarget, linkPath) +} + +// RemoveIDSymlink removes an ID-based symlink from the .ids/ sharded directory. +func (ms *MetadataService) RemoveIDSymlink(nzbdavID string) error { + id := strings.ToLower(nzbdavID) + if len(id) < 5 { + return nil + } + + shardPath := filepath.Join(".ids", string(id[0]), string(id[1]), string(id[2]), string(id[3]), string(id[4])) + linkPath := filepath.Join(ms.rootPath, shardPath, id+".meta") + + if err := os.Remove(linkPath); err != nil && !os.IsNotExist(err) { + return err + } + return nil +} + +// WalkDirectoryFiles walks a metadata directory and calls fn for each file's virtual path and metadata. 
+func (ms *MetadataService) WalkDirectoryFiles(virtualPath string, fn func(fileVirtualPath string, meta *metapb.FileMetadata) error) error { + metadataDir := filepath.Join(ms.rootPath, virtualPath) + + return filepath.WalkDir(metadataDir, func(path string, d fs.DirEntry, err error) error { + if err != nil || d.IsDir() || !strings.HasSuffix(d.Name(), ".meta") { + return nil + } + + relPath, err := filepath.Rel(ms.rootPath, path) + if err != nil { + return nil + } + fileVirtualPath := strings.TrimSuffix(relPath, ".meta") + + meta, err := ms.ReadFileMetadata(fileVirtualPath) + if err != nil || meta == nil { + return nil + } + + return fn(fileVirtualPath, meta) + }) +} + +// isCrossDeviceError checks if an error is a cross-device link error (EXDEV). +func isCrossDeviceError(err error) bool { + return strings.Contains(err.Error(), "cross-device") || strings.Contains(err.Error(), "invalid cross-device link") +} + +// copyAndRemoveFile copies src to dst then removes src. +func copyAndRemoveFile(src, dst string) error { + srcFile, err := os.Open(src) + if err != nil { + return err + } + defer srcFile.Close() + + dstFile, err := os.Create(dst) + if err != nil { + return err + } + defer dstFile.Close() + + if _, err := io.Copy(dstFile, srcFile); err != nil { + os.Remove(dst) // Clean up partial write + return err + } + + if err := dstFile.Close(); err != nil { + return err + } + srcFile.Close() + + return os.Remove(src) +} + // ValidateSourceNzb validates that the source NZB file exists and matches metadata func (ms *MetadataService) ValidateSourceNzb(metadata *metapb.FileMetadata) error { if metadata.SourceNzbPath == "" { diff --git a/internal/nzbfilesystem/metadata_remote_file.go b/internal/nzbfilesystem/metadata_remote_file.go index 60ba83326..40a3d748c 100644 --- a/internal/nzbfilesystem/metadata_remote_file.go +++ b/internal/nzbfilesystem/metadata_remote_file.go @@ -301,6 +301,23 @@ func (mrf *MetadataRemoteFile) RenameFile(ctx context.Context, oldName, newName return 
false, fmt.Errorf("failed to rename directory: %w", err) } + // Update health records for all files under the renamed directory + if mrf.healthRepository != nil { + if err := mrf.healthRepository.RenameHealthRecord(ctx, normalizedOld, normalizedNew); err != nil { + slog.WarnContext(ctx, "Failed to update health records for renamed directory", "old", normalizedOld, "new", normalizedNew, "error", err) + } + } + + // Update ID symlinks for all files with NzbdavId under the renamed directory + _ = mrf.metadataService.WalkDirectoryFiles(normalizedNew, func(fileVirtualPath string, meta *metapb.FileMetadata) error { + if meta.NzbdavId != "" { + if err := mrf.metadataService.UpdateIDSymlink(meta.NzbdavId, fileVirtualPath); err != nil { + slog.WarnContext(ctx, "Failed to update ID symlink after directory rename", "id", meta.NzbdavId, "path", fileVirtualPath, "error", err) + } + } + return nil + }) + return true, nil } @@ -311,29 +328,28 @@ func (mrf *MetadataRemoteFile) RenameFile(ctx context.Context, oldName, newName return false, nil } - // Read existing metadata + // Read metadata first to get NzbdavId before rename fileMeta, err := mrf.metadataService.ReadFileMetadata(normalizedOld) if err != nil { return false, fmt.Errorf("failed to read old metadata: %w", err) } - // Write to new location - if err := mrf.metadataService.WriteFileMetadata(normalizedNew, fileMeta); err != nil { - return false, fmt.Errorf("failed to write new metadata: %w", err) - } - - // Delete old location - cfg := mrf.configGetter() - deleteSourceNzb := cfg.Metadata.DeleteSourceNzbOnRemoval != nil && *cfg.Metadata.DeleteSourceNzbOnRemoval - if err := mrf.metadataService.DeleteFileMetadataWithSourceNzb(ctx, normalizedOld, deleteSourceNzb); err != nil { - return false, fmt.Errorf("failed to delete old metadata: %w", err) + // Use atomic rename instead of read-write-delete + if err := mrf.metadataService.RenameFileMetadata(normalizedOld, normalizedNew); err != nil { + return false, fmt.Errorf("failed to 
rename metadata: %w", err) } slog.InfoContext(ctx, "MOVE operation successful", "source", normalizedOld, "destination", normalizedNew) - // Clean up any health records for the new location and optionally for the directory + // Update ID symlink if file has a NzbdavId + if fileMeta != nil && fileMeta.NzbdavId != "" { + if err := mrf.metadataService.UpdateIDSymlink(fileMeta.NzbdavId, normalizedNew); err != nil { + slog.WarnContext(ctx, "Failed to update ID symlink during MOVE", "id", fileMeta.NzbdavId, "error", err) + } + } + + // Update health records if mrf.healthRepository != nil { - // Update the health record path to follow the move if err := mrf.healthRepository.RenameHealthRecord(ctx, normalizedOld, normalizedNew); err != nil { slog.WarnContext(ctx, "Failed to update health record path during MOVE", "old", normalizedOld, "new", normalizedNew, "error", err) }