From 8085f0850bed6fa5c65a662c81eeeb517c3bc66a Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 14:57:07 +0100 Subject: [PATCH 01/12] pkg/fileutils: use CopyFileRange if possible Signed-off-by: Giuseppe Scrivano --- storage/pkg/fileutils/reflink_linux.go | 51 +++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/storage/pkg/fileutils/reflink_linux.go b/storage/pkg/fileutils/reflink_linux.go index 9f5c6c90bb..d9625a0f0d 100644 --- a/storage/pkg/fileutils/reflink_linux.go +++ b/storage/pkg/fileutils/reflink_linux.go @@ -8,13 +8,62 @@ import ( ) // ReflinkOrCopy attempts to reflink the source to the destination fd. -// If reflinking fails or is unsupported, it falls back to io.Copy(). +// If reflinking fails, it tries copy_file_range for kernel-level copying. +// If that also fails, it falls back to io.Copy(). func ReflinkOrCopy(src, dst *os.File) error { err := unix.IoctlFileClone(int(dst.Fd()), int(src.Fd())) if err == nil { return nil } + srcInfo, statErr := src.Stat() + if statErr != nil { + _, err = io.Copy(dst, src) + return err + } + + if err := doCopyFileRange(src, dst, srcInfo.Size()); err == nil { + return nil + } + + // copy_file_range may have partially written data before failing, + // so reset both file offsets and truncate dst before falling back. + if _, err := src.Seek(0, io.SeekStart); err != nil { + return err + } + if _, err := dst.Seek(0, io.SeekStart); err != nil { + return err + } + if err := dst.Truncate(0); err != nil { + return err + } + _, err = io.Copy(dst, src) return err } + +// doCopyFileRange uses the copy_file_range syscall for kernel-level copying. 
+func doCopyFileRange(src, dst *os.File, size int64) error { + const maxChunk = 1 << 30 // 1GiB + remaining := size + srcFd := int(src.Fd()) + dstFd := int(dst.Fd()) + for remaining > 0 { + chunkSize := remaining + if chunkSize > maxChunk { + chunkSize = maxChunk + } + n, err := unix.CopyFileRange(srcFd, nil, dstFd, nil, int(chunkSize), 0) + if err != nil { + return err + } + if n == 0 { + if remaining > 0 { + return io.ErrUnexpectedEOF + } + break + } + remaining -= int64(n) + } + return nil +} From fa6bb303180c1fc584e41e25fc54a22eb9472ed0 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 19 Feb 2026 15:45:38 +0100 Subject: [PATCH 02/12] storage, chunked: new function GenerateDumpFromTarHeaders Signed-off-by: Giuseppe Scrivano --- storage/pkg/chunked/dump/dump.go | 62 ++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/storage/pkg/chunked/dump/dump.go b/storage/pkg/chunked/dump/dump.go index facd7a1694..acba2949bd 100644 --- a/storage/pkg/chunked/dump/dump.go +++ b/storage/pkg/chunked/dump/dump.go @@ -3,15 +3,18 @@ package dump import ( + "archive/tar" "bufio" "encoding/base64" "fmt" "io" "path/filepath" "reflect" + "strings" "time" "github.com/opencontainers/go-digest" + "go.podman.io/storage/pkg/archive" "go.podman.io/storage/pkg/chunked/internal/minimal" storagePath "go.podman.io/storage/pkg/chunked/internal/path" "golang.org/x/sys/unix" @@ -269,3 +272,62 @@ func GenerateDump(tocI any, verityDigests map[string]string) (io.Reader, error) }() return pipeR, nil } + +// GenerateDumpFromTarHeaders generates a composefs dump from stdlib tar headers, +// content digests, and verity digests. It converts the tar headers to +// minimal.FileMetadata entries internally and delegates to GenerateDump. 
+func GenerateDumpFromTarHeaders(headers []*tar.Header, contentDigests, verityDigests map[string]string) (io.Reader, error) { + var entries []minimal.FileMetadata + for _, hdr := range headers { + entry, err := fileMetadataFromTarHeader(hdr) + if err != nil { + return nil, err + } + if d, ok := contentDigests[hdr.Name]; ok { + entry.Digest = d + } + entries = append(entries, entry) + } + toc := &minimal.TOC{Version: 1, Entries: entries} + return GenerateDump(toc, verityDigests) +} + +// fileMetadataFromTarHeader creates a minimal.FileMetadata from a stdlib +// tar.Header. This mirrors minimal.NewFileMetadata (which uses the +// tar-split tar package) including AccessTime and ChangeTime. +func fileMetadataFromTarHeader(hdr *tar.Header) (minimal.FileMetadata, error) { + typ, err := minimal.GetType(hdr.Typeflag) + if err != nil { + return minimal.FileMetadata{}, err + } + xattrs := make(map[string]string) + for k, v := range hdr.PAXRecords { + xattrKey, ok := strings.CutPrefix(k, archive.PaxSchilyXattr) + if !ok { + continue + } + xattrs[xattrKey] = base64.StdEncoding.EncodeToString([]byte(v)) + } + return minimal.FileMetadata{ + Type: typ, + Name: hdr.Name, + Linkname: hdr.Linkname, + Mode: hdr.Mode, + Size: hdr.Size, + UID: hdr.Uid, + GID: hdr.Gid, + ModTime: timeIfNotZero(&hdr.ModTime), + AccessTime: timeIfNotZero(&hdr.AccessTime), + ChangeTime: timeIfNotZero(&hdr.ChangeTime), + Devmajor: hdr.Devmajor, + Devminor: hdr.Devminor, + Xattrs: xattrs, + }, nil +} + +func timeIfNotZero(t *time.Time) *time.Time { + if t == nil || t.IsZero() { + return nil + } + return t +} From 3e31b056b9a626276f880e3c1a0533bea3b7890e Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 11 Feb 2026 10:36:30 +0100 Subject: [PATCH 03/12] storage/vendor: add github.com/cgwalters/jsonrpc-fdpass-go Signed-off-by: Giuseppe Scrivano --- storage/go.mod | 2 + storage/go.sum | 4 + .../cgwalters/jsonrpc-fdpass-go/.gitignore | 23 ++ .../cgwalters/jsonrpc-fdpass-go/Justfile | 49 +++ 
.../cgwalters/jsonrpc-fdpass-go/LICENSE | 21 ++ .../cgwalters/jsonrpc-fdpass-go/README.md | 68 ++++ .../cgwalters/jsonrpc-fdpass-go/message.go | 216 +++++++++++++ .../cgwalters/jsonrpc-fdpass-go/transport.go | 302 ++++++++++++++++++ vendor/modules.txt | 3 + 9 files changed, 688 insertions(+) create mode 100644 vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore create mode 100644 vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile create mode 100644 vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE create mode 100644 vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md create mode 100644 vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go create mode 100644 vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go diff --git a/storage/go.mod b/storage/go.mod index c25f03adf9..ea04722a86 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -6,6 +6,7 @@ module go.podman.io/storage require ( github.com/BurntSushi/toml v1.6.0 + github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863 github.com/containerd/stargz-snapshotter/estargz v0.18.2 github.com/cyphar/filepath-securejoin v0.6.1 github.com/docker/go-units v0.5.0 @@ -19,6 +20,7 @@ require ( github.com/moby/sys/mountinfo v0.7.2 github.com/moby/sys/user v0.4.0 github.com/opencontainers/go-digest v1.0.0 + github.com/opencontainers/image-spec v1.1.1 github.com/opencontainers/runtime-spec v1.3.0 github.com/opencontainers/selinux v1.13.1 github.com/sirupsen/logrus v1.9.4 diff --git a/storage/go.sum b/storage/go.sum index d90d86395f..67e253f83e 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -2,6 +2,8 @@ cyphar.com/go-pathrs v0.2.1 h1:9nx1vOgwVvX1mNBWDu93+vaceedpbsDqo+XuBGL40b8= cyphar.com/go-pathrs v0.2.1/go.mod h1:y8f1EMG7r+hCuFf/rXsKqMJrJAUoADZGNh5/vZPKcGc= github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/cgwalters/jsonrpc-fdpass-go 
v0.0.0-20260126203148-2bca851a3863 h1:IxSkyu1DEg3XekvAJs3JNNiDg8fPJgR5BItqb1ZSLWI= +github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863/go.mod h1:naXj4fiEUjm+AaZDEGja5cJgASuAZwlsPSDBewGK+iY= github.com/containerd/stargz-snapshotter/estargz v0.18.2 h1:yXkZFYIzz3eoLwlTUZKz2iQ4MrckBxJjkmD16ynUTrw= github.com/containerd/stargz-snapshotter/estargz v0.18.2/go.mod h1:XyVU5tcJ3PRpkA9XS2T5us6Eg35yM0214Y+wvrZTBrY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -52,6 +54,8 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/opencontainers/runtime-spec v1.3.0 h1:YZupQUdctfhpZy3TM39nN9Ika5CBWT5diQ8ibYCRkxg= github.com/opencontainers/runtime-spec v1.3.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.13.1 h1:A8nNeceYngH9Ow++M+VVEwJVpdFmrlxsN22F+ISDCJE= diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore new file mode 100644 index 0000000000..1ee0e6b891 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/.gitignore @@ -0,0 +1,23 @@ +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib +/echo + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# Build output +/target/ + +# Integration test Rust build artifacts +tests-integration/target/ +tests-integration/Cargo.lock diff --git 
a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile new file mode 100644 index 0000000000..b76683a7ed --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/Justfile @@ -0,0 +1,49 @@ +# Format and lint Go code +check: + go fmt ./... + go vet ./... + +# Run unit tests +unit: + go test -v ./... + +# Run unit tests with race detector +test-race: + go test -race -v ./... + +# Build all packages +build: + go build ./... + +# Build the example +build-example: + go build -o target/echo ./examples/echo + +# Run all tests +test-all: unit + +# Clean build artifacts +clean: + rm -rf target/ + rm -rf tests-integration/target/ + go clean ./... + +# Full CI check (format, lint, test) +ci: check unit + +# Run the integration tests against the Rust implementation +# Requires: cargo, go +test-integration: build-integration-server + go test -v ./tests-integration/... + +# Build the Rust integration test server +build-integration-server: + cargo build --manifest-path tests-integration/Cargo.toml + +# Run the echo server example +run-server socket="/tmp/echo.sock": + go run ./examples/echo server {{socket}} + +# Run the echo client example +run-client socket="/tmp/echo.sock": + go run ./examples/echo client {{socket}} diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE new file mode 100644 index 0000000000..4e800f548b --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Colin Walters + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished 
to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md new file mode 100644 index 0000000000..7c5a6bf1e7 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/README.md @@ -0,0 +1,68 @@ +# jsonrpc-fdpass-go + +A Go implementation of JSON-RPC 2.0 with file descriptor passing over Unix domain sockets. + +This library implements the protocol specified in [jsonrpc-fdpass](https://github.com/cgwalters/jsonrpc-fdpass). + +## Protocol Overview + +- **Transport**: Unix domain sockets (SOCK_STREAM) +- **Framing**: Self-delimiting JSON (streaming parser) +- **FD Passing**: Via sendmsg/recvmsg with SCM_RIGHTS ancillary data +- **FD Count**: Top-level `fds` field indicates the number of file descriptors attached + +When file descriptors are attached to a message, the `fds` field is automatically +set to the count of FDs. File descriptors are passed positionally—the application +layer defines the semantic mapping between FD positions and parameters. 
+ +## Installation + +```bash +go get github.com/cgwalters/jsonrpc-fdpass-go +``` + +## Usage + +```go +package main + +import ( + "net" + "os" + + fdpass "github.com/cgwalters/jsonrpc-fdpass-go" +) + +func main() { + // Connect to a Unix socket + conn, _ := net.DialUnix("unix", nil, &net.UnixAddr{Name: "/tmp/socket.sock", Net: "unix"}) + + // Create sender and receiver + sender := fdpass.NewSender(conn) + receiver := fdpass.NewReceiver(conn) + + // Send a request with a file descriptor + file, _ := os.Open("example.txt") + defer file.Close() + + req := fdpass.NewRequest("readFile", map[string]interface{}{ + "path": "example.txt", + }, 1) + + msg := &fdpass.MessageWithFds{ + Message: req, + FileDescriptors: []*os.File{file}, + } + + // The sender automatically sets the "fds" field to 1 + sender.Send(msg) + + // Receive response + resp, _ := receiver.Receive() + // Handle resp.Message and resp.FileDescriptors +} +``` + +## License + +MIT diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go new file mode 100644 index 0000000000..b4dcde7e83 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/message.go @@ -0,0 +1,216 @@ +// Package fdpass implements JSON-RPC 2.0 with file descriptor passing over Unix domain sockets. +package fdpass + +import ( + "encoding/json" + "os" +) + +// JSONRPCVersion is the JSON-RPC protocol version. +const JSONRPCVersion = "2.0" + +// FDsKey is the JSON key for the file descriptor count field. +const FDsKey = "fds" + +// FileDescriptorErrorCode is the error code for FD-related protocol errors. +const FileDescriptorErrorCode = -32050 + +// Request represents a JSON-RPC 2.0 request. +type Request struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + ID interface{} `json:"id"` + // Fds is the number of file descriptors attached to this message. 
+ Fds *int `json:"fds,omitempty"` +} + +// Response represents a JSON-RPC 2.0 response. +type Response struct { + JsonRpc string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + Error *Error `json:"error,omitempty"` + ID interface{} `json:"id"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Notification represents a JSON-RPC 2.0 notification (a request without an ID). +type Notification struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + // Fds is the number of file descriptors attached to this message. + Fds *int `json:"fds,omitempty"` +} + +// Error represents a JSON-RPC 2.0 error object. +type Error struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +func (e *Error) Error() string { + return e.Message +} + +// MessageWithFds wraps a JSON-RPC message with associated file descriptors. +type MessageWithFds struct { + // Message is the JSON-RPC message (Request, Response, or Notification). + Message interface{} + // FileDescriptors are the file descriptors to pass with this message. + // The order corresponds to indices 0..N-1 matching the message's fds count. + FileDescriptors []*os.File +} + +// NewRequest creates a new JSON-RPC request. +func NewRequest(method string, params interface{}, id interface{}) *Request { + return &Request{ + JsonRpc: JSONRPCVersion, + Method: method, + Params: params, + ID: id, + } +} + +// NewResponse creates a new successful JSON-RPC response. +func NewResponse(result interface{}, id interface{}) *Response { + return &Response{ + JsonRpc: JSONRPCVersion, + Result: result, + ID: id, + } +} + +// NewErrorResponse creates a new error JSON-RPC response. 
+func NewErrorResponse(err *Error, id interface{}) *Response { + return &Response{ + JsonRpc: JSONRPCVersion, + Error: err, + ID: id, + } +} + +// NewNotification creates a new JSON-RPC notification. +func NewNotification(method string, params interface{}) *Notification { + return &Notification{ + JsonRpc: JSONRPCVersion, + Method: method, + Params: params, + } +} + +// GetFDCount reads the file descriptor count from a JSON value. +// Returns 0 if the `fds` field is absent or not a valid number. +func GetFDCount(value map[string]interface{}) int { + if fds, ok := value[FDsKey]; ok { + switch v := fds.(type) { + case float64: + return int(v) + case int: + return v + } + } + return 0 +} + +// FileDescriptorError creates a standard FD error for protocol violations. +func FileDescriptorError() *Error { + return &Error{ + Code: FileDescriptorErrorCode, + Message: "File Descriptor Error", + } +} + +// SetFDs sets the fds count on a Request. +func (r *Request) SetFDs(count int) { + if count > 0 { + r.Fds = &count + } else { + r.Fds = nil + } +} + +// GetFDs returns the fds count from a Request. +func (r *Request) GetFDs() int { + if r.Fds != nil { + return *r.Fds + } + return 0 +} + +// SetFDs sets the fds count on a Response. +func (r *Response) SetFDs(count int) { + if count > 0 { + r.Fds = &count + } else { + r.Fds = nil + } +} + +// GetFDs returns the fds count from a Response. +func (r *Response) GetFDs() int { + if r.Fds != nil { + return *r.Fds + } + return 0 +} + +// SetFDs sets the fds count on a Notification. +func (n *Notification) SetFDs(count int) { + if count > 0 { + n.Fds = &count + } else { + n.Fds = nil + } +} + +// GetFDs returns the fds count from a Notification. +func (n *Notification) GetFDs() int { + if n.Fds != nil { + return *n.Fds + } + return 0 +} + +// ParseMessage parses a raw JSON message into the appropriate type. +// It returns one of *Request, *Response, or *Notification. 
+func ParseMessage(data []byte) (interface{}, error) { + // First parse as a generic map to determine type + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return nil, err + } + + // Determine message type based on fields present + _, hasMethod := raw["method"] + _, hasID := raw["id"] + _, hasResult := raw["result"] + _, hasError := raw["error"] + + if hasMethod && hasID { + // Request + var req Request + if err := json.Unmarshal(data, &req); err != nil { + return nil, err + } + return &req, nil + } else if hasResult || hasError { + // Response + var resp Response + if err := json.Unmarshal(data, &resp); err != nil { + return nil, err + } + return &resp, nil + } else if hasMethod { + // Notification + var notif Notification + if err := json.Unmarshal(data, ¬if); err != nil { + return nil, err + } + return ¬if, nil + } + + return nil, &Error{Code: -32600, Message: "Invalid JSON-RPC message"} +} diff --git a/vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go new file mode 100644 index 0000000000..3a65af9f59 --- /dev/null +++ b/vendor/github.com/cgwalters/jsonrpc-fdpass-go/transport.go @@ -0,0 +1,302 @@ +package fdpass + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "sync" + + "golang.org/x/sys/unix" +) + +const ( + // MaxFDsPerMessage is the maximum number of file descriptors per message. + MaxFDsPerMessage = 8 + // ReadBufferSize is the size of the read buffer. + ReadBufferSize = 4096 +) + +var ( + // ErrConnectionClosed is returned when the connection is closed. + ErrConnectionClosed = errors.New("connection closed") + // ErrFramingError is returned when JSON parsing fails. + ErrFramingError = errors.New("framing error: invalid JSON") + // ErrMismatchedCount is returned when the number of FDs doesn't match the fds field. 
+ ErrMismatchedCount = errors.New("mismatched file descriptor count") +) + +// Sender sends JSON-RPC messages with file descriptors over a Unix socket. +type Sender struct { + conn *net.UnixConn + mu sync.Mutex +} + +// NewSender creates a new Sender for the given Unix connection. +func NewSender(conn *net.UnixConn) *Sender { + return &Sender{conn: conn} +} + +// Send sends a message with optional file descriptors. +func (s *Sender) Send(msg *MessageWithFds) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Set the fds field on the message based on the number of file descriptors + fdCount := len(msg.FileDescriptors) + switch m := msg.Message.(type) { + case *Request: + m.SetFDs(fdCount) + case *Response: + m.SetFDs(fdCount) + case *Notification: + m.SetFDs(fdCount) + } + + // Serialize the message with the fds field set + msgData, err := json.Marshal(msg.Message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + // Get the raw file descriptor for the socket + rawConn, err := s.conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get syscall conn: %w", err) + } + + var sendErr error + err = rawConn.Control(func(fd uintptr) { + sendErr = s.sendWithFDs(int(fd), msgData, msg.FileDescriptors) + }) + if err != nil { + return err + } + return sendErr +} + +func (s *Sender) sendWithFDs(sockfd int, data []byte, files []*os.File) error { + bytesSent := 0 + fdsSent := false + + for bytesSent < len(data) { + remaining := data[bytesSent:] + + var n int + var err error + + if !fdsSent && len(files) > 0 { + // First chunk with FDs: use sendmsg with ancillary data + fds := make([]int, len(files)) + for i, f := range files { + fds[i] = int(f.Fd()) + } + + rights := unix.UnixRights(fds...) 
+ n, err = unix.SendmsgN(sockfd, remaining, rights, nil, 0) + if err != nil { + return fmt.Errorf("sendmsg failed: %w", err) + } + fdsSent = true + } else { + // No FDs or FDs already sent: use regular send + n, err = unix.Write(sockfd, remaining) + if err != nil { + return fmt.Errorf("write failed: %w", err) + } + } + + bytesSent += n + } + + return nil +} + +// Receiver receives JSON-RPC messages with file descriptors from a Unix socket. +type Receiver struct { + conn *net.UnixConn + buffer []byte + fdQueue []*os.File + mu sync.Mutex +} + +// NewReceiver creates a new Receiver for the given Unix connection. +func NewReceiver(conn *net.UnixConn) *Receiver { + return &Receiver{ + conn: conn, + buffer: make([]byte, 0), + fdQueue: make([]*os.File, 0), + } +} + +// Receive receives the next message with its file descriptors. +func (r *Receiver) Receive() (*MessageWithFds, error) { + r.mu.Lock() + defer r.mu.Unlock() + + for { + // Try to parse a complete message from the buffer + msg, err := r.tryParseMessage() + if err != nil { + return nil, err + } + if msg != nil { + return msg, nil + } + + // Need more data + if err := r.readMoreData(); err != nil { + return nil, err + } + } +} + +func (r *Receiver) tryParseMessage() (*MessageWithFds, error) { + if len(r.buffer) == 0 { + return nil, nil + } + + // Use streaming JSON decoder to find message boundaries + decoder := json.NewDecoder(bytes.NewReader(r.buffer)) + var value map[string]interface{} + + err := decoder.Decode(&value) + if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) { + // Incomplete JSON - need more data + return nil, nil + } + if err != nil { + // Actual parse error - framing error + return nil, fmt.Errorf("%w: %v", ErrFramingError, err) + } + + // Successfully parsed a complete JSON value + // Use InputOffset to find consumed bytes (Go 1.21+) + bytesConsumed := decoder.InputOffset() + + // Extract the consumed bytes for re-parsing + consumedData := r.buffer[:bytesConsumed] + + // Remove consumed 
bytes from buffer + r.buffer = r.buffer[bytesConsumed:] + + // Read the fds count from the message + fdCount := GetFDCount(value) + + // Check we have enough FDs + if fdCount > len(r.fdQueue) { + return nil, fmt.Errorf("%w: expected %d FDs, have %d in queue", + ErrMismatchedCount, fdCount, len(r.fdQueue)) + } + + // Dequeue FDs + fds := make([]*os.File, fdCount) + copy(fds, r.fdQueue[:fdCount]) + r.fdQueue = r.fdQueue[fdCount:] + + // Parse the message into the appropriate type + msg, err := ParseMessage(consumedData) + if err != nil { + return nil, err + } + + return &MessageWithFds{ + Message: msg, + FileDescriptors: fds, + }, nil +} + +func (r *Receiver) readMoreData() error { + rawConn, err := r.conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get syscall conn: %w", err) + } + + var readErr error + var bytesRead int + var receivedFDs []*os.File + + err = rawConn.Read(func(fd uintptr) bool { + bytesRead, receivedFDs, readErr = r.recvWithFDs(int(fd)) + // Return true to indicate we're done with this read operation + // Return false only if we get EAGAIN/EWOULDBLOCK + if readErr != nil { + if errors.Is(readErr, unix.EAGAIN) || errors.Is(readErr, unix.EWOULDBLOCK) { + readErr = nil + return false // Tell runtime to wait and retry + } + } + return true + }) + + if err != nil { + return err + } + if readErr != nil { + return readErr + } + + if bytesRead == 0 && len(receivedFDs) == 0 { + return ErrConnectionClosed + } + + // Append received FDs to queue + r.fdQueue = append(r.fdQueue, receivedFDs...) 
+ + return nil +} + +func (r *Receiver) recvWithFDs(sockfd int) (int, []*os.File, error) { + buf := make([]byte, ReadBufferSize) + // Allocate space for control message (for up to MaxFDsPerMessage FDs) + // Each FD is 4 bytes (int32), use CmsgSpace to get properly aligned size + oob := make([]byte, unix.CmsgSpace(MaxFDsPerMessage*4)) + + n, oobn, _, _, err := unix.Recvmsg(sockfd, buf, oob, unix.MSG_CMSG_CLOEXEC) + if err != nil { + return 0, nil, err + } + + // Append data to buffer + if n > 0 { + r.buffer = append(r.buffer, buf[:n]...) + } + + // Parse control messages for FDs + var files []*os.File + if oobn > 0 { + scms, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + return n, nil, fmt.Errorf("failed to parse control message: %w", err) + } + + for _, scm := range scms { + fds, err := unix.ParseUnixRights(&scm) + if err != nil { + continue + } + for _, fd := range fds { + files = append(files, os.NewFile(uintptr(fd), "")) + } + } + } + + return n, files, nil +} + +// Close closes the receiver and any pending file descriptors in the queue. 
+func (r *Receiver) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + // Close any FDs remaining in the queue to prevent leaks + for _, f := range r.fdQueue { + f.Close() + } + r.fdQueue = nil + + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 854785ebf1..25e2be70fa 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -30,6 +30,9 @@ github.com/VividCortex/ewma # github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d ## explicit github.com/acarl005/stripansi +# github.com/cgwalters/jsonrpc-fdpass-go v0.0.0-20260126203148-2bca851a3863 +## explicit; go 1.21 +github.com/cgwalters/jsonrpc-fdpass-go # github.com/checkpoint-restore/checkpointctl v1.5.0 ## explicit; go 1.24.6 github.com/checkpoint-restore/checkpointctl/lib From bf6cc4ff19761e280d3ff456e19ff1227989a054 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 11 Feb 2026 10:51:18 +0100 Subject: [PATCH 04/12] storage/pkg/splitfdstream: new package Signed-off-by: Giuseppe Scrivano --- storage/pkg/splitfdstream/server_linux.go | 597 ++++++++++++++++++ .../pkg/splitfdstream/server_unsupported.go | 64 ++ storage/pkg/splitfdstream/types.go | 229 +++++++ 3 files changed, 890 insertions(+) create mode 100644 storage/pkg/splitfdstream/server_linux.go create mode 100644 storage/pkg/splitfdstream/server_unsupported.go create mode 100644 storage/pkg/splitfdstream/types.go diff --git a/storage/pkg/splitfdstream/server_linux.go b/storage/pkg/splitfdstream/server_linux.go new file mode 100644 index 0000000000..58187c8506 --- /dev/null +++ b/storage/pkg/splitfdstream/server_linux.go @@ -0,0 +1,597 @@ +//go:build linux + +package splitfdstream + +import ( + "errors" + "fmt" + "io" + "net" + "os" + "runtime" + "sync" + + fdpass "github.com/cgwalters/jsonrpc-fdpass-go" + "golang.org/x/sys/unix" +) + +// JSON-RPC 2.0 standard error codes as documented here: https://www.jsonrpc.org/specification +const ( + jsonrpcInvalidRequest = -32600 + jsonrpcMethodNotFound = -32601 + 
jsonrpcInvalidParams = -32602 + jsonrpcServerError = -32000 +) + +// sendRetry retries sender.Send on EAGAIN (non-blocking socket buffer full). +func sendRetry(sender *fdpass.Sender, msg *fdpass.MessageWithFds) error { + for { + err := sender.Send(msg) + if err == nil { + return nil + } + if errors.Is(err, unix.EAGAIN) || errors.Is(err, unix.EWOULDBLOCK) { + runtime.Gosched() + continue + } + return err + } +} + +// JSONRPCServer manages a JSON-RPC server using the external library. +type JSONRPCServer struct { + driver SplitFDStreamDriver + store Store + listener net.Listener + running bool + mu sync.RWMutex + shutdown chan struct{} + connections sync.WaitGroup +} + +// NewJSONRPCServer creates a new JSON-RPC server. +func NewJSONRPCServer(driver SplitFDStreamDriver, store Store) *JSONRPCServer { + return &JSONRPCServer{ + driver: driver, + store: store, + shutdown: make(chan struct{}), + } +} + +// Start starts the JSON-RPC server listening on the given Unix socket. +func (s *JSONRPCServer) Start(socketPath string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.running { + return fmt.Errorf("server already running") + } + + os.Remove(socketPath) + + listener, err := net.Listen("unix", socketPath) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", socketPath, err) + } + + s.listener = listener + s.running = true + + go s.acceptConnections() + + return nil +} + +// Stop stops the JSON-RPC server. 
+func (s *JSONRPCServer) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.running { + return nil + } + + close(s.shutdown) + if s.listener != nil { + s.listener.Close() + } + s.connections.Wait() + s.running = false + + return nil +} + +func (s *JSONRPCServer) acceptConnections() { + for { + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.shutdown: + return + default: + continue + } + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + continue + } + + s.TrackConnection() + go s.HandleConnection(unixConn) + } +} + +// TrackConnection increments the connection counter. Call this before +// spawning a goroutine that calls HandleConnection so that Stop()'s +// Wait() cannot return before the handler has started. +func (s *JSONRPCServer) TrackConnection() { + s.connections.Add(1) +} + +// HandleConnection handles a single client connection. +// The caller must call TrackConnection before spawning HandleConnection +// in a goroutine. +func (s *JSONRPCServer) HandleConnection(conn *net.UnixConn) { + defer s.connections.Done() + defer conn.Close() + + receiver := fdpass.NewReceiver(conn) + sender := fdpass.NewSender(conn) + defer receiver.Close() + + for { + select { + case <-s.shutdown: + return + default: + } + + msgWithFds, err := receiver.Receive() + if err != nil { + return + } + + req, ok := msgWithFds.Message.(*fdpass.Request) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidRequest, Message: "Invalid Request"}, + nil, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + return + } + continue + } + + s.handleRequest(sender, req, msgWithFds.FileDescriptors) + } +} + +func (s *JSONRPCServer) handleRequest(sender *fdpass.Sender, req *fdpass.Request, fds []*os.File) { + switch req.Method { + case "GetSplitFDStream": + s.handleGetSplitFDStream(sender, req) + case "GetImage": + s.handleGetImage(sender, req) + default: + resp := 
fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcMethodNotFound, Message: fmt.Sprintf("method %q not found", req.Method)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending method-not-found response: %v\n", err) + } + } +} + +func (s *JSONRPCServer) handleGetSplitFDStream(sender *fdpass.Sender, req *fdpass.Request) { + params, ok := req.Params.(map[string]interface{}) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "params must be an object"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + layerID, _ := params["layerId"].(string) + if layerID == "" { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "layerId is required"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + parentID, _ := params["parentId"].(string) + + stream, fileFDs, err := s.driver.GetSplitFDStream(layerID, parentID, &GetSplitFDStreamOpts{}) + if err != nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: err.Error()}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + // Write stream data directly to a memfd to avoid holding the entire + // stream in memory twice (once in the byte slice, once in the memfd). 
+ streamFd, err := unix.MemfdCreate("splitfdstream", unix.MFD_CLOEXEC) + if err != nil { + stream.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("memfd_create: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + streamFile := os.NewFile(uintptr(streamFd), "splitfdstream") + streamSize, err := io.Copy(streamFile, stream) + stream.Close() + if err != nil { + streamFile.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("failed to write stream to memfd: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + if _, err := streamFile.Seek(0, 0); err != nil { + streamFile.Close() + for _, f := range fileFDs { + f.Close() + } + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("memfd seek: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + // Prepend the stream memfd to the file descriptor list. + // allFDs[0] = stream data, allFDs[1:] = content file descriptors. + allFDs := make([]*os.File, 0, 1+len(fileFDs)) + allFDs = append(allFDs, streamFile) + allFDs = append(allFDs, fileFDs...) + + // Send the response with the first batch of FDs. + // The library limits to MaxFDsPerMessage per sendmsg, so remaining + // FDs are sent as follow-up "fds" notifications. 
+ firstBatch := allFDs + if len(firstBatch) > fdpass.MaxFDsPerMessage { + firstBatch = allFDs[:fdpass.MaxFDsPerMessage] + } + + result := map[string]interface{}{ + "streamSize": streamSize, + "totalFDs": len(allFDs), + } + + resp := fdpass.NewResponse(result, req.ID) + if err := sendRetry(sender, &fdpass.MessageWithFds{ + Message: resp, + FileDescriptors: firstBatch, + }); err != nil { + fmt.Fprintf(os.Stderr, "error sending initial response: %v\n", err) + return + } + + // Send remaining FDs in batches via notifications + for i := fdpass.MaxFDsPerMessage; i < len(allFDs); i += fdpass.MaxFDsPerMessage { + end := i + fdpass.MaxFDsPerMessage + if end > len(allFDs) { + end = len(allFDs) + } + batch := allFDs[i:end] + + notif := fdpass.NewNotification("fds", nil) + if err := sendRetry(sender, &fdpass.MessageWithFds{ + Message: notif, + FileDescriptors: batch, + }); err != nil { + fmt.Fprintf(os.Stderr, "error sending FD batch at %d/%d: %v\n", i, len(allFDs), err) + return + } + } +} + +func (s *JSONRPCServer) handleGetImage(sender *fdpass.Sender, req *fdpass.Request) { + params, ok := req.Params.(map[string]interface{}) + if !ok { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "params must be an object"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + imageID, _ := params["imageId"].(string) + if imageID == "" { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcInvalidParams, Message: "imageId is required"}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + if s.store == nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: "store not available for image operations"}, + req.ID, + ) + if err := 
sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + metadata, err := GetImageMetadata(s.store, imageID) + if err != nil { + resp := fdpass.NewErrorResponse( + &fdpass.Error{Code: jsonrpcServerError, Message: fmt.Sprintf("failed to get image metadata: %v", err)}, + req.ID, + ) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending error response: %v\n", err) + } + return + } + + result := map[string]interface{}{ + "manifest": string(metadata.ManifestJSON), + "config": string(metadata.ConfigJSON), + "layerDigests": metadata.LayerDigests, + } + + resp := fdpass.NewResponse(result, req.ID) + if err := sendRetry(sender, &fdpass.MessageWithFds{Message: resp}); err != nil { + fmt.Fprintf(os.Stderr, "error sending image response: %v\n", err) + return + } +} + +// JSONRPCClient implements a JSON-RPC client. +type JSONRPCClient struct { + conn *net.UnixConn + sender *fdpass.Sender + receiver *fdpass.Receiver + mu sync.Mutex + nextID int64 +} + +// NewJSONRPCClient connects to a JSON-RPC server on the given Unix socket. +func NewJSONRPCClient(socketPath string) (*JSONRPCClient, error) { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("failed to connect to socket: %w", err) + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("connection is not a unix socket") + } + + return &JSONRPCClient{ + conn: unixConn, + sender: fdpass.NewSender(unixConn), + receiver: fdpass.NewReceiver(unixConn), + nextID: 1, + }, nil +} + +// Close closes the client connection. +func (c *JSONRPCClient) Close() error { + if c.receiver != nil { + c.receiver.Close() + } + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// GetSplitFDStream sends a GetSplitFDStream request and returns the response. 
+func (c *JSONRPCClient) GetSplitFDStream(layerID, parentID string) ([]byte, []*os.File, error) { + c.mu.Lock() + id := c.nextID + c.nextID++ + c.mu.Unlock() + + req := fdpass.NewRequest("GetSplitFDStream", map[string]interface{}{ + "layerId": layerID, + "parentId": parentID, + }, id) + + if err := sendRetry(c.sender, &fdpass.MessageWithFds{Message: req}); err != nil { + return nil, nil, fmt.Errorf("failed to send request: %w", err) + } + + // Receive the initial response with stream data and first batch of FDs + respMsg, err := c.receiver.Receive() + if err != nil { + return nil, nil, fmt.Errorf("failed to receive response: %w", err) + } + + resp, ok := respMsg.Message.(*fdpass.Response) + if !ok { + return nil, nil, fmt.Errorf("unexpected response type: %T", respMsg.Message) + } + + if resp.Error != nil { + return nil, nil, fmt.Errorf("server error: %s", resp.Error.Message) + } + + result, ok := resp.Result.(map[string]interface{}) + if !ok { + return nil, nil, fmt.Errorf("unexpected result type: %T", resp.Result) + } + + // Collect FDs: first batch came with the response + var allFDs []*os.File + allFDs = append(allFDs, respMsg.FileDescriptors...) + + // Read totalFDs to know how many more to expect + totalFDs := 0 + if tf, ok := result["totalFDs"].(float64); ok { + totalFDs = int(tf) + } + + // Receive remaining FDs from follow-up notifications + for len(allFDs) < totalFDs { + msg, err := c.receiver.Receive() + if err != nil { + for _, f := range allFDs { + f.Close() + } + return nil, nil, fmt.Errorf("failed to receive FD batch (%d/%d received): %w", len(allFDs), totalFDs, err) + } + allFDs = append(allFDs, msg.FileDescriptors...) 
+ } + + if len(allFDs) == 0 { + return nil, nil, fmt.Errorf("no file descriptors received") + } + + // allFDs[0] is a memfd containing the stream data, the rest are content FDs + streamFile := allFDs[0] + contentFDs := allFDs[1:] + + streamData, err := io.ReadAll(streamFile) + streamFile.Close() + if err != nil { + for _, f := range contentFDs { + f.Close() + } + return nil, nil, fmt.Errorf("failed to read stream data from fd: %w", err) + } + + return streamData, contentFDs, nil +} + +// GetImage sends a GetImage request and returns image metadata. +func (c *JSONRPCClient) GetImage(imageID string) (*ImageMetadata, error) { + c.mu.Lock() + id := c.nextID + c.nextID++ + c.mu.Unlock() + + req := fdpass.NewRequest("GetImage", map[string]interface{}{ + "imageId": imageID, + }, id) + + if err := sendRetry(c.sender, &fdpass.MessageWithFds{Message: req}); err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + + respMsg, err := c.receiver.Receive() + if err != nil { + return nil, fmt.Errorf("failed to receive response: %w", err) + } + + resp, ok := respMsg.Message.(*fdpass.Response) + if !ok { + return nil, fmt.Errorf("unexpected response type: %T", respMsg.Message) + } + + if resp.Error != nil { + return nil, fmt.Errorf("server error: %s", resp.Error.Message) + } + + result, ok := resp.Result.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("unexpected result type: %T", resp.Result) + } + + manifestJSON, _ := result["manifest"].(string) + configJSON, _ := result["config"].(string) + + layerDigestsInterface, _ := result["layerDigests"].([]interface{}) + layerDigests := make([]string, len(layerDigestsInterface)) + for i, v := range layerDigestsInterface { + layerDigests[i], _ = v.(string) + } + + return &ImageMetadata{ + ManifestJSON: []byte(manifestJSON), + ConfigJSON: []byte(configJSON), + LayerDigests: layerDigests, + }, nil +} + +// CreateSocketPair creates a pair of connected UNIX sockets. 
+func CreateSocketPair() (*net.UnixConn, *net.UnixConn, error) { + fds, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0) + if err != nil { + return nil, nil, fmt.Errorf("failed to create socket pair: %w", err) + } + + clientFile := os.NewFile(uintptr(fds[0]), "client") + serverFile := os.NewFile(uintptr(fds[1]), "server") + + clientConn, err := net.FileConn(clientFile) + if err != nil { + clientFile.Close() + serverFile.Close() + return nil, nil, fmt.Errorf("failed to create client connection: %w", err) + } + + serverConn, err := net.FileConn(serverFile) + if err != nil { + clientConn.Close() + serverFile.Close() + return nil, nil, fmt.Errorf("failed to create server connection: %w", err) + } + + clientFile.Close() + serverFile.Close() + + clientUnix, ok := clientConn.(*net.UnixConn) + if !ok { + clientConn.Close() + serverConn.Close() + return nil, nil, fmt.Errorf("failed to cast client to UnixConn") + } + + serverUnix, ok := serverConn.(*net.UnixConn) + if !ok { + clientConn.Close() + serverConn.Close() + return nil, nil, fmt.Errorf("failed to cast server to UnixConn") + } + + return clientUnix, serverUnix, nil +} diff --git a/storage/pkg/splitfdstream/server_unsupported.go b/storage/pkg/splitfdstream/server_unsupported.go new file mode 100644 index 0000000000..3495efb5f7 --- /dev/null +++ b/storage/pkg/splitfdstream/server_unsupported.go @@ -0,0 +1,64 @@ +//go:build !linux + +package splitfdstream + +import ( + "fmt" + "net" + "os" +) + +// JSONRPCServer is not supported on this platform. +type JSONRPCServer struct{} + +// NewJSONRPCServer creates a new JSON-RPC server stub for unsupported platforms. +func NewJSONRPCServer(driver SplitFDStreamDriver, store Store) *JSONRPCServer { + return &JSONRPCServer{} +} + +// TrackConnection is not supported on this platform. +func (s *JSONRPCServer) TrackConnection() { +} + +// HandleConnection is not supported on this platform. 
+func (s *JSONRPCServer) HandleConnection(conn *net.UnixConn) { + panic("JSONRPCServer is not supported on this platform") +} + +// Start is not supported on this platform. +func (s *JSONRPCServer) Start(socketPath string) error { + return fmt.Errorf("JSONRPCServer is not supported on this platform") +} + +// Stop is not supported on this platform. +func (s *JSONRPCServer) Stop() error { + return fmt.Errorf("JSONRPCServer is not supported on this platform") +} + +// JSONRPCClient is not supported on this platform. +type JSONRPCClient struct{} + +// NewJSONRPCClient creates a new JSON-RPC client stub for unsupported platforms. +func NewJSONRPCClient(socketPath string) (*JSONRPCClient, error) { + return nil, fmt.Errorf("JSONRPCClient is not supported on this platform") +} + +// Close is not supported on this platform. +func (c *JSONRPCClient) Close() error { + return fmt.Errorf("JSONRPCClient is not supported on this platform") +} + +// GetSplitFDStream is not supported on this platform. +func (c *JSONRPCClient) GetSplitFDStream(layerID, parentID string) ([]byte, []*os.File, error) { + return nil, nil, fmt.Errorf("GetSplitFDStream is not supported on this platform") +} + +// GetImage is not supported on this platform. +func (c *JSONRPCClient) GetImage(imageID string) (*ImageMetadata, error) { + return nil, fmt.Errorf("GetImage is not supported on this platform") +} + +// CreateSocketPair is not supported on this platform. 
+func CreateSocketPair() (*net.UnixConn, *net.UnixConn, error) { + return nil, nil, fmt.Errorf("CreateSocketPair is not supported on this platform") +} diff --git a/storage/pkg/splitfdstream/types.go b/storage/pkg/splitfdstream/types.go new file mode 100644 index 0000000000..ed3ca515b3 --- /dev/null +++ b/storage/pkg/splitfdstream/types.go @@ -0,0 +1,229 @@ +package splitfdstream + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "os" + "strings" + + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "go.podman.io/storage/pkg/idtools" +) + +// Store represents the minimal interface needed for image metadata access. +type Store interface { + ImageBigData(id, key string) ([]byte, error) + ListImageBigData(id string) ([]string, error) + ResolveImageID(id string) (actualID string, topLayerID string, err error) + LayerParent(id string) (parentID string, err error) +} + +// SplitFDStreamDriver defines the interface that storage drivers must implement +// to support splitfdstream operations. +type SplitFDStreamDriver interface { + // ApplySplitFDStream applies a splitfdstream to a layer. + ApplySplitFDStream(options *ApplySplitFDStreamOpts) (int64, error) + + // GetSplitFDStream generates a splitfdstream for a layer. + GetSplitFDStream(id, parent string, options *GetSplitFDStreamOpts) (io.ReadCloser, []*os.File, error) +} + +// ImageMetadata holds manifest and config data for an OCI image. +type ImageMetadata struct { + ManifestJSON []byte `json:"manifest"` + ConfigJSON []byte `json:"config"` + LayerDigests []string `json:"layerDigests"` +} + +// findManifest finds the image manifest from BigData keys. +// It looks for manifest-* keys containing an image manifest (has "config" field), +// filtering out manifest lists/indexes. 
+func findManifest(store Store, imageID string) ([]byte, error) { + availableKeys, err := store.ListImageBigData(imageID) + if err != nil { + return nil, fmt.Errorf("failed to list BigData keys for %s: %w", imageID, err) + } + + // Try manifest-* keys that contain an actual image manifest (not a manifest list). + // An image manifest has a config descriptor; a manifest list/index does not. + for _, key := range availableKeys { + if !strings.HasPrefix(key, "manifest") { + continue + } + data, err := store.ImageBigData(imageID, key) + if err != nil { + continue + } + var manifest v1.Manifest + if err := json.Unmarshal(data, &manifest); err != nil { + continue + } + if manifest.Config.MediaType != "" { + return data, nil + } + } + + // Fall back to generic "manifest" key + data, err := store.ImageBigData(imageID, "manifest") + if err != nil { + return nil, fmt.Errorf("no manifest found for image %s", imageID) + } + return data, nil +} + +// findConfig finds the image config from BigData keys. +// Config is typically stored under a digest-format key (e.g., "sha256:abc..."). +func findConfig(store Store, imageID string) ([]byte, error) { + availableKeys, err := store.ListImageBigData(imageID) + if err != nil { + return nil, fmt.Errorf("failed to list BigData keys for %s: %w", imageID, err) + } + + // Look for digest-format keys that aren't manifests + for _, key := range availableKeys { + if strings.Contains(key, ":") && !strings.HasPrefix(key, "manifest") { + data, err := store.ImageBigData(imageID, key) + if err == nil { + return data, nil + } + } + } + + return nil, fmt.Errorf("no config found for image %s", imageID) +} + +// GetImageMetadata retrieves manifest, config, and layer information for an image. 
+func GetImageMetadata(store Store, imageID string) (*ImageMetadata, error) { + actualID, topLayerID, err := store.ResolveImageID(imageID) + if err != nil { + return nil, fmt.Errorf("failed to resolve image %s: %w", imageID, err) + } + + manifestJSON, err := findManifest(store, actualID) + if err != nil { + return nil, fmt.Errorf("failed to get manifest for %s (resolved to %s): %w", imageID, actualID, err) + } + + configJSON, err := findConfig(store, actualID) + if err != nil { + return nil, fmt.Errorf("failed to get config for %s (resolved to %s): %w", imageID, actualID, err) + } + + // Walk the layer chain using store.LayerParent + var layerIDs []string + layerID := topLayerID + for layerID != "" { + layerIDs = append(layerIDs, layerID) + parentID, err := store.LayerParent(layerID) + if err != nil { + break + } + layerID = parentID + } + + // Fall back to manifest layer digests if layer chain traversal failed + if len(layerIDs) == 0 { + var manifest v1.Manifest + if err := json.Unmarshal(manifestJSON, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest: %w", err) + } + layerIDs = make([]string, len(manifest.Layers)) + for i, layer := range manifest.Layers { + layerIDs[i] = layer.Digest.String() + } + } + + return &ImageMetadata{ + ManifestJSON: manifestJSON, + ConfigJSON: configJSON, + LayerDigests: layerIDs, + }, nil +} + +// ApplySplitFDStreamOpts provides options for ApplySplitFDStream operations. +type ApplySplitFDStreamOpts struct { + LayerID string + Stream io.Reader + FileDescriptors []*os.File + IgnoreChownErrors bool + MountLabel string + StagingDir string + IDMappings *idtools.IDMappings + ForceMask *os.FileMode +} + +// Validate checks if the options are valid. 
+func (opts *ApplySplitFDStreamOpts) Validate() error { + if opts.LayerID == "" && opts.StagingDir == "" { + return fmt.Errorf("either LayerID or StagingDir must be specified") + } + return nil +} + +// GetSplitFDStreamOpts provides options for GetSplitFDStream operations. +type GetSplitFDStreamOpts struct { + MountLabel string + IDMappings *idtools.IDMappings +} + +// SplitFDStreamWriter writes data in the composefs-rs splitfdstream format. +// The format uses signed 64-bit little-endian prefixes: +// - Negative prefix: abs(prefix) bytes of inline data follow +// - Non-negative prefix: reference to external file descriptor at index prefix +type SplitFDStreamWriter struct { + writer io.Writer +} + +// NewWriter creates a new SplitFDStreamWriter. +func NewWriter(w io.Writer) *SplitFDStreamWriter { + return &SplitFDStreamWriter{writer: w} +} + +// WriteInline writes inline data with a negative prefix indicating the data length. +func (w *SplitFDStreamWriter) WriteInline(data []byte) error { + if len(data) == 0 { + return nil + } + prefix := int64(-len(data)) + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write inline prefix: %w", err) + } + if _, err := w.writer.Write(data); err != nil { + return fmt.Errorf("failed to write inline data: %w", err) + } + return nil +} + +// WriteInlinePrefix writes a negative prefix indicating that size bytes of +// inline data will follow. Use WriteRaw to write the actual data in chunks. +// This is useful when the data is too large to fit in a single WriteInline call. +func (w *SplitFDStreamWriter) WriteInlinePrefix(size int64) error { + if size <= 0 { + return nil + } + prefix := -size + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write inline prefix: %w", err) + } + return nil +} + +// WriteRaw writes raw data without any prefix framing. +// Must be preceded by a WriteInlinePrefix call with the total size. 
+func (w *SplitFDStreamWriter) WriteRaw(data []byte) error { + if _, err := w.writer.Write(data); err != nil { + return fmt.Errorf("failed to write raw data: %w", err) + } + return nil +} + +// WriteExternal writes a reference to an external file descriptor. +func (w *SplitFDStreamWriter) WriteExternal(fdIndex int) error { + prefix := int64(fdIndex) + if err := binary.Write(w.writer, binary.LittleEndian, prefix); err != nil { + return fmt.Errorf("failed to write external fd reference: %w", err) + } + return nil +} From 1ef55d9345d847e20b0b1ee9974618abfc58bcf1 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 16 Feb 2026 18:28:15 +0100 Subject: [PATCH 05/12] storage/store: add new APIs to resolve image ID Signed-off-by: Giuseppe Scrivano --- storage/store.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/storage/store.go b/storage/store.go index 3d8ea50759..caa21fd7c9 100644 --- a/storage/store.go +++ b/storage/store.go @@ -509,6 +509,12 @@ type Store interface { // Image returns a specific image. Image(id string) (*Image, error) + // ResolveImageID resolves an image reference to its actual ID and top layer ID. + ResolveImageID(id string) (string, string, error) + + // LayerParent returns the parent layer ID for the given layer. + LayerParent(id string) (string, error) + // ImagesByTopLayer returns a list of images which reference the specified // layer as their top layer. They will have different IDs and names // and may have different metadata, big data items, and flags. 
@@ -3584,6 +3590,22 @@ func (s *store) Image(id string) (*Image, error) { return nil, fmt.Errorf("locating image with ID %q: %w", id, ErrImageUnknown) } +func (s *store) ResolveImageID(id string) (string, string, error) { + img, err := s.Image(id) + if err != nil { + return "", "", err + } + return img.ID, img.TopLayer, nil +} + +func (s *store) LayerParent(id string) (string, error) { + l, err := s.Layer(id) + if err != nil { + return "", err + } + return l.Parent, nil +} + func (s *store) ImagesByTopLayer(id string) ([]*Image, error) { layer, err := s.Layer(id) if err != nil { From f49145343b50ffafbbc5162f6059fe48c085beb4 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 5 Feb 2026 13:15:03 +0000 Subject: [PATCH 06/12] storage: add SplitFDStreamStore interface Extend the store with splitfdstream capabilities exposed via a UNIX socket for JSON-RPC communication. Signed-off-by: Giuseppe Scrivano --- storage/store.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/storage/store.go b/storage/store.go index caa21fd7c9..1c119de64b 100644 --- a/storage/store.go +++ b/storage/store.go @@ -32,6 +32,7 @@ import ( "go.podman.io/storage/pkg/ioutils" "go.podman.io/storage/pkg/lockfile" "go.podman.io/storage/pkg/parsers" + "go.podman.io/storage/pkg/splitfdstream" "go.podman.io/storage/pkg/stringutils" "go.podman.io/storage/pkg/system" "go.podman.io/storage/types" @@ -618,6 +619,15 @@ type Store interface { Dedup(DedupArgs) (drivers.DedupResult, error) } +// SplitFDStreamStore extends the Store interface with splitfdstream capabilities. +// This API is experimental and can be changed without bumping the major version number. +type SplitFDStreamStore interface { + Store + + // SplitFDStreamSocket returns a socket for splitfdstream operations. 
+ SplitFDStreamSocket() (*os.File, error) +} + // AdditionalLayer represents a layer that is contained in the additional layer store // This API is experimental and can be changed without bumping the major version number. type AdditionalLayer interface { @@ -789,9 +799,14 @@ type store struct { layerStoreUseGetters rwLayerStore // Almost all users should use the provided accessors instead of accessing this field directly. roLayerStoresUseGetters []roLayerStore // Almost all users should use the provided accessors instead of accessing this field directly. - // FIXME: The following fields need locking, and don’t have it. + // FIXME: The following fields need locking, and don't have it. additionalUIDs *idSet // Set by getAvailableIDs() additionalGIDs *idSet // Set by getAvailableIDs() + + // jsonRPCServer manages the JSON-RPC server for storage operations. + // This API is experimental and can be changed without bumping the major version number. + // Protected by graphLock (via startUsingGraphDriver). + jsonRPCServer *splitfdstream.JSONRPCServer } // GetStore attempts to find an already-created Store object matching the @@ -4113,3 +4128,49 @@ func (s *store) Dedup(req DedupArgs) (drivers.DedupResult, error) { return rlstore.dedup(r) }) } + +// SplitFDStreamSocket returns a UNIX socket file descriptor for split FD stream operations. +// JSON-RPC requests for split FD stream operations are sent over this socket. +// The caller is responsible for closing the returned file when done. +// This API is experimental and can be changed without bumping the major version number. 
+func (s *store) SplitFDStreamSocket() (*os.File, error) { + if err := s.startUsingGraphDriver(); err != nil { + return nil, err + } + defer s.stopUsingGraphDriver() + + // Check if driver supports splitfdstream operations + if _, ok := s.graphDriver.(splitfdstream.SplitFDStreamDriver); !ok { + return nil, fmt.Errorf("driver %s does not support split FD stream operations: %w", s.graphDriver.String(), drivers.ErrNotSupported) + } + + // Create socket pair - one end for the caller, one end for the server + clientConn, serverConn, err := splitfdstream.CreateSocketPair() + if err != nil { + return nil, fmt.Errorf("failed to create socket pair: %w", err) + } + + // Get file descriptor from client connection before starting + // the server goroutine, so cleanup is straightforward on error. + clientFile, err := clientConn.File() + if err != nil { + clientConn.Close() + serverConn.Close() + return nil, fmt.Errorf("failed to get file from connection: %w", err) + } + clientConn.Close() + + // Initialize server if not already created + if s.jsonRPCServer == nil { + driver := s.graphDriver.(splitfdstream.SplitFDStreamDriver) + s.jsonRPCServer = splitfdstream.NewJSONRPCServer(driver, s) + } + + // Start handling the server connection in a goroutine. + // Add to the WaitGroup before spawning the goroutine to avoid + // a race with Stop() -> connections.Wait(). 
+ s.jsonRPCServer.TrackConnection() + go s.jsonRPCServer.HandleConnection(serverConn) + + return clientFile, nil +} From e23a04d3e889cece0b618c5a328ecd3cd55f498f Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 14:27:12 +0100 Subject: [PATCH 07/12] storage/archive,chrootarchive: add support for splitfdstream Signed-off-by: Giuseppe Scrivano --- storage/pkg/archive/archive.go | 81 ++++++- storage/pkg/archive/archive_test.go | 3 +- storage/pkg/archive/diff.go | 6 +- storage/pkg/archive/splitfdstream.go | 97 ++++++++ storage/pkg/chrootarchive/init_unix.go | 1 + .../pkg/chrootarchive/splitfdstream_unix.go | 218 ++++++++++++++++++ .../splitfdstream_unsupported.go | 17 ++ 7 files changed, 408 insertions(+), 15 deletions(-) create mode 100644 storage/pkg/archive/splitfdstream.go create mode 100644 storage/pkg/chrootarchive/splitfdstream_unix.go create mode 100644 storage/pkg/chrootarchive/splitfdstream_unsupported.go diff --git a/storage/pkg/archive/archive.go b/storage/pkg/archive/archive.go index 79343ba832..492d9bf0e3 100644 --- a/storage/pkg/archive/archive.go +++ b/storage/pkg/archive/archive.go @@ -79,6 +79,11 @@ const ( tarExt = "tar" windows = "windows" darwin = "darwin" + + // CopyBufferSize is the buffer size used for reading file content + // during archive extraction and splitfdstream operations. 1 MiB + // balances syscall overhead against memory usage. 
+ CopyBufferSize = 1 << 20 ) var xattrsToIgnore = map[string]any{ @@ -702,7 +707,7 @@ func (ta *tarWriter) addFile(headers *addFileData) error { return nil } -func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Reader, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode, buffer []byte) error { +func extractTarFileEntry(path, extractDir string, hdr *tar.Header, writeContent func(*os.File) error, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode) error { // hdr.Mode is in linux format, which we can use for sycalls, // but for os.Foo() calls we need the mode converted to os.FileMode, // so use hdrInfo.Mode() (they differ for e.g. setuid bits) @@ -739,9 +744,11 @@ func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Rea if err != nil { return err } - if _, err := io.CopyBuffer(file, reader, buffer); err != nil { - file.Close() - return err + if writeContent != nil { + if err := writeContent(file); err != nil { + file.Close() + return err + } } if err := file.Close(); err != nil { return err @@ -1085,17 +1092,67 @@ func TarWithOptions(srcPath string, options *TarOptions) (io.ReadCloser, error) return pipeReader, nil } -// Unpack unpacks the decompressedArchive to dest with options. -func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error { +// TarEntryIterator abstracts iteration over tar entries. +// Standard implementation wraps tar.Reader; splitfdstream provides +// entries from its chunk-based format with reflink support. +type TarEntryIterator interface { + // Next advances to the next entry and returns its header. + Next() (*tar.Header, error) + // WriteContentTo writes the current entry's file content to dst. + // Only called for TypeReg entries with Size > 0. + WriteContentTo(dst *os.File) error +} + +// tarReaderIterator implements TarEntryIterator for a standard tar.Reader. 
+type tarReaderIterator struct { + tr *tar.Reader + trBuf *bufio.Reader + buffer []byte +} + +func newTarReaderIterator(decompressedArchive io.Reader) *tarReaderIterator { tr := tar.NewReader(decompressedArchive) trBuf := pools.BufioReader32KPool.Get(nil) - defer pools.BufioReader32KPool.Put(trBuf) + return &tarReaderIterator{ + tr: tr, + trBuf: trBuf, + buffer: make([]byte, CopyBufferSize), + } +} + +func (i *tarReaderIterator) Next() (*tar.Header, error) { + hdr, err := i.tr.Next() + if err != nil { + return nil, err + } + i.trBuf.Reset(i.tr) + return hdr, nil +} + +func (i *tarReaderIterator) WriteContentTo(dst *os.File) error { + _, err := io.CopyBuffer(dst, i.trBuf, i.buffer) + return err +} +func (i *tarReaderIterator) close() { + pools.BufioReader32KPool.Put(i.trBuf) +} + +// Unpack unpacks the decompressedArchive to dest with options. +func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error { + iter := newTarReaderIterator(decompressedArchive) + defer iter.close() + return UnpackFromIterator(iter, dest, options) +} + +// UnpackFromIterator unpacks tar entries from the given iterator to dest with options. +// This allows plugging in alternative sources of tar entries (e.g., splitfdstream) +// while reusing the full extraction logic including xattrs, whiteouts, device nodes, etc. +func UnpackFromIterator(iter TarEntryIterator, dest string, options *TarOptions) error { var dirs []*tar.Header idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps) rootIDs := idMappings.RootPair() whiteoutConverter := GetWhiteoutConverter(options.WhiteoutFormat, options.WhiteoutData) - buffer := make([]byte, 1<<20) doChown := !options.NoLchown if options.ForceMask != nil { @@ -1107,7 +1164,7 @@ func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) err // Iterate through the files in the archive. 
loop: for { - hdr, err := tr.Next() + hdr, err := iter.Next() if err == io.EOF { // end of tar archive break @@ -1181,7 +1238,6 @@ loop: } } } - trBuf.Reset(tr) chownOpts := options.ChownOpts if err := remapIDs(nil, idMappings, chownOpts, hdr); err != nil { @@ -1202,7 +1258,10 @@ loop: chownOpts = &idtools.IDPair{UID: hdr.Uid, GID: hdr.Gid} } - if err = extractTarFileEntry(path, dest, hdr, trBuf, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { + return iter.WriteContentTo(dst) + } + if err = extractTarFileEntry(path, dest, hdr, writeContent, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return err } diff --git a/storage/pkg/archive/archive_test.go b/storage/pkg/archive/archive_test.go index d17dd64dee..90ea76fc7e 100644 --- a/storage/pkg/archive/archive_test.go +++ b/storage/pkg/archive/archive_test.go @@ -751,8 +751,7 @@ func TestTarWithOptions(t *testing.T) { func TestTypeXGlobalHeaderDoesNotFail(t *testing.T) { hdr := tar.Header{Typeflag: tar.TypeXGlobalHeader} tmpDir := t.TempDir() - buffer := make([]byte, 1<<20) - err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil, buffer) + err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil) if err != nil { t.Fatal(err) } diff --git a/storage/pkg/archive/diff.go b/storage/pkg/archive/diff.go index 355d65f212..64be4f8fdd 100644 --- a/storage/pkg/archive/diff.go +++ b/storage/pkg/archive/diff.go @@ -104,7 +104,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, } defer os.RemoveAll(aufsTempdir) } - if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, tr, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst 
*os.File) error { _, err := io.CopyBuffer(dst, tr, buffer); return err } + if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return 0, err } } @@ -209,7 +210,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64, return 0, err } - if err := extractTarFileEntry(path, dest, srcHdr, srcData, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil { + writeContent := func(dst *os.File) error { _, err := io.CopyBuffer(dst, srcData, buffer); return err } + if err := extractTarFileEntry(path, dest, srcHdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil { return 0, err } diff --git a/storage/pkg/archive/splitfdstream.go b/storage/pkg/archive/splitfdstream.go new file mode 100644 index 0000000000..efadfd88f2 --- /dev/null +++ b/storage/pkg/archive/splitfdstream.go @@ -0,0 +1,97 @@ +package archive + +import ( + "archive/tar" + "bytes" + "encoding/binary" + "fmt" + "io" + "os" + + "go.podman.io/storage/pkg/fileutils" +) + +// splitFDStreamIterator implements TarEntryIterator for splitfdstream data. +// It parses the splitfdstream format and provides tar headers. +// For external FD references, WriteContentTo uses ReflinkOrCopy to efficiently +// copy file content via reflinks when possible. +type splitFDStreamIterator struct { + stream io.Reader + fds []*os.File + contentFD *os.File // FD for current entry's content (external reference) + content io.Reader // reader for current entry's inline content +} + +// NewSplitFDStreamIterator creates a TarEntryIterator that reads entries from +// a splitfdstream-formatted stream, using the provided file descriptors for +// external content references. 
+func NewSplitFDStreamIterator(stream io.Reader, fds []*os.File) TarEntryIterator {
+	return &splitFDStreamIterator{
+		stream: stream,
+		fds:    fds,
+	}
+}
+
+func (i *splitFDStreamIterator) Next() (*tar.Header, error) {
+	var prefix int64
+	err := binary.Read(i.stream, binary.LittleEndian, &prefix)
+	if err == io.EOF {
+		return nil, io.EOF
+	}
+	if err != nil {
+		return nil, fmt.Errorf("failed to read chunk prefix: %w", err)
+	}
+
+	if prefix >= 0 {
+		return nil, fmt.Errorf("expected inline chunk for tar header, got external reference %d", prefix)
+	}
+
+	// Inline chunk: read the serialized tar header
+	dataLen := int(-prefix)
+	headerData := make([]byte, dataLen)
+	if _, err := io.ReadFull(i.stream, headerData); err != nil {
+		return nil, fmt.Errorf("failed to read inline data: %w", err)
+	}
+
+	header, err := tar.NewReader(bytes.NewReader(headerData)).Next()
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse tar header from inline chunk: %w", err)
+	}
+
+	// Reset content state
+	i.contentFD = nil
+	i.content = nil
+
+	// For regular files with content, read the next chunk to determine source
+	if header.Typeflag == tar.TypeReg && header.Size > 0 {
+		if err := binary.Read(i.stream, binary.LittleEndian, &prefix); err != nil {
+			return nil, fmt.Errorf("failed to read content chunk prefix for %q: %w", header.Name, err)
+		}
+
+		if prefix < 0 {
+			// Inline content
+			contentLen := -prefix
+			i.content = io.LimitReader(i.stream, contentLen)
+		} else {
+			// External content from FD
+			fdIndex := int(prefix)
+			if prefix >= int64(len(i.fds)) {
+				return nil, fmt.Errorf("fd index %d out of range (have %d fds)", prefix, len(i.fds))
+			}
+			i.contentFD = i.fds[fdIndex]
+		}
+	}
+
+	return header, nil
+}
+
+func (i *splitFDStreamIterator) WriteContentTo(dst *os.File) error {
+	if i.contentFD != nil {
+		return fileutils.ReflinkOrCopy(i.contentFD, dst)
+	}
+	if i.content != nil {
+		_, err := io.Copy(dst, i.content)
+		return err
+	}
+	return nil
+}
diff --git 
a/storage/pkg/chrootarchive/init_unix.go b/storage/pkg/chrootarchive/init_unix.go index 1b566c817e..2ad9b78627 100644 --- a/storage/pkg/chrootarchive/init_unix.go +++ b/storage/pkg/chrootarchive/init_unix.go @@ -13,6 +13,7 @@ import ( func init() { reexec.Register("storage-applyLayer", applyLayer) reexec.Register("storage-untar", untar) + reexec.Register("storage-untar-splitfdstream", untarSplitFDStream) reexec.Register("storage-tar", tar) } diff --git a/storage/pkg/chrootarchive/splitfdstream_unix.go b/storage/pkg/chrootarchive/splitfdstream_unix.go new file mode 100644 index 0000000000..d0d02b1e21 --- /dev/null +++ b/storage/pkg/chrootarchive/splitfdstream_unix.go @@ -0,0 +1,218 @@ +//go:build !windows && !darwin + +package chrootarchive + +import ( + "bytes" + "flag" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "strconv" + "sync" + + "golang.org/x/sys/unix" + + "go.podman.io/storage/pkg/archive" + "go.podman.io/storage/pkg/fileutils" + "go.podman.io/storage/pkg/idtools" + "go.podman.io/storage/pkg/reexec" + "go.podman.io/storage/pkg/unshare" +) + +// fdSocketDescriptor is the file descriptor number for the Unix socket used to +// pass content FDs from the parent to the child process via SCM_RIGHTS. +// FD 3 = tar options, FD 4 = root dir, FD 5 = FD socket. +const fdSocketDescriptor = 5 + +// UnpackSplitFDStream unpacks a splitfdstream into dest within a chroot for security isolation. +// The stream contains splitfdstream-formatted data read from stdin, and fds are the external +// file descriptors referenced by the stream for reflink-based copying. +// +// Content FDs are sent to the child process via SCM_RIGHTS over a Unix socket +// after the child starts, rather than inherited via ExtraFiles at fork time. +// This avoids exceeding the file descriptor limit during the child's dynamic +// linker phase (EMFILE when loading shared libraries). 
+func UnpackSplitFDStream(stream io.Reader, fds []*os.File, dest string, options *archive.TarOptions) error { + if options == nil { + options = &archive.TarOptions{} + options.InUserNS = unshare.IsRootless() + } + + idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps) + rootIDs := idMappings.RootPair() + + dest = filepath.Clean(dest) + if err := fileutils.Exists(dest); os.IsNotExist(err) { + if err := idtools.MkdirAllAndChownNew(dest, 0o755, rootIDs); err != nil { + return err + } + } + + destVal, err := newUnpackDestination(dest, dest) + if err != nil { + return err + } + defer destVal.Close() + + // Create pipe for tar options + r, w, err := os.Pipe() + if err != nil { + return fmt.Errorf("splitfdstream pipe failure: %w", err) + } + + // Create a Unix socketpair for passing content FDs to the child process. + socketPair, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM|unix.SOCK_CLOEXEC, 0) + if err != nil { + r.Close() + w.Close() + return fmt.Errorf("splitfdstream socketpair failure: %w", err) + } + parentSocketFD := socketPair[0] + childSocket := os.NewFile(uintptr(socketPair[1]), "splitfdstream-child-socket") + + numFDs := strconv.Itoa(len(fds)) + cmd := reexec.Command("storage-untar-splitfdstream", destVal.dest, procPathForFd(rootFileDescriptor), numFDs) + cmd.Stdin = stream + + cmd.ExtraFiles = append(cmd.ExtraFiles, r) // fd 3: tar options + cmd.ExtraFiles = append(cmd.ExtraFiles, destVal.root) // fd 4: root dir + cmd.ExtraFiles = append(cmd.ExtraFiles, childSocket) // fd 5: FD socket + + output := bytes.NewBuffer(nil) + cmd.Stdout = output + cmd.Stderr = output + + if err := cmd.Start(); err != nil { + w.Close() + unix.Close(parentSocketFD) + childSocket.Close() + return fmt.Errorf("splitfdstream untar error on re-exec cmd: %w", err) + } + + // Parent no longer needs the child's socket end or the read end of the pipe + childSocket.Close() + r.Close() + + // Send content FDs to the child via SCM_RIGHTS on the Unix socket. 
+ var sendErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + defer unix.Close(parentSocketFD) + for _, fd := range fds { + rights := unix.UnixRights(int(fd.Fd())) + if err := unix.Sendmsg(parentSocketFD, []byte{0}, rights, nil, 0); err != nil { + sendErr = fmt.Errorf("failed to send FD via SCM_RIGHTS: %w", err) + return + } + } + }() + + if err := json.NewEncoder(w).Encode(options); err != nil { + w.Close() + return fmt.Errorf("splitfdstream untar json encode to pipe failed: %w", err) + } + w.Close() + + if err := cmd.Wait(); err != nil { + wg.Wait() + return fmt.Errorf("splitfdstream unpacking failed (error: %w; output: %s)", err, output) + } + wg.Wait() + if sendErr != nil { + return sendErr + } + return nil +} + +// untarSplitFDStream is the reexec handler for "storage-untar-splitfdstream". +// It chroots into the destination and unpacks the splitfdstream from stdin. +func untarSplitFDStream() { + runtime.LockOSThread() + flag.Parse() + + var options archive.TarOptions + + // Read the options from the pipe (FD 3) + if err := json.NewDecoder(os.NewFile(tarOptionsDescriptor, "options")).Decode(&options); err != nil { + fatal(err) + } + + dest := flag.Arg(0) + root := flag.Arg(1) + numFDs, err := strconv.Atoi(flag.Arg(2)) + if err != nil { + fatal(fmt.Errorf("invalid numFDs argument %q: %w", flag.Arg(2), err)) + } + + // Handle root directory FD for chroot (same pattern as untar) + if root == procPathForFd(rootFileDescriptor) { + rootFd := os.NewFile(rootFileDescriptor, "tar-root") + defer rootFd.Close() + if err := unix.Fchdir(int(rootFd.Fd())); err != nil { + fatal(err) + } + root = "." + } else if root == "" { + root = dest + } + + if err := chroot(root); err != nil { + fatal(err) + } + + // Raise the file descriptor soft limit to the hard limit to + // accommodate the content FDs that will be received from the parent. 
+ if numFDs > 0 { + var rLimit unix.Rlimit + if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &rLimit); err == nil { + rLimit.Cur = rLimit.Max + _ = unix.Setrlimit(unix.RLIMIT_NOFILE, &rLimit) + } + } + + // Receive content FDs from the parent via SCM_RIGHTS on the Unix socket (FD 5). + fds := make([]*os.File, 0, numFDs) + if numFDs > 0 { + buf := make([]byte, 1) + oob := make([]byte, unix.CmsgSpace(4)) + for i := range numFDs { + _, oobn, _, _, err := unix.Recvmsg(fdSocketDescriptor, buf, oob, 0) + if err != nil { + fatal(fmt.Errorf("receiving content FD %d: %w", i, err)) + } + scms, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + fatal(fmt.Errorf("parsing socket control message for FD %d: %w", i, err)) + } + if len(scms) == 0 { + fatal(fmt.Errorf("no control message received for FD %d", i)) + } + receivedFDs, err := unix.ParseUnixRights(&scms[0]) + if err != nil { + fatal(fmt.Errorf("parsing unix rights for FD %d: %w", i, err)) + } + fds = append(fds, os.NewFile(uintptr(receivedFDs[0]), fmt.Sprintf("content-fd-%d", i))) + } + } + unix.Close(fdSocketDescriptor) + + iter := archive.NewSplitFDStreamIterator(os.Stdin, fds) + if err := archive.UnpackFromIterator(iter, dest, &options); err != nil { + fatal(err) + } + // fully consume stdin in case it is zero padded + if _, err := flush(os.Stdin); err != nil { + fatal(err) + } + + for _, f := range fds { + f.Close() + } + + os.Exit(0) +} diff --git a/storage/pkg/chrootarchive/splitfdstream_unsupported.go b/storage/pkg/chrootarchive/splitfdstream_unsupported.go new file mode 100644 index 0000000000..55f922de67 --- /dev/null +++ b/storage/pkg/chrootarchive/splitfdstream_unsupported.go @@ -0,0 +1,17 @@ +//go:build windows || darwin + +package chrootarchive + +import ( + "fmt" + "io" + "os" + "runtime" + + "go.podman.io/storage/pkg/archive" +) + +// UnpackSplitFDStream is not supported on this platform. 
+func UnpackSplitFDStream(stream io.Reader, fds []*os.File, dest string, options *archive.TarOptions) error { + return fmt.Errorf("UnpackSplitFDStream is not supported on %s", runtime.GOOS) +} From be78f57c5c6f7c5a38f0678c58efe8598a63de8b Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 19 Feb 2026 11:48:33 +0100 Subject: [PATCH 08/12] storage, overlay: use openat2 instead of using procfs Signed-off-by: Giuseppe Scrivano --- storage/drivers/overlay/overlay.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/storage/drivers/overlay/overlay.go b/storage/drivers/overlay/overlay.go index 00974c890c..9e0d614d33 100644 --- a/storage/drivers/overlay/overlay.go +++ b/storage/drivers/overlay/overlay.go @@ -2140,17 +2140,24 @@ func (g *overlayFileGetter) Get(path string) (io.ReadCloser, error) { buf := make([]byte, unix.PathMax) for _, d := range g.diffDirs { if f, found := g.composefsMounts[d]; found { - // there is no *at equivalent for getxattr, but it can be emulated by opening the file under /proc/self/fd/$FD/$PATH - len, err := unix.Getxattr(fmt.Sprintf("/proc/self/fd/%d/%s", int(f.Fd()), path), "trusted.overlay.redirect", buf) + cfd, err := unix.Openat2(int(f.Fd()), path, &unix.OpenHow{ + Flags: unix.O_RDONLY | unix.O_CLOEXEC | unix.O_PATH, + Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, + }) + if err != nil { + continue + } + n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf) + unix.Close(cfd) if err != nil { if errors.Is(err, unix.ENODATA) { continue } - return nil, &fs.PathError{Op: "getxattr", Path: path, Err: err} + return nil, &fs.PathError{Op: "fgetxattr", Path: path, Err: err} } // the xattr value is the path to the file in the composefs layer diff directory - return os.Open(filepath.Join(d, string(buf[:len]))) + return os.Open(filepath.Join(d, string(buf[:n]))) } f, err := os.Open(filepath.Join(d, path)) From aec5e5d68b47f2436fe40d59efc4c3ff029691e7 Mon Sep 17 00:00:00 2001 From: Giuseppe 
Scrivano Date: Thu, 19 Feb 2026 15:46:34 +0100 Subject: [PATCH 09/12] storage, overlay: factor function out Signed-off-by: Giuseppe Scrivano --- storage/drivers/overlay/composefs.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/storage/drivers/overlay/composefs.go b/storage/drivers/overlay/composefs.go index 713aeed3cb..fe88705bc3 100644 --- a/storage/drivers/overlay/composefs.go +++ b/storage/drivers/overlay/composefs.go @@ -3,10 +3,12 @@ package overlay import ( + "archive/tar" "bytes" "encoding/binary" "errors" "fmt" + "io" "os" "os/exec" "path/filepath" @@ -43,14 +45,25 @@ func getComposefsBlob(dataDir string) string { } func generateComposeFsBlob(verityDigests map[string]string, toc any, composefsDir string) error { - if err := os.MkdirAll(composefsDir, 0o700); err != nil { + dumpReader, err := dump.GenerateDump(toc, verityDigests) + if err != nil { return err } + return writeComposeFsBlob(dumpReader, composefsDir) +} - dumpReader, err := dump.GenerateDump(toc, verityDigests) +func generateComposeFsBlobFromHeaders(headers []*tar.Header, contentDigests, verityDigests map[string]string, composefsDir string) error { + dumpReader, err := dump.GenerateDumpFromTarHeaders(headers, contentDigests, verityDigests) if err != nil { return err } + return writeComposeFsBlob(dumpReader, composefsDir) +} + +func writeComposeFsBlob(dumpReader io.Reader, composefsDir string) error { + if err := os.MkdirAll(composefsDir, 0o700); err != nil { + return err + } destFile := getComposefsBlob(composefsDir) writerJSON, err := getComposeFsHelper() From cab06946604c5e7f532ac136ee0757a8d62c3e35 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 09:20:59 +0100 Subject: [PATCH 10/12] storage/overlay: implement SplitFDStreamDriver Implement the SplitFDStreamDriver interface for the overlay driver, enabling efficient layer operations with reflink support. 
Signed-off-by: Giuseppe Scrivano --- storage/drivers/overlay/composefs.go | 1 + storage/drivers/overlay/overlay.go | 5 +- .../drivers/overlay/overlay_splitfdstream.go | 508 ++++++++++++++++++ storage/drivers/overlay/splitfdstream_test.go | 58 ++ 4 files changed, 571 insertions(+), 1 deletion(-) create mode 100644 storage/drivers/overlay/overlay_splitfdstream.go create mode 100644 storage/drivers/overlay/splitfdstream_test.go diff --git a/storage/drivers/overlay/composefs.go b/storage/drivers/overlay/composefs.go index fe88705bc3..aad4773314 100644 --- a/storage/drivers/overlay/composefs.go +++ b/storage/drivers/overlay/composefs.go @@ -81,6 +81,7 @@ func writeComposeFsBlob(dumpReader io.Reader, composefsDir string) error { outFile.Close() return fmt.Errorf("failed to reopen %s as read-only: %w", destFile, err) } + defer roFile.Close() err = func() error { // a scope to close outFile before setting fsverity on the read-only fd. diff --git a/storage/drivers/overlay/overlay.go b/storage/drivers/overlay/overlay.go index 9e0d614d33..2481206d69 100644 --- a/storage/drivers/overlay/overlay.go +++ b/storage/drivers/overlay/overlay.go @@ -2145,7 +2145,10 @@ func (g *overlayFileGetter) Get(path string) (io.ReadCloser, error) { Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH, }) if err != nil { - continue + if errors.Is(err, unix.ENOENT) || errors.Is(err, unix.ELOOP) { + continue + } + return nil, &fs.PathError{Op: "openat2", Path: path, Err: err} } n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf) unix.Close(cfd) diff --git a/storage/drivers/overlay/overlay_splitfdstream.go b/storage/drivers/overlay/overlay_splitfdstream.go new file mode 100644 index 0000000000..b4b254e915 --- /dev/null +++ b/storage/drivers/overlay/overlay_splitfdstream.go @@ -0,0 +1,508 @@ +//go:build linux + +package overlay + +import ( + "archive/tar" + "bytes" + "errors" + "fmt" + "hash" + "io" + "os" + "path/filepath" + "strings" + + "github.com/opencontainers/go-digest" + 
"github.com/sirupsen/logrus" + "go.podman.io/storage/pkg/archive" + "go.podman.io/storage/pkg/chrootarchive" + "go.podman.io/storage/pkg/directory" + "go.podman.io/storage/pkg/fileutils" + "go.podman.io/storage/pkg/fsverity" + "go.podman.io/storage/pkg/idtools" + "go.podman.io/storage/pkg/splitfdstream" + "go.podman.io/storage/pkg/unshare" + "golang.org/x/sys/unix" +) + +// untarSplitFDStream defines the splitfdstream untar method (through chrootarchive for security isolation) +var untarSplitFDStream = chrootarchive.UnpackSplitFDStream + +// ApplySplitFDStream applies changes from a split FD stream to the specified layer. +// It iterates over the splitfdstream entries and extracts them using +// archive.UnpackFromIterator, which enables reflink-based copying for +// external file descriptor references. +func (d *Driver) ApplySplitFDStream(options *splitfdstream.ApplySplitFDStreamOpts) (int64, error) { + if options == nil { + return 0, fmt.Errorf("options cannot be nil") + } + if err := options.Validate(); err != nil { + return 0, fmt.Errorf("invalid options: %w", err) + } + + var diffPath string + + if options.StagingDir != "" { + diffPath = options.StagingDir + logrus.Debugf("overlay: ApplySplitFDStream applying to staging dir %s", diffPath) + } else { + dir := d.dir(options.LayerID) + if err := fileutils.Exists(dir); err != nil { + return 0, fmt.Errorf("layer %s does not exist: %w", options.LayerID, err) + } + + var err error + diffPath, err = d.getDiffPath(options.LayerID) + if err != nil { + return 0, fmt.Errorf("failed to get diff path for layer %s: %w", options.LayerID, err) + } + + logrus.Debugf("overlay: ApplySplitFDStream applying to layer %s at %s", options.LayerID, diffPath) + } + + // For composefs layers, process the splitfdstream iterator directly: + // build the TOC from tar headers and write regular files to flat + // content-addressable paths, preserving reflinks from FD references. 
+ if d.usingComposefs && options.LayerID != "" { + iter := archive.NewSplitFDStreamIterator(options.Stream, options.FileDescriptors) + headers, contentDigests, verityDigests, err := extractToFlatLayout(iter, diffPath) + if err != nil { + return 0, fmt.Errorf("failed to extract to flat layout: %w", err) + } + if err := generateComposeFsBlobFromHeaders(headers, contentDigests, verityDigests, d.getComposefsData(options.LayerID)); err != nil { + return 0, fmt.Errorf("failed to generate composefs blob: %w", err) + } + return directory.Size(diffPath) + } + + idMappings := options.IDMappings + if idMappings == nil { + idMappings = &idtools.IDMappings{} + } + + if err := untarSplitFDStream(options.Stream, options.FileDescriptors, diffPath, &archive.TarOptions{ + UIDMaps: idMappings.UIDs(), + GIDMaps: idMappings.GIDs(), + IgnoreChownErrors: options.IgnoreChownErrors || d.options.ignoreChownErrors, + WhiteoutFormat: d.getWhiteoutFormat(), + ForceMask: options.ForceMask, + InUserNS: unshare.IsRootless(), + }); err != nil { + return 0, fmt.Errorf("failed to apply split FD stream: %w", err) + } + + return directory.Size(diffPath) +} + +// GetSplitFDStream generates a split FD stream from the layer differences. +// The returned ReadCloser contains the splitfdstream-formatted data, and the +// []*os.File slice contains the external file descriptors referenced by the stream. +// Regular files are passed as external file descriptors for reflink-based copying. +// The caller is responsible for closing both the ReadCloser and all file descriptors. +func (d *Driver) GetSplitFDStream(id, parent string, options *splitfdstream.GetSplitFDStreamOpts) (io.ReadCloser, []*os.File, error) { + if options == nil { + return nil, nil, fmt.Errorf("options cannot be nil") + } + + dir := d.dir(id) + if err := fileutils.Exists(dir); err != nil { + return nil, nil, fmt.Errorf("layer %s does not exist: %w", id, err) + } + + // Check if this is a composefs layer and mount the EROFS blob if so. 
+ // The mount FD is used to resolve file paths to their flat storage paths + // via the trusted.overlay.redirect xattr. + composefsData := d.getComposefsData(id) + composefsMountFd := -1 + if err := fileutils.Exists(composefsData); err == nil { + fd, err := openComposefsMount(composefsData) + if err != nil { + return nil, nil, fmt.Errorf("failed to mount composefs for layer %s: %w", id, err) + } + composefsMountFd = fd + defer unix.Close(composefsMountFd) + } else if !errors.Is(err, unix.ENOENT) { + return nil, nil, err + } + + logrus.Debugf("overlay: GetSplitFDStream for layer %s with parent %s", id, parent) + + // Set up ID mappings + idMappings := options.IDMappings + if idMappings == nil { + idMappings = &idtools.IDMappings{} + } + + // Get the diff path for file access (used for FD references) + diffPath, err := d.getDiffPath(id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get diff path for layer %s: %w", id, err) + } + + // Use Diff() to generate the tar stream - it handles naiveDiff + // and all the edge cases correctly. 
+ tarStream, err := d.Diff(id, idMappings, parent, nil, options.MountLabel) + if err != nil { + return nil, nil, fmt.Errorf("failed to generate diff for layer %s: %w", id, err) + } + defer tarStream.Close() + + // Buffer the splitfdstream data in memory + var buf bytes.Buffer + var fds []*os.File + writer := splitfdstream.NewWriter(&buf) + + // Convert tar stream to splitfdstream + err = convertTarToSplitFDStream(tarStream, writer, diffPath, composefsMountFd, &fds) + if err != nil { + // Close any opened FDs on error + for _, f := range fds { + f.Close() + } + return nil, nil, fmt.Errorf("failed to convert tar to splitfdstream: %w", err) + } + + logrus.Debugf("overlay: GetSplitFDStream complete for layer %s: streamSize=%d, numFDs=%d", id, buf.Len(), len(fds)) + return io.NopCloser(bytes.NewReader(buf.Bytes())), fds, nil +} + +// convertTarToSplitFDStream converts a tar stream to a splitfdstream by parsing +// tar headers and replacing file content with file descriptor references. +func convertTarToSplitFDStream(tarStream io.ReadCloser, writer *splitfdstream.SplitFDStreamWriter, diffPath string, composefsMountFd int, fds *[]*os.File) error { + tr := tar.NewReader(tarStream) + + // Open diff directory for safe file access + diffDirFd, err := unix.Open(diffPath, unix.O_RDONLY|unix.O_DIRECTORY|unix.O_CLOEXEC, 0) + if err != nil { + return fmt.Errorf("failed to open diff directory %s: %w", diffPath, err) + } + defer unix.Close(diffDirFd) + + // Reusable buffer for inline content, lazily allocated + var buf []byte + + for { + header, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read tar header: %w", err) + } + + // Write the tar header as inline data + var headerBuf bytes.Buffer + tw := tar.NewWriter(&headerBuf) + if err := tw.WriteHeader(header); err != nil { + return fmt.Errorf("failed to serialize tar header for %s: %w", header.Name, err) + } + if err := writer.WriteInline(headerBuf.Bytes()); err != nil { + return 
fmt.Errorf("failed to write tar header for %s: %w", header.Name, err) + } + + // Handle file content + if header.Typeflag == tar.TypeReg && header.Size > 0 { + // Try to open file and write FD reference + ok, err := tryWriteFileAsFDReference(writer, diffDirFd, composefsMountFd, header, fds) + if err != nil { + return fmt.Errorf("failed to write FD reference for %s: %w", header.Name, err) + } + if ok { + // Skip the content in the tar stream since we're using FD reference + if _, err := io.CopyN(io.Discard, tr, header.Size); err != nil { + return fmt.Errorf("failed to skip content for %s: %w", header.Name, err) + } + } else { + if buf == nil { + buf = make([]byte, archive.CopyBufferSize) + } + // File not found in diff directory (e.g., naiveDiff was used), + // write content inline from the tar stream. + // Write a single prefix for the total size, then stream + // data in chunks. The reader expects exactly one prefix + // per file entry. + if err := writer.WriteInlinePrefix(header.Size); err != nil { + return fmt.Errorf("failed to write inline prefix for %s: %w", header.Name, err) + } + remaining := header.Size + for remaining > 0 { + toRead := int64(len(buf)) + if toRead > remaining { + toRead = remaining + } + n, err := io.ReadFull(tr, buf[:toRead]) + if err != nil { + return fmt.Errorf("failed to read content for %s: %w", header.Name, err) + } + if err := writer.WriteRaw(buf[:n]); err != nil { + return fmt.Errorf("failed to write inline content for %s: %w", header.Name, err) + } + remaining -= int64(n) + } + } + } + } + + return nil +} + +// tryWriteFileAsFDReference tries to open a file and write an FD reference to the splitfdstream. +// Returns (true, nil) if the file was successfully written as FD reference. +// Returns (false, nil) if the file doesn't exist in the diff directory (caller should write inline). +// Returns (_, error) on other errors. +// +// When composefsMountFd >= 0, the diff directory uses a flat layout (files stored by digest). 
+// The file path is resolved by reading the trusted.overlay.redirect xattr from the composefs mount.
+func tryWriteFileAsFDReference(writer *splitfdstream.SplitFDStreamWriter, diffDirFd int, composefsMountFd int, header *tar.Header, fds *[]*os.File) (bool, error) {
+	// Clean the file name to prevent path traversal
+	cleanName := filepath.Clean(header.Name)
+	if cleanName == ".." || strings.HasPrefix(cleanName, "../") {
+		return false, fmt.Errorf("invalid file path: %s", header.Name)
+	}
+
+	var fd int
+	var openErr error
+
+	if composefsMountFd >= 0 {
+		// Composefs: open the file in the composefs mount to read the redirect xattr,
+		// which gives the flat storage path in the diff directory.
+		cfd, err := unix.Openat2(composefsMountFd, cleanName, &unix.OpenHow{
+			Flags:   unix.O_RDONLY | unix.O_CLOEXEC | unix.O_PATH,
+			Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH,
+		})
+		if err != nil {
+			if errors.Is(err, unix.ENOENT) || errors.Is(err, unix.ELOOP) {
+				return false, nil
+			}
+			return false, fmt.Errorf("failed to open %s in composefs mount: %w", cleanName, err)
+		}
+		buf := make([]byte, unix.PathMax)
+		n, err := unix.Fgetxattr(cfd, "trusted.overlay.redirect", buf)
+		unix.Close(cfd)
+		if err != nil {
+			if errors.Is(err, unix.ENODATA) {
+				return false, nil
+			}
+			return false, fmt.Errorf("failed to get redirect xattr for %s: %w", cleanName, err)
+		}
+
+		flatPath := string(buf[:n])
+		if strings.Contains(flatPath, "..") || filepath.IsAbs(flatPath) {
+			return false, fmt.Errorf("invalid redirect xattr value for %s: %s", cleanName, flatPath)
+		}
+
+		fd, openErr = unix.Openat2(diffDirFd, flatPath, &unix.OpenHow{
+			Flags:   unix.O_RDONLY | unix.O_CLOEXEC,
+			Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH,
+		})
+	} else {
+		// Non-composefs: open directly by name under the diff directory
+		fd, openErr = unix.Openat2(diffDirFd, cleanName, &unix.OpenHow{
+			Flags:   unix.O_RDONLY | unix.O_CLOEXEC,
+			Resolve: unix.RESOLVE_NO_SYMLINKS | unix.RESOLVE_BENEATH,
+		})
+	}
+
+	if openErr != nil 
{ + if errors.Is(openErr, unix.ENOENT) || errors.Is(openErr, unix.ELOOP) { + return false, nil + } + return false, fmt.Errorf("failed to open %s: %w", cleanName, openErr) + } + + // Verify it's still a regular file + var fdStat unix.Stat_t + if err := unix.Fstat(fd, &fdStat); err != nil { + unix.Close(fd) + return false, fmt.Errorf("failed to fstat opened file %s: %w", cleanName, err) + } + if fdStat.Mode&unix.S_IFMT != unix.S_IFREG { + unix.Close(fd) + return false, fmt.Errorf("file %s is not a regular file", cleanName) + } + + // Create os.File from fd + f := os.NewFile(uintptr(fd), cleanName) + if f == nil { + unix.Close(fd) + return false, fmt.Errorf("failed to create File from fd for %s", cleanName) + } + + fdIndex := len(*fds) + + // Write FD reference before appending to the slice so that on + // error the caller's cleanup loop does not see a stale entry. + if err := writer.WriteExternal(fdIndex); err != nil { + f.Close() + return false, fmt.Errorf("failed to write external FD reference: %w", err) + } + + *fds = append(*fds, f) + + return true, nil +} + +// extractToFlatLayout iterates a TarEntryIterator and writes regular file +// content to flat content-addressable paths under flatDir (preserving reflinks +// from FD references via WriteContentTo). It returns the collected tar headers, +// content digests, and verity digests for composefs blob generation. 
+func extractToFlatLayout(iter archive.TarEntryIterator, flatDir string) ([]*tar.Header, map[string]string, map[string]string, error) { + var headers []*tar.Header + contentDigests := make(map[string]string) + verityDigests := make(map[string]string) + createdDirs := make(map[string]struct{}) + + for { + hdr, err := iter.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, nil, nil, err + } + + headers = append(headers, hdr) + + if hdr.Typeflag == tar.TypeReg && hdr.Size > 0 { + dgst, flatPath, verity, err := writeContentToFlatPath(flatDir, func(dst *os.File) error { + return iter.WriteContentTo(dst) + }, createdDirs) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to write content for %s: %w", hdr.Name, err) + } + contentDigests[hdr.Name] = dgst.String() + if verity != "" { + verityDigests[flatPath] = verity + } + } + } + + return headers, contentDigests, verityDigests, nil +} + +// writeContentToFlatPath creates a temporary file, writes content via callback while +// computing digest, and atomically places it at the content-addressable path. +// This follows the same pattern as destinationFile in the chunked package. 
+func writeContentToFlatPath(flatDir string, writeContent func(dst *os.File) error, createdDirs map[string]struct{}) (digest.Digest, string, string, error) { + flatFile, err := openFlatContentFile(flatDir) + if err != nil { + return "", "", "", err + } + defer func() { + if flatFile.file != nil { + flatFile.file.Close() + } + }() + + // Write content to file directly (for reflink support), then compute digest + if err := writeContent(flatFile.file); err != nil { + return "", "", "", err + } + + // Now compute digest by reading back (following destinationFile validation pattern) + if _, err := flatFile.file.Seek(0, io.SeekStart); err != nil { + return "", "", "", err + } + if _, err := io.Copy(flatFile.hash, flatFile.file); err != nil { + return "", "", "", err + } + + // Get digest and place file atomically + dgst := flatFile.digester.Digest() + flatPath, err := regularFilePathForValidatedDigest(dgst) + if err != nil { + return "", "", "", err + } + + // Create directory structure if needed + flatSubDir := filepath.Dir(flatPath) + if _, exists := createdDirs[flatSubDir]; !exists { + if err := os.MkdirAll(filepath.Join(flatDir, flatSubDir), 0o755); err != nil { + return "", "", "", err + } + createdDirs[flatSubDir] = struct{}{} + } + + // Atomically link to final path + destPath := filepath.Join(flatDir, flatPath) + procPath := fmt.Sprintf("/proc/self/fd/%d", flatFile.file.Fd()) + if err := unix.Linkat(unix.AT_FDCWD, procPath, unix.AT_FDCWD, destPath, unix.AT_SYMLINK_FOLLOW); err != nil { + if !errors.Is(err, unix.EEXIST) { + return "", "", "", fmt.Errorf("failed to link to %s: %w", destPath, err) + } + } + + // Enable fs-verity if supported (same pattern as destinationFile) + verity := enableFlatFileVerity(destPath, flatPath) + + return dgst, flatPath, verity, nil +} + +// flatContentFile follows the same validation pattern as destinationFile but for content-addressable storage +type flatContentFile struct { + file *os.File + digester digest.Digester + hash 
hash.Hash +} + +// openFlatContentFile creates a flatContentFile using the same pattern as openDestinationFile +func openFlatContentFile(flatDir string) (*flatContentFile, error) { + tmpFile, err := openLinkableTmpFile(flatDir) + if err != nil { + return nil, fmt.Errorf("failed to create temp file: %w", err) + } + + // Follow same pattern as destinationFile for validation + digester := digest.Canonical.Digester() + hash := digester.Hash() + + return &flatContentFile{ + file: tmpFile, + digester: digester, + hash: hash, + }, nil +} + +// enableFlatFileVerity enables fs-verity on a flat file and returns the verity digest +func enableFlatFileVerity(destPath, flatPath string) string { + roFile, err := os.Open(destPath) + if err != nil { + return "" + } + defer roFile.Close() + + if err := fsverity.EnableVerity(flatPath, int(roFile.Fd())); err != nil { + return "" + } + verity, err := fsverity.MeasureVerity(flatPath, int(roFile.Fd())) + if err != nil { + return "" + } + return verity +} + +// openLinkableTmpFile creates a temporary file that can be linked to a final path +// via /proc/self/fd/N. Uses O_TMPFILE when supported, falls back to CreateTemp + unlink. 
+func openLinkableTmpFile(dir string) (*os.File, error) { + file, err := os.OpenFile(dir, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o644) + if err == nil { + return file, nil + } + // Fallback: create and immediately unlink + file, err = os.CreateTemp(dir, ".flatfile-*") + if err != nil { + return nil, err + } + _ = os.Remove(file.Name()) + return file, nil +} + +// regularFilePathForValidatedDigest returns the path used in the composefs backing store +func regularFilePathForValidatedDigest(d digest.Digest) (string, error) { + if algo := d.Algorithm(); algo != digest.SHA256 { + return "", fmt.Errorf("unexpected digest algorithm %q", algo) + } + e := d.Encoded() + return e[0:2] + "/" + e[2:], nil +} diff --git a/storage/drivers/overlay/splitfdstream_test.go b/storage/drivers/overlay/splitfdstream_test.go new file mode 100644 index 0000000000..ea2fd2b2b8 --- /dev/null +++ b/storage/drivers/overlay/splitfdstream_test.go @@ -0,0 +1,58 @@ +//go:build linux + +package overlay + +import ( + "testing" + + "go.podman.io/storage/pkg/splitfdstream" +) + +func TestApplySplitFDStreamStub(t *testing.T) { + driver := &Driver{ + home: t.TempDir(), + } + + // Test with nil options + _, err := driver.ApplySplitFDStream(nil) + if err == nil { + t.Error("Expected error with nil options") + } + + // Test with valid options but non-existent layer + opts := &splitfdstream.ApplySplitFDStreamOpts{LayerID: "non-existent-layer"} + _, err = driver.ApplySplitFDStream(opts) + if err == nil { + t.Error("Expected error for non-existent layer") + } +} + +func TestGetSplitFDStreamStub(t *testing.T) { + driver := &Driver{ + home: t.TempDir(), + } + + // Test with nil options + _, _, err := driver.GetSplitFDStream("test-layer", "parent-layer", nil) + if err == nil { + t.Error("Expected error with nil options") + } + + // Test with valid options but non-existent layer + opts := &splitfdstream.GetSplitFDStreamOpts{} + _, _, err = driver.GetSplitFDStream("non-existent-layer", "parent-layer", opts) + 
if err == nil { + t.Error("Expected error for non-existent layer") + } +} + +// TestOverlayImplementsSplitFDStreamDriver verifies that the overlay driver +// implements the SplitFDStreamDriver interface via type assertion. +func TestOverlayImplementsSplitFDStreamDriver(t *testing.T) { + driver := &Driver{} + + // Verify the driver implements SplitFDStreamDriver + if _, ok := interface{}(driver).(splitfdstream.SplitFDStreamDriver); !ok { + t.Error("Expected overlay driver to implement SplitFDStreamDriver interface") + } +} From 040b2e22be6dac4a0272b4d644de4697f72b31c6 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 10:52:54 +0100 Subject: [PATCH 11/12] storage/cmd: new commands json-rpc-server and apply-splitfdstream Signed-off-by: Giuseppe Scrivano --- .../cmd/containers-storage/splitfdstream.go | 206 ++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 storage/cmd/containers-storage/splitfdstream.go diff --git a/storage/cmd/containers-storage/splitfdstream.go b/storage/cmd/containers-storage/splitfdstream.go new file mode 100644 index 0000000000..bc81fd5572 --- /dev/null +++ b/storage/cmd/containers-storage/splitfdstream.go @@ -0,0 +1,206 @@ +//go:build linux + +package main + +import ( + "bytes" + "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + + "go.podman.io/storage" + graphdriver "go.podman.io/storage/drivers" + "go.podman.io/storage/pkg/archive" + "go.podman.io/storage/pkg/mflag" + "go.podman.io/storage/pkg/splitfdstream" +) + +const defaultJSONRPCSocket = "json-rpc.sock" + +var ( + splitfdstreamSocket = "" + applyFdstreamSocket = "" + applyFdstreamParent = "" + applyFdstreamMountLabel = "" +) + +// splitFDStreamDiffer implements graphdriver.Differ for splitfdstream data +type splitFDStreamDiffer struct { + streamData []byte + fds []*os.File + store storage.Store +} + +func (d *splitFDStreamDiffer) ApplyDiff(dest string, options *archive.TarOptions, differOpts *graphdriver.DifferOptions) 
(graphdriver.DriverWithDifferOutput, error) { + driver, err := d.store.GraphDriver() + if err != nil { + return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to get graph driver: %w", err) + } + + splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver) + if !ok { + return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("driver %s does not support splitfdstream", driver.String()) + } + + opts := &splitfdstream.ApplySplitFDStreamOpts{ + Stream: bytes.NewReader(d.streamData), + FileDescriptors: d.fds, + StagingDir: dest, + } + + size, err := splitDriver.ApplySplitFDStream(opts) + if err != nil { + return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to apply splitfdstream to staging dir %s: %w", dest, err) + } + + return graphdriver.DriverWithDifferOutput{ + Target: dest, + Size: size, + }, nil +} + +func (d *splitFDStreamDiffer) Close() error { + return nil +} + +func splitfdstreamServer(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) { + driver, err := m.GraphDriver() + if err != nil { + return 1, fmt.Errorf("failed to get graph driver: %w", err) + } + + splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver) + if !ok { + return 1, fmt.Errorf("driver %s does not support splitfdstream", driver.String()) + } + server := splitfdstream.NewJSONRPCServer(splitDriver, m) + + socketPath := splitfdstreamSocket + if socketPath == "" { + socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket) + } + + if err := server.Start(socketPath); err != nil { + return 1, fmt.Errorf("failed to start server: %w", err) + } + defer func() { _ = server.Stop() }() + + fmt.Printf("%s\n", socketPath) + + // Wait for interrupt signal + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + return 0, nil +} + +func applySplitfdstream(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) { + layerID := args[0] + + socketPath := applyFdstreamSocket + 
if socketPath == "" { + socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket) + } + + defer func() { + if _, err := m.Shutdown(false); err != nil { + fmt.Fprintf(os.Stderr, "warning: failed to shutdown storage: %v\n", err) + } + }() + + client, err := splitfdstream.NewJSONRPCClient(socketPath) + if err != nil { + return 1, fmt.Errorf("failed to connect to server: %w", err) + } + defer client.Close() + + // Get splitfdstream data from remote server + streamData, fds, err := client.GetSplitFDStream(layerID, "") + if err != nil { + return 1, fmt.Errorf("failed to get splitfdstream from server: %w", err) + } + + // Close received FDs when done + defer func() { + for _, fd := range fds { + fd.Close() + } + }() + + // Create a custom differ for splitfdstream data + differ := &splitFDStreamDiffer{ + streamData: streamData, + fds: fds, + store: m, + } + defer differ.Close() + + // Prepare the staged layer + diffOptions := &graphdriver.ApplyDiffWithDifferOpts{} + diffOutput, err := m.PrepareStagedLayer(diffOptions, differ) + if err != nil { + return 1, fmt.Errorf("failed to prepare staged layer: %w", err) + } + + // Apply the staged layer to create the final layer + applyArgs := storage.ApplyStagedLayerOptions{ + ID: layerID, + ParentLayer: applyFdstreamParent, + MountLabel: applyFdstreamMountLabel, + Writeable: false, + LayerOptions: &storage.LayerOptions{}, + DiffOutput: diffOutput, + DiffOptions: diffOptions, + } + + layer, err := m.ApplyStagedLayer(applyArgs) + if err != nil { + // Clean up the staged layer on failure + if cleanupErr := m.CleanupStagedLayer(diffOutput); cleanupErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to cleanup staged layer: %v\n", cleanupErr) + } + return 1, fmt.Errorf("failed to apply staged layer: %w", err) + } + + // Output the result + if jsonOutput { + return outputJSON(map[string]interface{}{"id": layer.ID, "size": diffOutput.Size}) + } + fmt.Printf("%s\n", layer.ID) + return 0, nil +} + +func init() { + commands = 
append(commands, command{ + names: []string{"json-rpc-server"}, + optionsHelp: "[options]", + usage: "Start a JSON-RPC server", + minArgs: 0, + maxArgs: 0, + action: splitfdstreamServer, + addFlags: func(flags *mflag.FlagSet, cmd *command) { + flags.StringVar(&splitfdstreamSocket, []string{"-socket"}, "", + "Path to UNIX socket") + }, + }) + commands = append(commands, command{ + names: []string{"apply-splitfdstream"}, + optionsHelp: "[options] layerID", + usage: "Fetch a layer from remote server and apply it locally", + minArgs: 1, + maxArgs: 1, + action: applySplitfdstream, + addFlags: func(flags *mflag.FlagSet, cmd *command) { + flags.StringVar(&applyFdstreamSocket, []string{"-socket"}, "", + "Path to remote UNIX socket") + flags.StringVar(&applyFdstreamParent, []string{"-parent"}, "", + "Parent layer ID for the new layer") + flags.StringVar(&applyFdstreamMountLabel, []string{"-mount-label"}, "", + "SELinux mount label for the layer") + flags.BoolVar(&jsonOutput, []string{"-json", "j"}, jsonOutput, "Prefer JSON output") + }, + }) +} From 42fcc4e555a62809f61b8ccfca3a055511241668 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 9 Feb 2026 11:14:41 +0100 Subject: [PATCH 12/12] storage/tests: add tests for splitfdstream Signed-off-by: Giuseppe Scrivano --- storage/tests/splitfdstream.bats | 133 +++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100755 storage/tests/splitfdstream.bats diff --git a/storage/tests/splitfdstream.bats b/storage/tests/splitfdstream.bats new file mode 100755 index 0000000000..79c09f3b39 --- /dev/null +++ b/storage/tests/splitfdstream.bats @@ -0,0 +1,133 @@ +#!/usr/bin/env bats + +load helpers + +# start_server launches the json-rpc-server in the background, fully +# detached from bats file descriptors so it won't block test output. +# Sets SERVER_PID to the actual binary PID. 
+start_server() { + local extra_args=("$@") + ${STORAGE_BINARY} --graph ${TESTDIR}/root --run ${TESTDIR}/runroot \ + --storage-driver ${STORAGE_DRIVER} \ + ${STORAGE_OPTION:+--storage-opt=${STORAGE_OPTION}} \ + json-rpc-server "${extra_args[@]}" \ + /dev/null 2>&1 3>&- 4>&- 5>&- 6>&- 7>&- 8>&- 9>&- & + SERVER_PID=$! +} + +# stop_server kills the json-rpc-server and waits for it to exit. +stop_server() { + if [[ -n "$SERVER_PID" ]]; then + kill "$SERVER_PID" 2>/dev/null || true + wait "$SERVER_PID" 2>/dev/null || true + SERVER_PID= + fi +} + +# Override teardown to stop the server before the default teardown runs +# storage wipe. If the server is still running it holds the store lock +# and wipe would deadlock. +teardown() { + stop_server + run storage wipe + if [[ $status -ne 0 ]] ; then + echo "$output" + fi + run storage shutdown + if [[ $status -ne 0 ]] ; then + echo "$output" + fi + rm -fr ${TESTDIR} +} + +@test "splitfdstream json-rpc-server and apply-splitfdstream" { + case "$STORAGE_DRIVER" in + overlay*) + ;; + *) + skip "driver $STORAGE_DRIVER does not support splitfdstream" + ;; + esac + + # Create and populate a test layer + populate + + # Get the socket path from runroot + local runroot=`storage status 2>&1 | awk '/^Run Root:/{print $3}'` + local socket_path="$runroot/json-rpc.sock" + + # Start the JSON-RPC server in the background + start_server --socket "$socket_path" + + # Wait for socket to be created (max 10 seconds) + local count=0 + while [[ ! 
-S "$socket_path" && $count -lt 50 ]]; do + sleep 0.2 + count=$((count + 1)) + done + + # Check that the socket exists + [ -S "$socket_path" ] + + # Create a new layer using apply-splitfdstream + # This should connect to our JSON-RPC server and fetch the layer + run storage --debug=false apply-splitfdstream --socket "$socket_path" "$lowerlayer" + echo "apply-splitfdstream output: $output" + [ "$status" -eq 0 ] + [ "$output" != "" ] + + applied_layer="$output" + + # Verify the layer was created + run storage --debug=false layers + [ "$status" -eq 0 ] + [[ "$output" =~ "$applied_layer" ]] + + # Check that we can mount the applied layer + run storage --debug=false mount "$applied_layer" + [ "$status" -eq 0 ] + [ "$output" != "" ] + local applied_mount="$output" + + # Verify some expected content exists (from populate function) + [ -f "$applied_mount/layer1file1" ] + [ -f "$applied_mount/layer1file2" ] + [ -d "$applied_mount/layerdir1" ] + + # Unmount the layer + run storage unmount "$applied_layer" + [ "$status" -eq 0 ] + + # Kill the server before teardown runs storage wipe (which needs the store lock) + stop_server +} + +@test "splitfdstream server socket path uses runroot" { + case "$STORAGE_DRIVER" in + overlay*) + ;; + *) + skip "driver $STORAGE_DRIVER does not support splitfdstream" + ;; + esac + + # Get the expected socket path from runroot + local runroot=`storage status 2>&1 | awk '/^Run Root:/{print $3}'` + local expected_socket="$runroot/json-rpc.sock" + + # Start the JSON-RPC server in the background + start_server + + # Wait for socket to be created (max 10 seconds) + local count=0 + while [[ ! -S "$expected_socket" && $count -lt 50 ]]; do + sleep 0.2 + count=$((count + 1)) + done + + # Verify the socket is created in the correct location + [ -S "$expected_socket" ] + + # Kill the server before teardown runs storage wipe (which needs the store lock) + stop_server +}