Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/operator/staticpod/installerpod/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/openshift/library-go/pkg/operator/staticpod/internal"
"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir"
"github.com/openshift/library-go/pkg/operator/staticpod/internal/flock"
"github.com/openshift/library-go/pkg/operator/staticpod/internal/fsutil"
)

// stagingDirUID is a fixed identifier string; the code that consumes it is outside this hunk.
// NOTE(review): presumably used to derive the installer's staging directory name — confirm against the callers.
const stagingDirUID = "installer"
Expand Down Expand Up @@ -622,8 +623,8 @@ func (o *InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource
// Write secrets, config maps and pod to disk
// This does not need timeout, instead we should fail hard when we are not able to write.
klog.Infof("Writing pod manifest %q ...", path.Join(resourceDir, manifestFileName))
if err := os.WriteFile(path.Join(resourceDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil {
return err
if err := fsutil.WriteFileFsync(path.Join(resourceDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil {
return fmt.Errorf("failed writing %q: %w", path.Join(resourceDir, manifestFileName), err)
}

// remove the existing file to ensure kubelet gets "create" event from inotify watchers
Expand All @@ -633,8 +634,8 @@ func (o *InstallOptions) writePod(rawPodBytes []byte, manifestFileName, resource
return err
}
klog.Infof("Writing static pod manifest %q ...\n%s", path.Join(o.PodManifestDir, manifestFileName), finalPodBytes)
if err := os.WriteFile(path.Join(o.PodManifestDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil {
return err
if err := fsutil.WriteFileFsync(path.Join(o.PodManifestDir, manifestFileName), []byte(finalPodBytes), 0600); err != nil {
return fmt.Errorf("failed writing %q: %w", path.Join(o.PodManifestDir, manifestFileName), err)
}
return nil
}
Expand Down
48 changes: 30 additions & 18 deletions pkg/operator/staticpod/internal/atomicdir/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,44 @@ import (
"k8s.io/klog/v2"

"github.com/openshift/library-go/pkg/operator/staticpod/internal/atomicdir/types"
"github.com/openshift/library-go/pkg/operator/staticpod/internal/fsutil"
)

// Sync can be used to atomically synchronize target directory with the given file content map.
// This is done by populating a staging directory, then atomically swapping it with the target directory.
// This effectively means that any extra files in the target directory are pruned.
//
// The staging directory needs to be explicitly specified. It is initially created using os.MkdirAll with targetDirPerm.
// It is then populated using files with filePerm. Once the atomic swap is performed, the staging directory
// (which is now the original target directory) is removed.
// Sync atomically and durably replaces the contents of targetDir with the given files.
// It writes files to a staging directory with fsync, then atomically swaps it with
// the target directory via renameat2(RENAME_EXCHANGE), fsyncing parent directories
// to ensure the swap is persisted. Extra files in targetDir are pruned.
// The old target directory (now at stagingDir) is removed after the swap.
func Sync(targetDir string, targetDirPerm os.FileMode, stagingDir string, files map[string]types.File) error {
	// Production callers always run against the real filesystem operations;
	// the unexported sync accepts the operation table so it can be swapped out.
	defaultFS := &realFS
	return sync(defaultFS, targetDir, targetDirPerm, stagingDir, files)
}

// fileSystem is the table of filesystem operations that sync runs against.
// The operations are function fields so that individual ones can be replaced,
// e.g. to make a specific write, swap or fsync fail.
// NOTE(review): this view is a rendered diff without +/- markers; WriteFile looks like the
// pre-change field that WriteFileFsync replaces — confirm the two are not meant to coexist.
type fileSystem struct {
// MkdirAll creates the directory at path with the given permissions (os.MkdirAll in realFS).
MkdirAll func(path string, perm os.FileMode) error
// RemoveAll removes path (os.RemoveAll in realFS).
RemoveAll func(path string) error
// WriteFile writes data to the named file without any fsync (os.WriteFile in realFS).
WriteFile func(name string, data []byte, perm os.FileMode) error
// WriteFileFsync writes data to the named file and fsyncs it (fsutil.WriteFileFsync in realFS).
WriteFileFsync func(name string, data []byte, perm os.FileMode) error
// SwapDirectories exchanges the two directories (the package-local swap in realFS).
SwapDirectories func(dirA, dirB string) error
// Fsync flushes the named file or directory to disk (fsutil.Fsync in realFS).
Fsync func(name string) error
}

// realFS is the fileSystem that Sync passes to sync. Each operation is wired to
// the os package, the fsutil package, or the package-local swap function.
// NOTE(review): this view is a rendered diff without +/- markers; the WriteFile entry looks like
// the pre-change line that the WriteFileFsync entry replaces — confirm.
var realFS = fileSystem{
MkdirAll: os.MkdirAll,
RemoveAll: os.RemoveAll,
WriteFile: os.WriteFile,
WriteFileFsync: fsutil.WriteFileFsync,
SwapDirectories: swap,
Fsync: fsutil.Fsync,
}

// sync prepares a tmp directory and writes all files into that directory.
// Then it atomically swaps the tmp directory for the target one.
// This is currently implemented as really atomically swapping directories.
// sync writes files into the staging directory, then durably swaps it with the target.
// Each file write is individually fsynced (including its parent directory entry) via
// fs.WriteFileFsync. After the swap, parent directories are fsynced to persist the exchange.
//
// The same goal of atomic swap could be implemented using symlinks much like AtomicWriter does in
// Note: the upstream Kubernetes AtomicWriter uses symlinks for atomic updates but does
// not fsync, leaving file data in the page cache with no crash durability guarantee:
// https://github.com/kubernetes/kubernetes/blob/v1.34.0/pkg/volume/util/atomic_writer.go#L58
// The reason we don't do that is that we already have a directory populated and watched that needs to be swapped.
// In other words, it's for compatibility reasons. And if we were to migrate to the symlink approach,
// we would anyway need to atomically turn the current data directory into a symlink.
// This would all just increase complexity and require atomic swap on the OS level anyway.
// We use renameat2(RENAME_EXCHANGE) instead of symlinks because we need to swap an
// existing directory that is already being watched. Migrating to symlinks would still
// require an atomic swap at the OS level, adding complexity for no benefit.
func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDir string, files map[string]types.File) (retErr error) {
klog.Infof("Ensuring target directory %q exists ...", targetDir)
if err := fs.MkdirAll(targetDir, targetDirPerm); err != nil {
Expand Down Expand Up @@ -75,7 +76,7 @@ func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDi
fullFilename := filepath.Join(stagingDir, filename)
klog.Infof("Writing file %q ...", fullFilename)

if err := fs.WriteFile(fullFilename, file.Content, file.Perm); err != nil {
if err := fs.WriteFileFsync(fullFilename, file.Content, file.Perm); err != nil {
return fmt.Errorf("failed writing %q: %w", fullFilename, err)
}
}
Expand All @@ -84,5 +85,16 @@ func sync(fs *fileSystem, targetDir string, targetDirPerm os.FileMode, stagingDi
if err := fs.SwapDirectories(targetDir, stagingDir); err != nil {
return fmt.Errorf("failed swapping target directory %q with staging directory %q: %w", targetDir, stagingDir, err)
}

// fsync parent directories to ensure the swap is durable on disk.
// renameat2(RENAME_EXCHANGE) modifies parent directory entries, so the
// parents must be fsynced to persist which inode each name points to.
if err := fs.Fsync(filepath.Dir(targetDir)); err != nil {
return fmt.Errorf("failed syncing parent directory of %q: %w", targetDir, err)
}
if err := fs.Fsync(filepath.Dir(stagingDir)); err != nil {
return fmt.Errorf("failed syncing parent directory of %q: %w", stagingDir, err)
}

return
}
62 changes: 60 additions & 2 deletions pkg/operator/staticpod/internal/atomicdir/sync_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,12 @@ func TestSync(t *testing.T) {
}),
errorTestCase("directory unchanged on failed to write the first file", func() *fileSystem {
fs := newRealFS()
fs.WriteFile = failToWriteNth(fs.WriteFile, 0)
fs.WriteFileFsync = failToWriteNth(fs.WriteFileFsync, 0)
return fs
}),
errorTestCase("directory unchanged on failed to write the second file", func() *fileSystem {
fs := newRealFS()
fs.WriteFile = failToWriteNth(fs.WriteFile, 1)
fs.WriteFileFsync = failToWriteNth(fs.WriteFileFsync, 1)
return fs
}),
errorTestCase("directory unchanged on failed to swap directories", func() *fileSystem {
Expand All @@ -280,6 +280,64 @@ func TestSync(t *testing.T) {
}
return fs
}),
{
name: "directory synchronized on failed to sync parent of target directory",
newFS: func() *fileSystem {
fs := newRealFS()
origFsync := realFS.Fsync
var fsyncCount int
fs.Fsync = func(name string) error {
fsyncCount++
if fsyncCount == 1 {
return errors.New("nuked")
}
return origFsync(name)
}
return fs
},
existingFiles: map[string]types.File{
"tls.crt": {Content: []byte("TLS cert"), Perm: 0600},
"tls.key": {Content: []byte("TLS key"), Perm: 0600},
},
filesToSync: map[string]types.File{
"api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600},
"api.key": {Content: []byte("rotated TLS key"), Perm: 0600},
},
expectedFiles: map[string]types.File{
"api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600},
"api.key": {Content: []byte("rotated TLS key"), Perm: 0600},
},
expectSyncError: true,
},
{
name: "directory synchronized on failed to sync parent of staging directory",
newFS: func() *fileSystem {
fs := newRealFS()
origFsync := realFS.Fsync
var fsyncCount int
fs.Fsync = func(name string) error {
fsyncCount++
if fsyncCount == 2 {
return errors.New("nuked")
}
return origFsync(name)
}
return fs
},
existingFiles: map[string]types.File{
"tls.crt": {Content: []byte("TLS cert"), Perm: 0600},
"tls.key": {Content: []byte("TLS key"), Perm: 0600},
},
filesToSync: map[string]types.File{
"api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600},
"api.key": {Content: []byte("rotated TLS key"), Perm: 0600},
},
expectedFiles: map[string]types.File{
"api.crt": {Content: []byte("rotated TLS cert"), Perm: 0600},
"api.key": {Content: []byte("rotated TLS key"), Perm: 0600},
},
expectSyncError: true,
},
{
name: "directory synchronized then failing to remove temporary directory",
newFS: func() *fileSystem {
Expand Down
33 changes: 33 additions & 0 deletions pkg/operator/staticpod/internal/fsutil/fsutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package fsutil

import (
"os"
"path/filepath"
)

// Fsync opens the file or directory at name read-only and flushes it to disk.
// The handle is always closed. If both the flush and the close fail, the flush
// error is the one returned.
func Fsync(name string) error {
	handle, openErr := os.Open(name)
	if openErr != nil {
		return openErr
	}

	flushErr := handle.Sync()
	// Close unconditionally and keep its result: close(2) can surface errors
	// not caught by fsync(2), e.g. on NFS.
	if closeErr := handle.Close(); flushErr == nil {
		return closeErr
	}
	return flushErr
}

// WriteFileFsync writes data to the named file and fsyncs both the file and its
// parent directory to ensure the write is durable on disk.
//
// The file is opened with the same flags os.WriteFile uses (create if missing with
// perm before umask, truncate otherwise). The data is written and fsynced through
// that single descriptor rather than by reopening the path afterwards. This keeps
// the fsync on exactly the inode that was written, and it also works for files the
// caller cannot open for reading (e.g. perm 0200).
//
// The first error encountered (open, write, fsync, close, directory fsync) is
// returned unwrapped; the file is always closed.
func WriteFileFsync(name string, data []byte, perm os.FileMode) error {
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}

	_, err = f.Write(data)
	if err == nil {
		err = f.Sync()
	}
	// close(2) can surface errors not caught by fsync(2), e.g. on NFS, so its
	// result is reported unless an earlier step already failed.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		return err
	}

	// Persist the directory entry so a newly created file survives a crash.
	return Fsync(filepath.Dir(name))
}
113 changes: 113 additions & 0 deletions pkg/operator/staticpod/internal/fsutil/fsutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package fsutil

import (
"os"
"path/filepath"
"testing"
)

// TestWriteFileFsync checks that WriteFileFsync creates or overwrites a file with
// the requested content and permissions, and fails when the parent directory is missing.
func TestWriteFileFsync(t *testing.T) {
	testCases := []struct {
		name string
		// setup prepares any pre-existing state and returns the path to write.
		setup       func(t *testing.T) string
		data        []byte
		perm        os.FileMode
		expectError bool
	}{
		{
			name: "writes file with correct content and permissions",
			setup: func(t *testing.T) string {
				return filepath.Join(t.TempDir(), "test.txt")
			},
			data: []byte("hello world"),
			perm: 0600,
		},
		{
			name: "creates file in nonexistent directory fails",
			setup: func(t *testing.T) string {
				return filepath.Join(t.TempDir(), "nodir", "test.txt")
			},
			data:        []byte("hello"),
			perm:        0600,
			expectError: true,
		},
		{
			name: "overwrites existing file",
			setup: func(t *testing.T) string {
				p := filepath.Join(t.TempDir(), "test.txt")
				if err := os.WriteFile(p, []byte("old content"), 0600); err != nil {
					t.Fatal(err)
				}
				return p
			},
			data: []byte("new content"),
			perm: 0600,
		},
		{
			name: "writes empty file",
			setup: func(t *testing.T) string {
				return filepath.Join(t.TempDir(), "empty.txt")
			},
			data: []byte{},
			perm: 0644,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			target := tc.setup(t)
			err := WriteFileFsync(target, tc.data, tc.perm)

			if tc.expectError {
				if err == nil {
					t.Fatal("expected error, got nil")
				}
				return
			}
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			got, err := os.ReadFile(target)
			if err != nil {
				t.Fatalf("failed to read back file: %v", err)
			}
			if string(got) != string(tc.data) {
				t.Errorf("content mismatch: got %q, want %q", got, tc.data)
			}

			info, err := os.Stat(target)
			if err != nil {
				t.Fatalf("failed to stat file: %v", err)
			}
			// The process umask is applied to perm when a file is created, so the
			// raw perm is only the right expectation under a permissive umask.
			wantPerm := permAfterUmask(t, tc.perm)
			if info.Mode().Perm() != wantPerm {
				t.Errorf("permission mismatch: got %o, want %o", info.Mode().Perm(), wantPerm)
			}
		})
	}
}

// permAfterUmask returns the mode a file actually gets when created with perm
// under the current process umask, measured by creating a throwaway file.
func permAfterUmask(t *testing.T, perm os.FileMode) os.FileMode {
	t.Helper()
	probe := filepath.Join(t.TempDir(), "umask-probe")
	if err := os.WriteFile(probe, nil, perm); err != nil {
		t.Fatalf("failed to create umask probe file: %v", err)
	}
	info, err := os.Stat(probe)
	if err != nil {
		t.Fatalf("failed to stat umask probe file: %v", err)
	}
	return info.Mode().Perm()
}

// TestFsync checks that Fsync succeeds on an existing file and an existing
// directory, and reports an error for a path that does not exist.
func TestFsync(t *testing.T) {
	scenarios := []struct {
		name string
		// target prepares the scenario and returns the path handed to Fsync.
		target  func(t *testing.T) string
		wantErr bool
	}{
		{
			name: "syncs existing file",
			target: func(t *testing.T) string {
				p := filepath.Join(t.TempDir(), "test.txt")
				if err := os.WriteFile(p, []byte("data"), 0600); err != nil {
					t.Fatal(err)
				}
				return p
			},
		},
		{
			name: "syncs existing directory",
			target: func(t *testing.T) string {
				return t.TempDir()
			},
		},
		{
			name: "fails on nonexistent path",
			target: func(t *testing.T) string {
				return filepath.Join(t.TempDir(), "nonexistent")
			},
			wantErr: true,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			err := Fsync(sc.target(t))
			switch {
			case sc.wantErr && err == nil:
				t.Fatal("expected error, got nil")
			case !sc.wantErr && err != nil:
				t.Fatalf("unexpected error: %v", err)
			}
		})
	}
}