Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions storage/cmd/containers-storage/splitfdstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
//go:build linux

package main

import (
"bytes"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"

"go.podman.io/storage"
graphdriver "go.podman.io/storage/drivers"
"go.podman.io/storage/pkg/archive"
"go.podman.io/storage/pkg/mflag"
"go.podman.io/storage/pkg/splitfdstream"
)

// defaultJSONRPCSocket is the socket filename used when no --socket flag is
// given; the file is created under the store's run root.
const defaultJSONRPCSocket = "json-rpc.sock"

// Flag destinations for the json-rpc-server and apply-splitfdstream commands.
var (
	splitfdstreamSocket     = "" // json-rpc-server: path of the UNIX socket to listen on
	applyFdstreamSocket     = "" // apply-splitfdstream: path of the remote UNIX socket
	applyFdstreamParent     = "" // apply-splitfdstream: parent layer ID for the new layer
	applyFdstreamMountLabel = "" // apply-splitfdstream: SELinux mount label for the layer
)

// splitFDStreamDiffer implements graphdriver.Differ for splitfdstream data
// that was fetched from a remote JSON-RPC server: the buffered stream bytes
// plus the file descriptors that accompany them.
type splitFDStreamDiffer struct {
	streamData []byte        // serialized splitfdstream payload received from the server
	fds        []*os.File    // file descriptors passed alongside the stream (closed by the caller)
	store      storage.Store // store whose graph driver will apply the stream
}

// ApplyDiff implements graphdriver.Differ: it applies the buffered
// splitfdstream payload (and its accompanying file descriptors) to the
// staging directory dest through the graph driver's splitfdstream support.
//
// The options and differOpts parameters are required by the interface but
// are not consulted here.
func (d *splitFDStreamDiffer) ApplyDiff(dest string, options *archive.TarOptions, differOpts *graphdriver.DifferOptions) (graphdriver.DriverWithDifferOutput, error) {
	var out graphdriver.DriverWithDifferOutput

	gd, err := d.store.GraphDriver()
	if err != nil {
		return out, fmt.Errorf("failed to get graph driver: %w", err)
	}

	// The driver must opt in to splitfdstream handling.
	sd, ok := gd.(splitfdstream.SplitFDStreamDriver)
	if !ok {
		return out, fmt.Errorf("driver %s does not support splitfdstream", gd.String())
	}

	size, err := sd.ApplySplitFDStream(&splitfdstream.ApplySplitFDStreamOpts{
		Stream:          bytes.NewReader(d.streamData),
		FileDescriptors: d.fds,
		StagingDir:      dest,
	})
	if err != nil {
		return out, fmt.Errorf("failed to apply splitfdstream to staging dir %s: %w", dest, err)
	}

	out.Target = dest
	out.Size = size
	return out, nil
}

// Close implements graphdriver.Differ. The differ holds no resources of its
// own (the received file descriptors are closed by the caller that created
// the differ), so this is a no-op.
func (d *splitFDStreamDiffer) Close() error {
	return nil
}

// splitfdstreamServer implements the json-rpc-server command: it starts a
// splitfdstream JSON-RPC server on a UNIX socket and serves requests until
// the process receives SIGINT or SIGTERM.
//
// The socket path is taken from the --socket flag, falling back to
// defaultJSONRPCSocket under the store's run root; the chosen path is
// printed to stdout so callers know where to connect.
func splitfdstreamServer(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) {
	driver, err := m.GraphDriver()
	if err != nil {
		return 1, fmt.Errorf("failed to get graph driver: %w", err)
	}

	// The driver must opt in to splitfdstream handling.
	splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver)
	if !ok {
		return 1, fmt.Errorf("driver %s does not support splitfdstream", driver.String())
	}
	server := splitfdstream.NewJSONRPCServer(splitDriver, m)

	socketPath := splitfdstreamSocket
	if socketPath == "" {
		socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket)
	}

	if err := server.Start(socketPath); err != nil {
		return 1, fmt.Errorf("failed to start server: %w", err)
	}
	defer func() { _ = server.Stop() }()

	// Tell the caller which socket we ended up listening on.
	fmt.Printf("%s\n", socketPath)

	// Block until the process is asked to shut down. signal.Stop restores
	// the default disposition once we are on our way out, so a second
	// SIGINT/SIGTERM during teardown terminates the process normally.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)
	<-sigCh

	return 0, nil
}

// applySplitfdstream implements the apply-splitfdstream command: it fetches a
// layer's splitfdstream (stream bytes plus file descriptors) from a remote
// JSON-RPC server and applies it locally as the layer named by args[0],
// using the --parent and --mount-label flags for the new layer's metadata.
//
// It returns the process exit code alongside any error; on success it prints
// the new layer's ID (or a JSON object with id and size when --json is set).
func applySplitfdstream(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) {
	// minArgs in the command table guarantees args[0] is present.
	layerID := args[0]

	socketPath := applyFdstreamSocket
	if socketPath == "" {
		socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket)
	}

	// Registered first so it runs last: the store shuts down only after the
	// client, the received fds, and the differ (deferred below) are released.
	defer func() {
		if _, err := m.Shutdown(false); err != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to shutdown storage: %v\n", err)
		}
	}()

	client, err := splitfdstream.NewJSONRPCClient(socketPath)
	if err != nil {
		return 1, fmt.Errorf("failed to connect to server: %w", err)
	}
	defer client.Close()

	// Get splitfdstream data from the remote server.
	// NOTE(review): the second argument is passed as "" — its meaning is not
	// visible from this file; confirm against GetSplitFDStream's definition.
	streamData, fds, err := client.GetSplitFDStream(layerID, "")
	if err != nil {
		return 1, fmt.Errorf("failed to get splitfdstream from server: %w", err)
	}

	// Close the received file descriptors when done; the differ does not
	// take ownership of them.
	defer func() {
		for _, fd := range fds {
			fd.Close()
		}
	}()

	// Wrap the fetched data in a differ so the store can stage it.
	differ := &splitFDStreamDiffer{
		streamData: streamData,
		fds:        fds,
		store:      m,
	}
	defer differ.Close()

	// Prepare the staged layer; diffOptions is reused below so the apply
	// step sees the same options the staging step did.
	diffOptions := &graphdriver.ApplyDiffWithDifferOpts{}
	diffOutput, err := m.PrepareStagedLayer(diffOptions, differ)
	if err != nil {
		return 1, fmt.Errorf("failed to prepare staged layer: %w", err)
	}

	// Apply the staged layer to create the final layer.
	applyArgs := storage.ApplyStagedLayerOptions{
		ID:           layerID,
		ParentLayer:  applyFdstreamParent,
		MountLabel:   applyFdstreamMountLabel,
		Writeable:    false,
		LayerOptions: &storage.LayerOptions{},
		DiffOutput:   diffOutput,
		DiffOptions:  diffOptions,
	}

	layer, err := m.ApplyStagedLayer(applyArgs)
	if err != nil {
		// Clean up the staged layer on failure; the cleanup error is only a
		// warning because the apply error is the one worth reporting.
		if cleanupErr := m.CleanupStagedLayer(diffOutput); cleanupErr != nil {
			fmt.Fprintf(os.Stderr, "warning: failed to cleanup staged layer: %v\n", cleanupErr)
		}
		return 1, fmt.Errorf("failed to apply staged layer: %w", err)
	}

	// Output the result in the caller's preferred format.
	if jsonOutput {
		return outputJSON(map[string]interface{}{"id": layer.ID, "size": diffOutput.Size})
	}
	fmt.Printf("%s\n", layer.ID)
	return 0, nil
}

// init registers the two splitfdstream commands with the global command
// table: "json-rpc-server" serves layers over a UNIX socket, and
// "apply-splitfdstream" fetches a layer from such a server and applies it.
func init() {
	commands = append(commands, command{
		names:       []string{"json-rpc-server"},
		optionsHelp: "[options]",
		usage:       "Start a JSON-RPC server",
		minArgs:     0,
		maxArgs:     0,
		action:      splitfdstreamServer,
		addFlags: func(flags *mflag.FlagSet, cmd *command) {
			flags.StringVar(&splitfdstreamSocket, []string{"-socket"}, "",
				"Path to UNIX socket")
		},
	})
	commands = append(commands, command{
		names:       []string{"apply-splitfdstream"},
		optionsHelp: "[options] layerID",
		usage:       "Fetch a layer from remote server and apply it locally",
		minArgs:     1, // exactly one positional argument: the layer ID
		maxArgs:     1,
		action:      applySplitfdstream,
		addFlags: func(flags *mflag.FlagSet, cmd *command) {
			flags.StringVar(&applyFdstreamSocket, []string{"-socket"}, "",
				"Path to remote UNIX socket")
			flags.StringVar(&applyFdstreamParent, []string{"-parent"}, "",
				"Parent layer ID for the new layer")
			flags.StringVar(&applyFdstreamMountLabel, []string{"-mount-label"}, "",
				"SELinux mount label for the layer")
			flags.BoolVar(&jsonOutput, []string{"-json", "j"}, jsonOutput, "Prefer JSON output")
		},
	})
}
18 changes: 16 additions & 2 deletions storage/drivers/overlay/composefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
package overlay

import (
"archive/tar"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -43,14 +45,25 @@ func getComposefsBlob(dataDir string) string {
}

func generateComposeFsBlob(verityDigests map[string]string, toc any, composefsDir string) error {
if err := os.MkdirAll(composefsDir, 0o700); err != nil {
dumpReader, err := dump.GenerateDump(toc, verityDigests)
if err != nil {
return err
}
return writeComposeFsBlob(dumpReader, composefsDir)
}

dumpReader, err := dump.GenerateDump(toc, verityDigests)
func generateComposeFsBlobFromHeaders(headers []*tar.Header, contentDigests, verityDigests map[string]string, composefsDir string) error {
dumpReader, err := dump.GenerateDumpFromTarHeaders(headers, contentDigests, verityDigests)
if err != nil {
return err
}
return writeComposeFsBlob(dumpReader, composefsDir)
}

func writeComposeFsBlob(dumpReader io.Reader, composefsDir string) error {
if err := os.MkdirAll(composefsDir, 0o700); err != nil {
return err
}

destFile := getComposefsBlob(composefsDir)
writerJSON, err := getComposeFsHelper()
Expand All @@ -68,6 +81,7 @@ func generateComposeFsBlob(verityDigests map[string]string, toc any, composefsDi
outFile.Close()
return fmt.Errorf("failed to reopen %s as read-only: %w", destFile, err)
}
defer roFile.Close()

err = func() error {
// a scope to close outFile before setting fsverity on the read-only fd.
Expand Down
18 changes: 14 additions & 4 deletions storage/drivers/overlay/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2140,17 +2140,27 @@ func (g *overlayFileGetter) Get(path string) (io.ReadCloser, error) {
buf := make([]byte, unix.PathMax)
for _, d := range g.diffDirs {
if f, found := g.composefsMounts[d]; found {
// there is no *at equivalent for getxattr, but it can be emulated by opening the file under /proc/self/fd/$FD/$PATH
len, err := unix.Getxattr(fmt.Sprintf("/proc/self/fd/%d/%s", int(f.Fd()), path), "trusted.overlay.redirect", buf)
cfd, err := unix.Openat2(int(f.Fd()), path, &unix.OpenHow{
Flags: unix.O_RDONLY | unix.O_CLOEXEC | unix.O_PATH,
Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH,
})
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI: When Openat2 returns any error, the code simply executes `continue` and tries the next diffDir. This silently ignores errors like EACCES, ENOMEM, etc. Only ENOENT (and possibly ELOOP for RESOLVE_NO_SYMLINKS) should trigger `continue`; any other error should be returned to the caller.

Also, RESOLVE_NO_SYMLINKS is a behavior change from the old procfs approach which followed symlinks. If any legitimate composefs path contains a symlink component, this will silently skip the entry. Worth documenting that this is intentional.

if errors.Is(err, unix.ENOENT) || errors.Is(err, unix.ELOOP) {
continue
}
return nil, &fs.PathError{Op: "openat2", Path: path, Err: err}
}
n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf)
unix.Close(cfd)
if err != nil {
if errors.Is(err, unix.ENODATA) {
continue
}
return nil, &fs.PathError{Op: "getxattr", Path: path, Err: err}
return nil, &fs.PathError{Op: "fgetxattr", Path: path, Err: err}
}

// the xattr value is the path to the file in the composefs layer diff directory
return os.Open(filepath.Join(d, string(buf[:len])))
return os.Open(filepath.Join(d, string(buf[:n])))
}

f, err := os.Open(filepath.Join(d, path))
Expand Down
Loading
Loading