Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions integration/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"net"
"sync"
"testing"

transferapi "github.com/containerd/containerd/api/services/transfer/v1"
Expand Down Expand Up @@ -132,10 +133,14 @@ func (sc *vmStreamCreator) Create(ctx context.Context, id string) (streaming.Str
return &framedStream{conn: conn}, nil
}

// maxFrameSize is the maximum allowed frame payload (10 MiB).
const maxFrameSize = 10 << 20
Comment thread
dmcgowan marked this conversation as resolved.

// framedStream implements streaming.Stream over a net.Conn using
// length-prefixed proto framing (matching the vminitd vsockStream protocol).
type framedStream struct {
conn net.Conn
once sync.Once
}

func (s *framedStream) Send(a typeurl.Any) error {
Expand All @@ -157,6 +162,13 @@ func (s *framedStream) Recv() (typeurl.Any, error) {
if err := binary.Read(s.conn, binary.BigEndian, &length); err != nil {
return nil, err
}
// A zero-length frame is an application-level EOF marker.
if length == 0 {
return nil, io.EOF
}
if length > maxFrameSize {
return nil, fmt.Errorf("frame size %d exceeds maximum %d", length, maxFrameSize)
}
data := make([]byte, length)
if _, err := io.ReadFull(s.conn, data); err != nil {
return nil, fmt.Errorf("failed to read frame data: %w", err)
Expand All @@ -169,14 +181,15 @@ func (s *framedStream) Recv() (typeurl.Any, error) {
}

func (s *framedStream) Close() error {
// Use half-close (shutdown write) instead of full close. SendStream
// calls Close() after sending all data; a full close can discard
// buffered data the VM hasn't read yet. Shutdown SHUT_WR signals
// EOF to the reader while letting buffered data drain.
if sc, ok := s.conn.(interface{ CloseWrite() error }); ok {
return sc.CloseWrite()
}
return s.conn.Close()
var err error
s.once.Do(func() {
// Send a zero-length frame as an application-level EOF signal.
// We avoid CloseWrite()/Close() here because the vsock proxy
// sends a full bidirectional SHUTDOWN when it sees transport-level
// EOF, which kills the reverse direction (window updates) too.
err = binary.Write(s.conn, binary.BigEndian, uint32(0))
Comment thread
dmcgowan marked this conversation as resolved.
})
return err
}

// nopWriteCloser wraps an io.Writer with a no-op Close method.
Expand Down
21 changes: 17 additions & 4 deletions plugins/shim/streaming/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func init() {
})
}

// maxFrameSize is the maximum allowed frame payload (10 MiB).
const maxFrameSize = 10 << 20

Comment thread
dmcgowan marked this conversation as resolved.
type service struct {
sb sandbox.Sandbox
}
Expand Down Expand Up @@ -101,10 +104,13 @@ func (s *service) Stream(ctx context.Context, srv streamapi.TTRPCStreaming_Strea
// TTRPC -> VM: receive typeurl.Any from containerd, frame and write to VM
go func() {
err := bridgeTTRPCToVM(srv, vmConn)
// Half-close the write side so the VM sees EOF on its reads
// while still allowing data to flow back from VM -> TTRPC.
if cw, ok := vmConn.(interface{ CloseWrite() error }); ok {
cw.CloseWrite()
// Send a zero-length frame as an application-level EOF marker
// so the VM sees EOF on its reads. We avoid CloseWrite()
// because the vsock proxy turns transport-level shutdown into
// a bidirectional SHUTDOWN, which kills the reverse direction
// (VM -> TTRPC) and can cause the peer to lose in-flight data.
if eofErr := binary.Write(vmConn, binary.BigEndian, uint32(0)); eofErr != nil && err == nil {
err = fmt.Errorf("failed to write EOF marker to vm: %w", eofErr)
}
done <- err
}()
Expand Down Expand Up @@ -168,6 +174,13 @@ func bridgeVMToTTRPC(conn io.Reader, srv streamapi.TTRPCStreaming_StreamServer)
if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
return err
}
// A zero-length frame is an application-level EOF marker.
if length == 0 {
return nil
}
if length > maxFrameSize {
return fmt.Errorf("frame size %d exceeds maximum %d", length, maxFrameSize)
}
data := make([]byte, length)
if _, err := io.ReadFull(conn, data); err != nil {
return fmt.Errorf("failed to read frame data from vm: %w", err)
Expand Down
24 changes: 23 additions & 1 deletion plugins/vminit/streaming/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,16 @@ func (sg *streamGetter) Get(ctx context.Context, name string) (streaming.Stream,
return &vsockStream{conn: conn}, nil
}

// maxFrameSize is the maximum allowed frame payload (10 MiB). Frames
// larger than this are rejected to prevent OOM from buggy/malicious peers.
const maxFrameSize = 10 << 20

// vsockStream wraps a net.Conn with length-prefixed proto framing to
// implement the streaming.Stream interface. Each message is framed as
// a 4-byte big-endian length prefix followed by serialized proto bytes.
type vsockStream struct {
conn net.Conn
once sync.Once // ensures Close sends EOF exactly once
}

func (s *vsockStream) Send(a typeurl.Any) error {
Expand All @@ -237,6 +242,13 @@ func (s *vsockStream) Recv() (typeurl.Any, error) {
if err := binary.Read(s.conn, binary.BigEndian, &length); err != nil {
return nil, err
}
// A zero-length frame is an application-level EOF marker.
if length == 0 {
return nil, io.EOF
}
if length > maxFrameSize {
return nil, fmt.Errorf("frame size %d exceeds maximum %d", length, maxFrameSize)
}
data := make([]byte, length)
if _, err := io.ReadFull(s.conn, data); err != nil {
return nil, fmt.Errorf("failed to read frame data: %w", err)
Expand All @@ -249,5 +261,15 @@ func (s *vsockStream) Recv() (typeurl.Any, error) {
}

func (s *vsockStream) Close() error {
return s.conn.Close()
var err error
s.once.Do(func() {
// Send a zero-length frame as an application-level EOF marker.
// Do NOT close the underlying connection here — Close() is called
// by the send direction while the receive direction may still be
// reading from the same conn. The shim-side bridge defers
// vmConn.Close() when both directions complete, which tears down
// the kernel-level vsock connection.
err = binary.Write(s.conn, binary.BigEndian, uint32(0))
Comment thread
dmcgowan marked this conversation as resolved.
})
return err
Comment thread
dmcgowan marked this conversation as resolved.
}
Loading
Loading