diff --git a/pkg/operator/staticpod/installerpod/cmd.go b/pkg/operator/staticpod/installerpod/cmd.go index 35424a56f5..68666ff196 100644 --- a/pkg/operator/staticpod/installerpod/cmd.go +++ b/pkg/operator/staticpod/installerpod/cmd.go @@ -35,6 +35,7 @@ import ( "github.com/openshift/library-go/pkg/operator/staticpod/internal" "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir" "github.com/openshift/library-go/pkg/operator/staticpod/internal/flock" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/fsutil" ) const stagingDirUID = "installer" @@ -622,8 +623,8 @@ func (o *InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource // Write secrets, config maps and pod to disk // This does not need timeout, instead we should fail hard when we are not able to write. klog.Infof("Writing pod manifest %q ...", path.Join(resourceDir, manifestFileName)) - if err := os.WriteFile(path.Join(resourceDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { - return err + if err := fsutil.WriteFileFsync(path.Join(resourceDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { + return fmt.Errorf("failed writing %q: %w", path.Join(resourceDir, manifestFileName), err) } // remove the existing file to ensure kubelet gets "create" event from inotify watchers @@ -633,8 +634,8 @@ func (o *InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource return err } klog.Infof("Writing static pod manifest %q ...\n%s", path.Join(o.PodManifestDir, manifestFileName), finalPodBytes) - if err := os.WriteFile(path.Join(o.PodManifestDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { - return err + if err := fsutil.WriteFileFsync(path.Join(o.PodManifestDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { + return fmt.Errorf("failed writing %q: %w", path.Join(o.PodManifestDir, manifestFileName), err) } return nil } diff --git a/pkg/operator/staticpod/internal/atomicdir/sync.go b/pkg/operator/staticpod/internal/atomicdir/sync.go index 0c2cc3dfff..779eb21288 100644 --- a/pkg/operator/staticpod/internal/atomicdir/sync.go +++ b/pkg/operator/staticpod/internal/atomicdir/sync.go @@ -8,15 +8,14 @@ import ( "k8s.io/klog/v2" "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir/types" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/fsutil" ) -// Sync can be used to atomically synchronize target directory with the given file content map. -// This is done by populating a staging directory, then atomically swapping it with the target directory. -// This effectively means that any extra files in the target directory are pruned. -// -// The staging directory needs to be explicitly specified. It is initially created using os.MkdirAll with targetDirPerm. -// It is then populated using files with filePerm. Once the atomic swap is performed, the staging directory -// (which is now the original target directory) is removed. +// Sync atomically and durably replaces the contents of targetDir with the given files. +// It writes files to a staging directory with fsync, then atomically swaps it with +// the target directory via renameat2(RENAME_EXCHANGE), fsyncing parent directories +// to ensure the swap is persisted. Extra files in targetDir are pruned. +// The old target directory (now at stagingDir) is removed after the swap. func Sync(targetDir string, targetDirPerm os.FileMode, stagingDir string, files map[string]types.File) error { return sync(&realFS, targetDir, targetDirPerm, stagingDir, files) } @@ -24,27 +23,29 @@ func Sync(targetDir string, targetDirPerm os.FileMode, stagingDir string, files type fileSystem struct { MkdirAll func(path string, perm os.FileMode) error RemoveAll func(path string) error - WriteFile func(name string, data []byte, perm os.FileMode) error + WriteFileFsync func(name string, data []byte, perm os.FileMode) error SwapDirectories func(dirA, dirB string) error + Fsync func(name string) error } var realFS = fileSystem{ MkdirAll: os.MkdirAll, RemoveAll: os.RemoveAll, - WriteFile: os.WriteFile, + WriteFileFsync: fsutil.WriteFileFsync, SwapDirectories: swap, + Fsync: fsutil.Fsync, } -// sync prepares a tmp directory and writes all files into that directory. -// Then it atomically swap the tmp directory for the target one. -// This is currently implemented as really atomically swapping directories. +// sync writes files into the staging directory, then durably swaps it with the target. +// Each file write is individually fsynced (including its parent directory entry) via +// fs.WriteFileFsync. After the swap, parent directories are fsynced to persist the exchange. // -// The same goal of atomic swap could be implemented using symlinks much like AtomicWriter does in +// Note: the upstream Kubernetes AtomicWriter uses symlinks for atomic updates but does +// not fsync, leaving file data in the page cache with no crash durability guarantee: // https://github.com/kubernetes/kubernetes/blob/v1.34.0/pkg/volume/util/atomic_writer.go#L58 -// The reason we don't do that is that we already have a directory populated and watched that needs to we swapped. -// In other words, it's for compatibility reasons. And if we were to migrate to the symlink approach, -// we would anyway need to atomically turn the current data directory into a symlink. -// This would all just increase complexity and require atomic swap on the OS level anyway. +// We use renameat2(RENAME_EXCHANGE) instead of symlinks because we need to swap an +// existing directory that is already being watched. Migrating to symlinks would still +// require an atomic swap at the OS level, adding complexity for no benefit. func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDir string, files map[string]types.File) (retErr error) { klog.Infof("Ensuring target directory %q exists ...", targetDir) if err := fs.MkdirAll(targetDir, targetDirPerm); err != nil { @@ -75,7 +76,7 @@ func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDi fullFilename := filepath.Join(stagingDir, filename) klog.Infof("Writing file %q ...", fullFilename) - if err := fs.WriteFile(fullFilename, file.Content, file.Perm); err != nil { + if err := fs.WriteFileFsync(fullFilename, file.Content, file.Perm); err != nil { return fmt.Errorf("failed writing %q: %w", fullFilename, err) } } @@ -84,5 +85,16 @@ func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDi if err := fs.SwapDirectories(targetDir, stagingDir); err != nil { return fmt.Errorf("failed swapping target directory %q with staging directory %q: %w", targetDir, stagingDir, err) } + + // fsync parent directories to ensure the swap is durable on disk. + // renameat2(RENAME_EXCHANGE) modifies parent directory entries, so the + // parents must be fsynced to persist which inode each name points to. + if err := fs.Fsync(filepath.Dir(targetDir)); err != nil { + return fmt.Errorf("failed syncing parent directory of %q: %w", targetDir, err) + } + if err := fs.Fsync(filepath.Dir(stagingDir)); err != nil { + return fmt.Errorf("failed syncing parent directory of %q: %w", stagingDir, err) + } + return } diff --git a/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go b/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go index 30e8c89ecd..24f1a9542a 100644 --- a/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go +++ b/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go @@ -265,12 +265,12 @@ func TestSync(t *testing.T) { }), errorTestCase("directory unchanged on failed to write the first file", func() *fileSystem { fs := newRealFS() - fs.WriteFile = failToWriteNth(fs.WriteFile, 0) + fs.WriteFileFsync = failToWriteNth(fs.WriteFileFsync, 0) return fs }), errorTestCase("directory unchanged on failed to write the second file", func() *fileSystem { fs := newRealFS() - fs.WriteFile = failToWriteNth(fs.WriteFile, 1) + fs.WriteFileFsync = failToWriteNth(fs.WriteFileFsync, 1) return fs }), errorTestCase("directory unchanged on failed to swap directories", func() *fileSystem { @@ -280,6 +280,64 @@ func TestSync(t *testing.T) { } return fs }), + { + name: "directory synchronized on failed to sync parent of target directory", + newFS: func() *fileSystem { + fs := newRealFS() + origFsync := realFS.Fsync + var fsyncCount int + fs.Fsync = func(name string) error { + fsyncCount++ + if fsyncCount == 1 { + return errors.New("nuked") + } + return origFsync(name) + } + return fs + }, + existingFiles: map[string]types.File{ + "tls.crt": {Content: []byte("TLS cert"), Perm: 0600}, + "tls.key": {Content: []byte("TLS key"), Perm: 0600}, + }, + filesToSync: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectedFiles: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectSyncError: true, + }, + { + name: "directory synchronized on failed to sync parent of staging directory", + newFS: func() *fileSystem { + fs := newRealFS() + origFsync := realFS.Fsync + var fsyncCount int + fs.Fsync = func(name string) error { + fsyncCount++ + if fsyncCount == 2 { + return errors.New("nuked") + } + return origFsync(name) + } + return fs + }, + existingFiles: map[string]types.File{ + "tls.crt": {Content: []byte("TLS cert"), Perm: 0600}, + "tls.key": {Content: []byte("TLS key"), Perm: 0600}, + }, + filesToSync: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectedFiles: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectSyncError: true, + }, { name: "directory synchronized then failing to remove temporary directory", newFS: func() *fileSystem { diff --git a/pkg/operator/staticpod/internal/fsutil/fsutil.go b/pkg/operator/staticpod/internal/fsutil/fsutil.go new file mode 100644 index 0000000000..5b205f52db --- /dev/null +++ b/pkg/operator/staticpod/internal/fsutil/fsutil.go @@ -0,0 +1,33 @@ +package fsutil + +import ( + "os" + "path/filepath" +) + +// Fsync fsyncs a file or directory to ensure it is durable on disk. +func Fsync(name string) error { + f, err := os.Open(name) + if err != nil { + return err + } + syncErr := f.Sync() + // close(2) can surface errors not caught by fsync(2), e.g. on NFS. + closeErr := f.Close() + if syncErr != nil { + return syncErr + } + return closeErr +} + +// WriteFileFsync writes data to a file and fsyncs both the file and its +// parent directory to ensure the write is durable on disk. +func WriteFileFsync(name string, data []byte, perm os.FileMode) error { + if err := os.WriteFile(name, data, perm); err != nil { + return err + } + if err := Fsync(name); err != nil { + return err + } + return Fsync(filepath.Dir(name)) +} diff --git a/pkg/operator/staticpod/internal/fsutil/fsutil_test.go b/pkg/operator/staticpod/internal/fsutil/fsutil_test.go new file mode 100644 index 0000000000..f21d3b91d1 --- /dev/null +++ b/pkg/operator/staticpod/internal/fsutil/fsutil_test.go @@ -0,0 +1,113 @@ +package fsutil + +import ( + "os" + "path/filepath" + "testing" +) + +func TestWriteFileFsync(t *testing.T) { + testCases := []struct { + name string + setup func(t *testing.T) string + data []byte + perm os.FileMode + expectError bool + }{ + { + name: "writes file with correct content and permissions", + setup: func(t *testing.T) string { + return filepath.Join(t.TempDir(), "test.txt") + }, + data: []byte("hello world"), + perm: 0600, + }, + { + name: "creates file in nonexistent directory fails", + setup: func(t *testing.T) string { + return filepath.Join(t.TempDir(), "nodir", "test.txt") + }, + data: []byte("hello"), + perm: 0600, + expectError: true, + }, + { + name: "overwrites existing file", + setup: func(t *testing.T) string { + p := filepath.Join(t.TempDir(), "test.txt") + if err := os.WriteFile(p, []byte("old content"), 0600); err != nil { + t.Fatal(err) + } + return p + }, + data: []byte("new content"), + perm: 0600, + }, + { + name: "writes empty file", + setup: func(t *testing.T) string { + return filepath.Join(t.TempDir(), "empty.txt") + }, + data: []byte{}, + perm: 0644, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + path := tc.setup(t) + err := WriteFileFsync(path, tc.data, tc.perm) + + if tc.expectError { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to read back file: %v", err) + } + if string(got) != string(tc.data) { + t.Errorf("content mismatch: got %q, want %q", got, tc.data) + } + + info, err := os.Stat(path) + if err != nil { + t.Fatalf("failed to stat file: %v", err) + } + if info.Mode().Perm() != tc.perm { + t.Errorf("permission mismatch: got %o, want %o", info.Mode().Perm(), tc.perm) + } + }) + } +} + +func TestFsync(t *testing.T) { + t.Run("syncs existing file", func(t *testing.T) { + path := filepath.Join(t.TempDir(), "test.txt") + if err := os.WriteFile(path, []byte("data"), 0600); err != nil { + t.Fatal(err) + } + if err := Fsync(path); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("syncs existing directory", func(t *testing.T) { + dir := t.TempDir() + if err := Fsync(dir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("fails on nonexistent path", func(t *testing.T) { + if err := Fsync(filepath.Join(t.TempDir(), "nonexistent")); err == nil { + t.Fatal("expected error, got nil") + } + }) +}