From 8b43ca90e3ddd13c1faa017fb9dca03cf8dc6a71 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Thu, 30 Apr 2026 08:17:26 -0400 Subject: [PATCH 1/2] atomicdir: fsync files and directories for crash durability atomicdir.Sync writes files to a staging directory, atomically swaps it with the target directory via renameat2(RENAME_EXCHANGE), then deletes the old data. Without fsync, file data lives only in the kernel page cache. On ungraceful shutdown the journal replays the swap and deletion (metadata), but the file data was never flushed, leaving truncated or empty files. Introduce an fsutil package with WriteFileFsync (write + fsync file + fsync parent directory) and Fsync (fsync a path) primitives. Use WriteFileFsync for all file writes so each file is individually durable, and fsync both parent directories after the swap to persist which inode each directory name points to. --- .../staticpod/internal/atomicdir/sync.go | 48 +++++--- .../internal/atomicdir/sync_linux_test.go | 62 +++++++++- .../staticpod/internal/fsutil/fsutil.go | 33 +++++ .../staticpod/internal/fsutil/fsutil_test.go | 113 ++++++++++++++++++ 4 files changed, 236 insertions(+), 20 deletions(-) create mode 100644 pkg/operator/staticpod/internal/fsutil/fsutil.go create mode 100644 pkg/operator/staticpod/internal/fsutil/fsutil_test.go diff --git a/pkg/operator/staticpod/internal/atomicdir/sync.go b/pkg/operator/staticpod/internal/atomicdir/sync.go index 0c2cc3dfff..779eb21288 100644 --- a/pkg/operator/staticpod/internal/atomicdir/sync.go +++ b/pkg/operator/staticpod/internal/atomicdir/sync.go @@ -8,15 +8,14 @@ import ( "k8s.io/klog/v2" "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir/types" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/fsutil" ) -// Sync can be used to atomically synchronize target directory with the given file content map. -// This is done by populating a staging directory, then atomically swapping it with the target directory. -// This effectively means that any extra files in the target directory are pruned. -// -// The staging directory needs to be explicitly specified. It is initially created using os.MkdirAll with targetDirPerm. -// It is then populated using files with filePerm. Once the atomic swap is performed, the staging directory -// (which is now the original target directory) is removed. +// Sync atomically and durably replaces the contents of targetDir with the given files. +// It writes files to a staging directory with fsync, then atomically swaps it with +// the target directory via renameat2(RENAME_EXCHANGE), fsyncing parent directories +// to ensure the swap is persisted. Extra files in targetDir are pruned. +// The old target directory (now at stagingDir) is removed after the swap. func Sync(targetDir string, targetDirPerm os.FileMode, stagingDir string, files map[string]types.File) error { return sync(&realFS, targetDir, targetDirPerm, stagingDir, files) } @@ -24,27 +23,29 @@ func Sync(targetDir string, targetDirPerm os.FileMode, stagingDir string, files type fileSystem struct { MkdirAll func(path string, perm os.FileMode) error RemoveAll func(path string) error - WriteFile func(name string, data []byte, perm os.FileMode) error + WriteFileFsync func(name string, data []byte, perm os.FileMode) error SwapDirectories func(dirA, dirB string) error + Fsync func(name string) error } var realFS = fileSystem{ MkdirAll: os.MkdirAll, RemoveAll: os.RemoveAll, - WriteFile: os.WriteFile, + WriteFileFsync: fsutil.WriteFileFsync, SwapDirectories: swap, + Fsync: fsutil.Fsync, } -// sync prepares a tmp directory and writes all files into that directory. -// Then it atomically swap the tmp directory for the target one. -// This is currently implemented as really atomically swapping directories. +// sync writes files into the staging directory, then durably swaps it with the target. +// Each file write is individually fsynced (including its parent directory entry) via +// fs.WriteFileFsync. After the swap, parent directories are fsynced to persist the exchange. // -// The same goal of atomic swap could be implemented using symlinks much like AtomicWriter does in +// Note: the upstream Kubernetes AtomicWriter uses symlinks for atomic updates but does +// not fsync, leaving file data in the page cache with no crash durability guarantee: // https://github.com/kubernetes/kubernetes/blob/v1.34.0/pkg/volume/util/atomic_writer.go#L58 -// The reason we don't do that is that we already have a directory populated and watched that needs to we swapped. -// In other words, it's for compatibility reasons. And if we were to migrate to the symlink approach, -// we would anyway need to atomically turn the current data directory into a symlink. -// This would all just increase complexity and require atomic swap on the OS level anyway. +// We use renameat2(RENAME_EXCHANGE) instead of symlinks because we need to swap an +// existing directory that is already being watched. Migrating to symlinks would still +// require an atomic swap at the OS level, adding complexity for no benefit. func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDir string, files map[string]types.File) (retErr error) { klog.Infof("Ensuring target directory %q exists ...", targetDir) if err := fs.MkdirAll(targetDir, targetDirPerm); err != nil { @@ -75,7 +76,7 @@ func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDi fullFilename := filepath.Join(stagingDir, filename) klog.Infof("Writing file %q ...", fullFilename) - if err := fs.WriteFile(fullFilename, file.Content, file.Perm); err != nil { + if err := fs.WriteFileFsync(fullFilename, file.Content, file.Perm); err != nil { return fmt.Errorf("failed writing %q: %w", fullFilename, err) } } @@ -84,5 +85,16 @@ func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDi if err := fs.SwapDirectories(targetDir, stagingDir); err != nil { return fmt.Errorf("failed swapping target directory %q with staging directory %q: %w", targetDir, stagingDir, err) } + + // fsync parent directories to ensure the swap is durable on disk. + // renameat2(RENAME_EXCHANGE) modifies parent directory entries, so the + // parents must be fsynced to persist which inode each name points to. + if err := fs.Fsync(filepath.Dir(targetDir)); err != nil { + return fmt.Errorf("failed syncing parent directory of %q: %w", targetDir, err) + } + if err := fs.Fsync(filepath.Dir(stagingDir)); err != nil { + return fmt.Errorf("failed syncing parent directory of %q: %w", stagingDir, err) + } + return } diff --git a/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go b/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go index 30e8c89ecd..24f1a9542a 100644 --- a/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go +++ b/pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go @@ -265,12 +265,12 @@ func TestSync(t *testing.T) { }), errorTestCase("directory unchanged on failed to write the first file", func() *fileSystem { fs := newRealFS() - fs.WriteFile = failToWriteNth(fs.WriteFile, 0) + fs.WriteFileFsync = failToWriteNth(fs.WriteFileFsync, 0) return fs }), errorTestCase("directory unchanged on failed to write the second file", func() *fileSystem { fs := newRealFS() - fs.WriteFile = failToWriteNth(fs.WriteFile, 1) + fs.WriteFileFsync = failToWriteNth(fs.WriteFileFsync, 1) return fs }), errorTestCase("directory unchanged on failed to swap directories", func() *fileSystem { @@ -280,6 +280,64 @@ func TestSync(t *testing.T) { } return fs }), + { + name: "directory synchronized on failed to sync parent of target directory", + newFS: func() *fileSystem { + fs := newRealFS() + origFsync := realFS.Fsync + var fsyncCount int + fs.Fsync = func(name string) error { + fsyncCount++ + if fsyncCount == 1 { + return errors.New("nuked") + } + return origFsync(name) + } + return fs + }, + existingFiles: map[string]types.File{ + "tls.crt": {Content: []byte("TLS cert"), Perm: 0600}, + "tls.key": {Content: []byte("TLS key"), Perm: 0600}, + }, + filesToSync: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectedFiles: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectSyncError: true, + }, + { + name: "directory synchronized on failed to sync parent of staging directory", + newFS: func() *fileSystem { + fs := newRealFS() + origFsync := realFS.Fsync + var fsyncCount int + fs.Fsync = func(name string) error { + fsyncCount++ + if fsyncCount == 2 { + return errors.New("nuked") + } + return origFsync(name) + } + return fs + }, + existingFiles: map[string]types.File{ + "tls.crt": {Content: []byte("TLS cert"), Perm: 0600}, + "tls.key": {Content: []byte("TLS key"), Perm: 0600}, + }, + filesToSync: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectedFiles: map[string]types.File{ + "api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600}, + "api.key": {Content: []byte("rotated TLS key"), Perm: 0600}, + }, + expectSyncError: true, + }, { name: "directory synchronized then failing to remove temporary directory", newFS: func() *fileSystem { diff --git a/pkg/operator/staticpod/internal/fsutil/fsutil.go b/pkg/operator/staticpod/internal/fsutil/fsutil.go new file mode 100644 index 0000000000..5b205f52db --- /dev/null +++ b/pkg/operator/staticpod/internal/fsutil/fsutil.go @@ -0,0 +1,33 @@ +package fsutil + +import ( + "os" + "path/filepath" +) + +// Fsync fsyncs a file or directory to ensure it is durable on disk. +func Fsync(name string) error { + f, err := os.Open(name) + if err != nil { + return err + } + syncErr := f.Sync() + // close(2) can surface errors not caught by fsync(2), e.g. on NFS. + closeErr := f.Close() + if syncErr != nil { + return syncErr + } + return closeErr +} + +// WriteFileFsync writes data to a file and fsyncs both the file and its +// parent directory to ensure the write is durable on disk. +func WriteFileFsync(name string, data []byte, perm os.FileMode) error { + if err := os.WriteFile(name, data, perm); err != nil { + return err + } + if err := Fsync(name); err != nil { + return err + } + return Fsync(filepath.Dir(name)) +} diff --git a/pkg/operator/staticpod/internal/fsutil/fsutil_test.go b/pkg/operator/staticpod/internal/fsutil/fsutil_test.go new file mode 100644 index 0000000000..f21d3b91d1 --- /dev/null +++ b/pkg/operator/staticpod/internal/fsutil/fsutil_test.go @@ -0,0 +1,113 @@ +package fsutil + +import ( + "os" + "path/filepath" + "testing" +) + +func TestWriteFileFsync(t *testing.T) { + testCases := []struct { + name string + setup func(t *testing.T) string + data []byte + perm os.FileMode + expectError bool + }{ + { + name: "writes file with correct content and permissions", + setup: func(t *testing.T) string { + return filepath.Join(t.TempDir(), "test.txt") + }, + data: []byte("hello world"), + perm: 0600, + }, + { + name: "creates file in nonexistent directory fails", + setup: func(t *testing.T) string { + return filepath.Join(t.TempDir(), "nodir", "test.txt") + }, + data: []byte("hello"), + perm: 0600, + expectError: true, + }, + { + name: "overwrites existing file", + setup: func(t *testing.T) string { + p := filepath.Join(t.TempDir(), "test.txt") + if err := os.WriteFile(p, []byte("old content"), 0600); err != nil { + t.Fatal(err) + } + return p + }, + data: []byte("new content"), + perm: 0600, + }, + { + name: "writes empty file", + setup: func(t *testing.T) string { + return filepath.Join(t.TempDir(), "empty.txt") + }, + data: []byte{}, + perm: 0644, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + path := tc.setup(t) + err := WriteFileFsync(path, tc.data, tc.perm) + + if tc.expectError { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to read back file: %v", err) + } + if string(got) != string(tc.data) { + t.Errorf("content mismatch: got %q, want %q", got, tc.data) + } + + info, err := os.Stat(path) + if err != nil { + t.Fatalf("failed to stat file: %v", err) + } + if info.Mode().Perm() != tc.perm { + t.Errorf("permission mismatch: got %o, want %o", info.Mode().Perm(), tc.perm) + } + }) + } +} + +func TestFsync(t *testing.T) { + t.Run("syncs existing file", func(t *testing.T) { + path := filepath.Join(t.TempDir(), "test.txt") + if err := os.WriteFile(path, []byte("data"), 0600); err != nil { + t.Fatal(err) + } + if err := Fsync(path); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("syncs existing directory", func(t *testing.T) { + dir := t.TempDir() + if err := Fsync(dir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("fails on nonexistent path", func(t *testing.T) { + if err := Fsync(filepath.Join(t.TempDir(), "nonexistent")); err == nil { + t.Fatal("expected error, got nil") + } + }) +} From 812fc1ac5c6ec005a35a1745239637d3a807e1b1 Mon Sep 17 00:00:00 2001 From: Luis Sanchez Date: Thu, 30 Apr 2026 08:17:34 -0400 Subject: [PATCH 2/2] installerpod: fsync pod manifest writes for crash durability writePod uses bare os.WriteFile plus a delete-then-write pattern for kubelet manifests. On ungraceful shutdown, the delete is journaled but the new file data may not have reached disk, leaving the manifest missing. Replace os.WriteFile with fsutil.WriteFileFsync, which writes, fsyncs the file, and fsyncs the parent directory in a single call, ensuring both the resource copy and the kubelet manifest are durable before the function returns. --- pkg/operator/staticpod/installerpod/cmd.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/operator/staticpod/installerpod/cmd.go b/pkg/operator/staticpod/installerpod/cmd.go index 35424a56f5..68666ff196 100644 --- a/pkg/operator/staticpod/installerpod/cmd.go +++ b/pkg/operator/staticpod/installerpod/cmd.go @@ -35,6 +35,7 @@ import ( "github.com/openshift/library-go/pkg/operator/staticpod/internal" "github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir" "github.com/openshift/library-go/pkg/operator/staticpod/internal/flock" + "github.com/openshift/library-go/pkg/operator/staticpod/internal/fsutil" ) const stagingDirUID = "installer" @@ -622,8 +623,8 @@ func (o *InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource // Write secrets, config maps and pod to disk // This does not need timeout, instead we should fail hard when we are not able to write. klog.Infof("Writing pod manifest %q ...", path.Join(resourceDir, manifestFileName)) - if err := os.WriteFile(path.Join(resourceDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { - return err + if err := fsutil.WriteFileFsync(path.Join(resourceDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { + return fmt.Errorf("failed writing %q: %w", path.Join(resourceDir, manifestFileName), err) } // remove the existing file to ensure kubelet gets "create" event from inotify watchers @@ -633,8 +634,8 @@ func (o *InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource return err } klog.Infof("Writing static pod manifest %q ...\n%s", path.Join(o.PodManifestDir, manifestFileName), finalPodBytes) - if err := os.WriteFile(path.Join(o.PodManifestDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { - return err + if err := fsutil.WriteFileFsync(path.Join(o.PodManifestDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil { + return fmt.Errorf("failed writing %q: %w", path.Join(o.PodManifestDir, manifestFileName), err) } return nil }