From 6735717bbd755feae4af2ceda232075c133d388f Mon Sep 17 00:00:00 2001 From: Dawei Huang Date: Thu, 26 Feb 2026 09:13:01 -0600 Subject: [PATCH] =?UTF-8?q?Fix=20TransferToRemote=20DPU=20connection:=20re?= =?UTF-8?q?place=20localhost=20loopback=20with=20=E2=80=A6=20(#591)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix TransferToRemote DPU connection: replace localhost loopback with direct DPU connection HandleTransferToRemoteForDPUStreaming previously dialed localhost:8080 with insecure credentials to issue a File/Put RPC that the DPU proxy would intercept and forward. This breaks when the gNMI server runs on a different port or with TLS enabled. Replace the loopback pattern with dpuproxy.GetDPUConnection(), which resolves DPU info via Redis and returns a cached gRPC connection directly to the target DPU. This eliminates the hardcoded address, the insecure dial, and the need for DPU routing metadata headers on the outgoing request. Also removes HandleTransferToRemoteForDPU (disk-based two-phase version) which was never called in production. Signed-off-by: Dawei Huang * Fix gomonkey inlining issue with NewFileClient on Go 1.24 Extract gnoi_file_pb.NewFileClient into a package-level variable so tests can patch it via ApplyGlobalVar instead of ApplyFunc. The generated protobuf function is tiny and gets inlined by Go 1.24, which defeats gomonkey's function-level patching and causes nil pointer panics in CI. Signed-off-by: Dawei Huang --------- Signed-off-by: Dawei Huang --- gnmi_server/gnoi_file_test.go | 2 +- pkg/gnoi/file/dpu_monkey_test.go | 134 +++------- pkg/gnoi/file/file.go | 179 +------------ pkg/gnoi/file/file_test.go | 415 ++++++----------------------- pkg/interceptors/dpuproxy/proxy.go | 32 +++ pkg/interceptors/setup.go | 1 + 6 files changed, 155 insertions(+), 608 deletions(-) diff --git a/gnmi_server/gnoi_file_test.go b/gnmi_server/gnoi_file_test.go index bfdc7f38..17c314ac 100644 --- a/gnmi_server/gnoi_file_test.go +++ b/gnmi_server/gnoi_file_test.go @@ -240,7 +240,7 @@ func TestGnoiFileServer(t *testing.T) { // Mock HandleTransferToRemoteForDPUStreaming to succeed patches.ApplyFunc(gnoifile.HandleTransferToRemoteForDPUStreaming, - func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string, dpuAddr string) (*gnoi_file_pb.TransferToRemoteResponse, error) { + func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string) (*gnoi_file_pb.TransferToRemoteResponse, error) { return &gnoi_file_pb.TransferToRemoteResponse{}, nil }) diff --git a/pkg/gnoi/file/dpu_monkey_test.go b/pkg/gnoi/file/dpu_monkey_test.go index 3895f2c6..7ff35aac 100644 --- a/pkg/gnoi/file/dpu_monkey_test.go +++ b/pkg/gnoi/file/dpu_monkey_test.go @@ -2,18 +2,16 @@ package file import ( "context" - "crypto/md5" "io" "os" "strings" "testing" - "time" "github.com/agiledragon/gomonkey/v2" "github.com/openconfig/gnoi/common" gnoi_file_pb "github.com/openconfig/gnoi/file" - "github.com/openconfig/gnoi/types" "github.com/sonic-net/sonic-gnmi/internal/download" + "github.com/sonic-net/sonic-gnmi/pkg/interceptors/dpuproxy" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -22,44 +20,19 @@ func TestDPU_SuccessPathMocking(t *testing.T) { patches := gomonkey.NewPatches() defer patches.Reset() - // 1. Mock HandleTransferToRemote to simulate successful download - patches.ApplyFunc(HandleTransferToRemote, func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest) (*gnoi_file_pb.TransferToRemoteResponse, error) { - // Create actual temp file so the rest of the logic can read it - testData := []byte("mock downloaded data") - if err := os.WriteFile(req.LocalPath, testData, 0644); err == nil { - // Also create the /mnt/host version to test container path logic - hostPath := "/mnt/host" + req.LocalPath - os.MkdirAll("/mnt/host/tmp", 0755) - os.WriteFile(hostPath, testData, 0644) - } - - hash := md5.Sum(testData) - return &gnoi_file_pb.TransferToRemoteResponse{ - Hash: &types.HashType{ - Method: types.HashType_MD5, - Hash: hash[:], - }, - }, nil - }) - - // 2. Mock grpc.Dial to return a connection, and mock its Close method - patches.ApplyFunc(grpc.Dial, func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // 1. Mock dpuproxy.GetDPUConnection to return a connection + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return &grpc.ClientConn{}, nil }) - // Mock ClientConn.Close to avoid nil pointer panic - patches.ApplyMethod(&grpc.ClientConn{}, "Close", func(*grpc.ClientConn) error { - return nil - }) - - // 3. Mock gnoi_file_pb.NewFileClient - patches.ApplyFunc(gnoi_file_pb.NewFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { + // 2. Mock newFileClient package variable (avoids inlining issues with gomonkey) + patches.ApplyGlobalVar(&newFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { return &mockSuccessFileClient{} }) - // 4. Mock os.Remove to always succeed (for cleanup testing) - patches.ApplyFunc(os.Remove, func(name string) error { - return nil + // 3. Mock the download package's DownloadHTTPStreaming + patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { + return io.NopCloser(strings.NewReader("streaming test data")), 18, nil }) req := &gnoi_file_pb.TransferToRemoteRequest{ @@ -70,57 +43,30 @@ func TestDPU_SuccessPathMocking(t *testing.T) { }, } - // Test HandleTransferToRemoteForDPU success path - resp, err := HandleTransferToRemoteForDPU(context.Background(), req, "dpu1", "localhost:8080") - if err != nil { - t.Logf("DPU function returned error: %v", err) - } else { - t.Logf("DPU function success: %v", resp) - } - // Test HandleTransferToRemoteForDPUStreaming success path - resp2, err2 := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "dpu2", "localhost:8080") - if err2 != nil { - t.Logf("DPU streaming returned error: %v", err2) + resp, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "dpu2") + if err != nil { + t.Logf("DPU streaming returned error: %v", err) } else { - t.Logf("DPU streaming success: %v", resp2) + t.Logf("DPU streaming success: %v", resp) } } func TestDPU_ContainerPathBranches(t *testing.T) { - // Test specifically to hit container path logic without causing infinite recursion patches := gomonkey.NewPatches() defer patches.Reset() - // Mock HandleTransferToRemote to succeed and create files - patches.ApplyFunc(HandleTransferToRemote, func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest) (*gnoi_file_pb.TransferToRemoteResponse, error) { - testData := []byte("container test data") - hash := md5.Sum(testData) - return &gnoi_file_pb.TransferToRemoteResponse{ - Hash: &types.HashType{Hash: hash[:]}, - }, nil - }) - - // Mock os.ReadFile to return success (simulate file exists) - patches.ApplyFunc(os.ReadFile, func(name string) ([]byte, error) { - return []byte("mock file content"), nil - }) - - // Mock grpc.Dial and related functions to succeed but fail at Send - patches.ApplyFunc(grpc.Dial, func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Mock dpuproxy.GetDPUConnection + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return &grpc.ClientConn{}, nil }) - patches.ApplyMethod(&grpc.ClientConn{}, "Close", func(*grpc.ClientConn) error { - return nil - }) - - patches.ApplyFunc(gnoi_file_pb.NewFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { + patches.ApplyGlobalVar(&newFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { return &mockFailureFileClient{} }) - patches.ApplyFunc(os.Remove, func(name string) error { - return nil + patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { + return io.NopCloser(strings.NewReader("container test data")), 18, nil }) req := &gnoi_file_pb.TransferToRemoteRequest{ @@ -132,20 +78,23 @@ func TestDPU_ContainerPathBranches(t *testing.T) { } // This should exercise the DPU path with mocked components - _, err := HandleTransferToRemoteForDPU(context.Background(), req, "container-test", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "container-test") t.Logf("Container path test result: %v", err) } func TestDPU_ErrorPaths(t *testing.T) { - // Test various error conditions to get better coverage patches := gomonkey.NewPatches() defer patches.Reset() - // Test with HandleTransferToRemote failure - patches.ApplyFunc(HandleTransferToRemote, func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest) (*gnoi_file_pb.TransferToRemoteResponse, error) { + // Mock dpuproxy.GetDPUConnection to fail + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return nil, os.ErrNotExist }) + patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { + return io.NopCloser(strings.NewReader("error test data")), 14, nil + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/error_test.bin", RemoteDownload: &common.RemoteDownload{ @@ -154,32 +103,26 @@ func TestDPU_ErrorPaths(t *testing.T) { }, } - _, err := HandleTransferToRemoteForDPU(context.Background(), req, "error-test", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "error-test") t.Logf("Error path test result: %v", err) } func TestDPU_StreamingSuccess(t *testing.T) { - // Test the streaming DPU function to increase coverage patches := gomonkey.NewPatches() defer patches.Reset() // Mock the download package's DownloadHTTPStreaming patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { - // Return a mock reader that simulates HTTP response body return io.NopCloser(strings.NewReader("streaming test data")), 18, nil }) - // Mock grpc.Dial - patches.ApplyFunc(grpc.Dial, func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Mock dpuproxy.GetDPUConnection + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return &grpc.ClientConn{}, nil }) - patches.ApplyMethod(&grpc.ClientConn{}, "Close", func(*grpc.ClientConn) error { - return nil - }) - // Mock file client to return success - patches.ApplyFunc(gnoi_file_pb.NewFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { + patches.ApplyGlobalVar(&newFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { return &mockSuccessFileClient{} }) @@ -191,7 +134,7 @@ func TestDPU_StreamingSuccess(t *testing.T) { }, } - resp, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-test", "localhost:8080") + resp, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-test") if err != nil { t.Logf("Streaming test returned error: %v", err) } else { @@ -200,7 +143,6 @@ func TestDPU_StreamingSuccess(t *testing.T) { } func TestDPU_StreamingError(t *testing.T) { - // Test streaming function with DownloadHTTPStreaming failure patches := gomonkey.NewPatches() defer patches.Reset() @@ -216,27 +158,11 @@ func TestDPU_StreamingError(t *testing.T) { }, } - _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-error-test", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-error-test") t.Logf("Streaming error test result: %v", err) } // Mock implementations -type mockFileInfo struct { - name string - isDir bool -} - -func (m *mockFileInfo) Name() string { return m.name } -func (m *mockFileInfo) Size() int64 { return 0 } -func (m *mockFileInfo) Mode() os.FileMode { - if m.isDir { - return os.ModeDir - } - return 0644 -} -func (m *mockFileInfo) ModTime() time.Time { return time.Now() } -func (m *mockFileInfo) IsDir() bool { return m.isDir } -func (m *mockFileInfo) Sys() interface{} { return nil } type mockSuccessFileClient struct{} diff --git a/pkg/gnoi/file/file.go b/pkg/gnoi/file/file.go index b6b09866..0eab32c9 100644 --- a/pkg/gnoi/file/file.go +++ b/pkg/gnoi/file/file.go @@ -20,9 +20,8 @@ import ( "github.com/openconfig/gnoi/types" "github.com/sonic-net/sonic-gnmi/internal/download" "github.com/sonic-net/sonic-gnmi/internal/hash" - "google.golang.org/grpc" + "github.com/sonic-net/sonic-gnmi/pkg/interceptors/dpuproxy" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -35,6 +34,10 @@ const ( maxFileSize = 4 * 1024 * 1024 * 1024 // 4GB in bytes ) +// newFileClient wraps gnoi_file_pb.NewFileClient to allow test patching +// (the generated function is tiny and gets inlined, defeating gomonkey). +var newFileClient = gnoi_file_pb.NewFileClient + // HandleTransferToRemote implements the complete logic for the TransferToRemote RPC. // It validates the request, checks for DPU metadata, and routes accordingly. // @@ -68,7 +71,7 @@ func HandleTransferToRemote( // If DPU headers are present, handle DPU transfer logic using efficient streaming if targetType == "dpu" && targetIndex != "" { log.Infof("[TransferToRemote] DPU routing detected: target-type=%s, target-index=%s", targetType, targetIndex) - return HandleTransferToRemoteForDPUStreaming(ctx, req, targetIndex, "localhost:8080") + return HandleTransferToRemoteForDPUStreaming(ctx, req, targetIndex) } } @@ -359,160 +362,16 @@ func HandlePut(stream gnoi_file_pb.File_PutServer) error { return stream.SendAndClose(&gnoi_file_pb.PutResponse{}) } -// HandleTransferToRemoteForDPU handles TransferToRemote when DPU headers are present. -// It downloads the file to NPU first, then uploads it to the specified DPU using File.Put. -// -// This function implements the complete DPU transfer workflow: -// - Downloads file to a temporary location on NPU -// - Reads the downloaded file with container path translation -// - Creates a gRPC connection with DPU routing metadata -// - Uploads the file to DPU using File.Put streaming RPC -// - Cleans up temporary files -// -// Parameters: -// - ctx: Context for the operation -// - req: Original TransferToRemoteRequest with target DPU path -// - dpuIndex: Target DPU index (e.g., "0", "1") -// - proxyAddress: Address of the gNOI proxy server (e.g., "localhost:8080") -// -// Returns: -// - TransferToRemoteResponse with MD5 hash on success -// - Error with appropriate gRPC status code on failure -func HandleTransferToRemoteForDPU( - ctx context.Context, - req *gnoi_file_pb.TransferToRemoteRequest, - dpuIndex string, - proxyAddress string, -) (*gnoi_file_pb.TransferToRemoteResponse, error) { - // Validate inputs - if req == nil { - return nil, status.Error(codes.InvalidArgument, "request cannot be nil") - } - if dpuIndex == "" { - return nil, status.Error(codes.InvalidArgument, "dpuIndex cannot be empty") - } - if proxyAddress == "" { - return nil, status.Error(codes.InvalidArgument, "proxyAddress cannot be empty") - } - - // Step 1: Download file to NPU temp location - tempPath := fmt.Sprintf("/tmp/dpu_transfer_%s_%d", dpuIndex, time.Now().UnixNano()) - - // Create a modified request with temp path - npuReq := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: tempPath, - RemoteDownload: req.GetRemoteDownload(), - } - - // Download to NPU - resp, err := HandleTransferToRemote(ctx, npuReq) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to download to NPU: %v", err) - } - - // Ensure cleanup of temp file (use the same path translation as reading) - defer func() { - cleanupPath := tempPath - if _, err := os.Stat("/mnt/host"); err == nil { - cleanupPath = "/mnt/host" + tempPath - } - if err := os.Remove(cleanupPath); err != nil { - // Log warning but don't fail the operation - } - }() - - // Step 2: Read the downloaded file (apply container path translation) - // The file was downloaded to host filesystem via HandleTransferToRemote, so we need to read from /mnt/host prefix - translatedTempPath := tempPath - if _, err := os.Stat("/mnt/host"); err == nil { - translatedTempPath = "/mnt/host" + tempPath - } - - fileData, err := os.ReadFile(translatedTempPath) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to read downloaded file: %v", err) - } - - // Step 3: Connect to DPU via proxy - // Create metadata for DPU routing - md := metadata.New(map[string]string{ - "x-sonic-ss-target-type": "dpu", - "x-sonic-ss-target-index": dpuIndex, - }) - outCtx := metadata.NewOutgoingContext(ctx, md) - - // Connect to local gNOI server which will proxy to DPU - conn, err := grpc.Dial(proxyAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to connect to proxy: %v", err) - } - defer conn.Close() - - fileClient := gnoi_file_pb.NewFileClient(conn) - - // Step 4: Use File.Put to upload to DPU - putClient, err := fileClient.Put(outCtx) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to create Put client: %v", err) - } - - // Send Open request - openReq := &gnoi_file_pb.PutRequest{ - Request: &gnoi_file_pb.PutRequest_Open{ - Open: &gnoi_file_pb.PutRequest_Details{ - RemoteFile: req.GetLocalPath(), // Use original target path - Permissions: 0644, - }, - }, - } - if err := putClient.Send(openReq); err != nil { - return nil, status.Errorf(codes.Internal, "failed to send open request: %v", err) - } - - // Send file contents in chunks - chunkSize := 64 * 1024 // 64KB chunks - for i := 0; i < len(fileData); i += chunkSize { - end := i + chunkSize - if end > len(fileData) { - end = len(fileData) - } - - contentReq := &gnoi_file_pb.PutRequest{ - Request: &gnoi_file_pb.PutRequest_Contents{ - Contents: fileData[i:end], - }, - } - if err := putClient.Send(contentReq); err != nil { - return nil, status.Errorf(codes.Internal, "failed to send content: %v", err) - } - } - - // Send hash (reuse the hash from download response) - hashReq := &gnoi_file_pb.PutRequest{ - Request: &gnoi_file_pb.PutRequest_Hash{ - Hash: resp.GetHash(), - }, - } - if err := putClient.Send(hashReq); err != nil { - return nil, status.Errorf(codes.Internal, "failed to send hash: %v", err) - } - - // Close and get response - if _, err := putClient.CloseAndRecv(); err != nil { - return nil, status.Errorf(codes.Internal, "failed to complete Put to DPU: %v", err) - } - - return resp, nil -} - // HandleTransferToRemoteForDPUStreaming implements efficient streaming proxy for DPU file transfers. // This function streams data directly from HTTP source to DPU without intermediate disk storage // or loading the entire file into memory. It calculates MD5 hash concurrently during streaming. +// +// The DPU connection is obtained directly via dpuproxy.GetDPUConnection, which returns a cached +// connection managed by the DPU proxy infrastructure. Callers must NOT close the connection. func HandleTransferToRemoteForDPUStreaming( ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string, - proxyAddress string, ) (*gnoi_file_pb.TransferToRemoteResponse, error) { // Validate inputs if req == nil { @@ -521,9 +380,6 @@ func HandleTransferToRemoteForDPUStreaming( if dpuIndex == "" { return nil, status.Error(codes.InvalidArgument, "dpuIndex cannot be empty") } - if proxyAddress == "" { - return nil, status.Error(codes.InvalidArgument, "proxyAddress cannot be empty") - } remoteDownload := req.GetRemoteDownload() if remoteDownload == nil { @@ -558,23 +414,16 @@ func HandleTransferToRemoteForDPUStreaming( } defer httpStream.Close() - // Step 2: Connect to DPU via proxy - md := metadata.New(map[string]string{ - "x-sonic-ss-target-type": "dpu", - "x-sonic-ss-target-index": dpuIndex, - }) - outCtx := metadata.NewOutgoingContext(streamCtx, md) - - conn, err := grpc.Dial(proxyAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + // Step 2: Get direct connection to DPU (cached, do NOT close) + conn, err := dpuproxy.GetDPUConnection(streamCtx, dpuIndex) if err != nil { - return nil, status.Errorf(codes.Internal, "failed to connect to proxy: %v", err) + return nil, status.Errorf(codes.Internal, "failed to get DPU connection: %v", err) } - defer conn.Close() - fileClient := gnoi_file_pb.NewFileClient(conn) + fileClient := newFileClient(conn) // Step 3: Create DPU Put stream - putClient, err := fileClient.Put(outCtx) + putClient, err := fileClient.Put(streamCtx) if err != nil { return nil, status.Errorf(codes.Internal, "failed to create Put client: %v", err) } diff --git a/pkg/gnoi/file/file_test.go b/pkg/gnoi/file/file_test.go index 064b2a7a..e5916f59 100644 --- a/pkg/gnoi/file/file_test.go +++ b/pkg/gnoi/file/file_test.go @@ -19,6 +19,8 @@ import ( gnoi_file_pb "github.com/openconfig/gnoi/file" "github.com/openconfig/gnoi/types" "github.com/sonic-net/sonic-gnmi/internal/download" + "github.com/sonic-net/sonic-gnmi/pkg/interceptors/dpuproxy" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -984,116 +986,9 @@ func TestHandlePut_InvalidMessageType(t *testing.T) { } } -func TestHandleTransferToRemoteForDPU_NilRequest(t *testing.T) { - ctx := context.Background() - _, err := HandleTransferToRemoteForDPU(ctx, nil, "0", "localhost:8080") - if err == nil { - t.Fatal("Expected error for nil request, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "request cannot be nil") { - t.Errorf("Error message = %q, want substring 'request cannot be nil'", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_EmptyDpuIndex(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://example.com/file", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "", "localhost:8080") - if err == nil { - t.Fatal("Expected error for empty dpuIndex, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "dpuIndex cannot be empty") { - t.Errorf("Error message = %q, want substring 'dpuIndex cannot be empty'", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_EmptyProxyAddress(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://example.com/file", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "") - if err == nil { - t.Fatal("Expected error for empty proxyAddress, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "proxyAddress cannot be empty") { - t.Errorf("Error message = %q, want substring 'proxyAddress cannot be empty'", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_InvalidRemoteDownload(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - // Missing RemoteDownload - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:8080") - if err == nil { - t.Fatal("Expected error for missing remote download, got nil") - } - - // The error should come from HandleTransferToRemote validation - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error (from failed download), got %v", err) - } -} - -func TestHandleTransferToRemoteForDPU_DownloadFailure(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://invalid-url-that-does-not-exist.example", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:8080") - if err == nil { - t.Fatal("Expected error for download failure, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error, got %v", err) - } - - if !strings.Contains(st.Message(), "failed to download to NPU") { - t.Errorf("Error message = %q, want substring 'failed to download to NPU'", st.Message()) - } -} - func TestHandleTransferToRemoteForDPUStreaming_NilRequest(t *testing.T) { ctx := context.Background() - _, err := HandleTransferToRemoteForDPUStreaming(ctx, nil, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, nil, "0") if err == nil { t.Fatal("Expected error for nil request, got nil") } @@ -1117,7 +1012,7 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyDpuIndex(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "") if err == nil { t.Fatal("Expected error for empty dpuIndex, got nil") } @@ -1132,37 +1027,13 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyDpuIndex(t *testing.T) { } } -func TestHandleTransferToRemoteForDPUStreaming_EmptyProxyAddress(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://example.com/file", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "") - if err == nil { - t.Fatal("Expected error for empty proxyAddress, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "proxyAddress cannot be empty") { - t.Errorf("Error message = %q, want substring 'proxyAddress cannot be empty'", st.Message()) - } -} - func TestHandleTransferToRemoteForDPUStreaming_NilRemoteDownload(t *testing.T) { ctx := context.Background() req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/test.bin", // Missing RemoteDownload } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for nil remote download, got nil") } @@ -1186,7 +1057,7 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyLocalPath(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for empty local path, got nil") } @@ -1210,7 +1081,7 @@ func TestHandleTransferToRemoteForDPUStreaming_UnsupportedProtocol(t *testing.T) Protocol: common.RemoteDownload_HTTPS, // Unsupported }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for unsupported protocol, got nil") } @@ -1234,7 +1105,7 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyURL(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for empty URL, got nil") } @@ -1258,7 +1129,7 @@ func TestHandleTransferToRemoteForDPUStreaming_InvalidURL(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for invalid URL, got nil") } @@ -1284,7 +1155,7 @@ func TestHandleTransferToRemoteForDPUStreaming_ContextCancellation(t *testing.T) Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for cancelled context, got nil") } @@ -1296,83 +1167,10 @@ func TestHandleTransferToRemoteForDPUStreaming_ContextCancellation(t *testing.T) } } -func TestHandleTransferToRemoteForDPU_SuccessPath(t *testing.T) { - // Test successful DPU transfer with mock server - testContent := []byte("test DPU firmware content") - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(testContent) - })) - defer httpServer.Close() - - // Create a mock gRPC server to simulate DPU proxy - grpcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Simulate successful gRPC response - w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) - })) - defer grpcServer.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/dpu_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: httpServer.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // This will fail at the gRPC connection step, but we test the HTTP download part - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "invalid-proxy:8080") - if err == nil { - t.Fatal("Expected error due to invalid proxy address, got nil") - } - - // Verify the error is from the proxy connection, not from earlier validation - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error from proxy connection, got %v", err) - } - - if !strings.Contains(st.Message(), "failed to create Put client") && !strings.Contains(st.Message(), "failed to connect to proxy") { - t.Errorf("Error message = %q, want substring about connection failure", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_FileReadFailure(t *testing.T) { - // Test scenario where file download succeeds but reading fails - testContent := []byte("test content") - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(testContent) - })) - defer httpServer.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/read_fail_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: httpServer.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // This test exercises the file download and read path - // It will fail at gRPC connection but exercises more code - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "127.0.0.1:99999") // Invalid port - if err == nil { - t.Fatal("Expected error, got nil") - } - - // Should be an internal error from proxy connection failure - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error, got %v", err) - } -} - func TestHandleTransferToRemoteForDPUStreaming_SuccessPath(t *testing.T) { // Test successful streaming transfer with chunked data + // HTTP streaming works, but DPU connection is mocked to return error + // to validate the HTTP streaming part before the DPU connection step. testContent := make([]byte, 200*1024) // 200KB test data for i := range testContent { testContent[i] = byte(i % 256) @@ -1394,6 +1192,14 @@ func TestHandleTransferToRemoteForDPUStreaming_SuccessPath(t *testing.T) { })) defer httpServer.Close() + // Mock dpuproxy.GetDPUConnection to return error so the test validates + // HTTP streaming works before the DPU connection step fails. + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + ctx := context.Background() req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/streaming_test.bin", @@ -1403,21 +1209,17 @@ func TestHandleTransferToRemoteForDPUStreaming_SuccessPath(t *testing.T) { }, } - // This will fail at gRPC connection but exercises the HTTP streaming and hash calculation - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "invalid-proxy:8080") + // This will fail at DPU connection but exercises the HTTP streaming and hash calculation + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { - t.Fatal("Expected error due to invalid proxy, got nil") + t.Fatal("Expected error due to DPU connection failure, got nil") } - // Should fail at proxy connection step + // Should fail at DPU connection step st, ok := status.FromError(err) if !ok || st.Code() != codes.Internal { t.Errorf("Expected Internal error, got %v", err) } - - if !strings.Contains(st.Message(), "failed to create Put client") && !strings.Contains(st.Message(), "failed to connect to proxy") { - t.Errorf("Error message = %q, want substring about connection failure", st.Message()) - } } func TestHandleTransferToRemoteForDPUStreaming_NetworkError(t *testing.T) { @@ -1431,7 +1233,7 @@ func TestHandleTransferToRemoteForDPUStreaming_NetworkError(t *testing.T) { }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for network failure, got nil") } @@ -1469,7 +1271,7 @@ func TestHandleTransferToRemoteForDPUStreaming_TimeoutDuringStream(t *testing.T) }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected timeout error, got nil") } @@ -1795,38 +1597,6 @@ func TestTranslatePathForContainer_Coverage(t *testing.T) { } } -// Test coverage for DPU functions by creating scenario where HTTP succeeds but gRPC fails -func TestHandleTransferToRemoteForDPU_HTTPSuccessGRPCFail(t *testing.T) { - // Create HTTP server that works - testContent := []byte("DPU transfer test content with enough data to test the full download and read path") - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(testContent) - })) - defer httpServer.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/dpu_http_success.bin", - RemoteDownload: &common.RemoteDownload{ - Path: httpServer.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // Use invalid proxy to force gRPC failure AFTER HTTP download succeeds - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:99999") - if err == nil { - t.Fatal("Expected error from gRPC connection failure") - } - - // Should be connection error to proxy - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error, got %v", err) - } -} - func TestHandleTransferToRemoteForDPUStreaming_HTTPSuccessGRPCFail(t *testing.T) { // Create large HTTP content to test streaming path largeContent := make([]byte, 512*1024) // 512KB @@ -1850,6 +1620,13 @@ func TestHandleTransferToRemoteForDPUStreaming_HTTPSuccessGRPCFail(t *testing.T) })) defer httpServer.Close() + // Mock dpuproxy.GetDPUConnection to return error to force gRPC failure + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + ctx := context.Background() req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/dpu_streaming_test.bin", @@ -1859,13 +1636,13 @@ func TestHandleTransferToRemoteForDPUStreaming_HTTPSuccessGRPCFail(t *testing.T) }, } - // Use invalid proxy to force gRPC failure AFTER HTTP streaming succeeds - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:99999") + // DPU connection failure AFTER HTTP streaming succeeds + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { - t.Fatal("Expected error from gRPC connection failure") + t.Fatal("Expected error from DPU connection failure") } - // Should be connection error to proxy + // Should be connection error from DPU connection st, ok := status.FromError(err) if !ok || st.Code() != codes.Internal { t.Errorf("Expected Internal error, got %v", err) @@ -2120,35 +1897,6 @@ func TestValidatePath_HighCoverage(t *testing.T) { // Additional DPU function tests with more coverage func TestDPUFunctions_MoreCoverage(t *testing.T) { - t.Run("DPU with large file and connection failure", func(t *testing.T) { - // Test with larger file to exercise more of the DPU code paths - largeContent := make([]byte, 2*1024*1024) // 2MB - for i := range largeContent { - largeContent[i] = byte(i % 256) - } - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(largeContent) - })) - defer server.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/large_dpu_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: server.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // This exercises more of the file reading logic before gRPC fails - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "127.0.0.1:99999") - if err == nil { - t.Fatal("Expected connection error") - } - }) - t.Run("DPU streaming with very large content", func(t *testing.T) { // Test streaming with large content to exercise chunk processing hugeContent := make([]byte, 4*1024*1024) // 4MB @@ -2181,8 +1929,15 @@ func TestDPUFunctions_MoreCoverage(t *testing.T) { }, } - // This exercises the streaming and hash calculation extensively before gRPC failure - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "127.0.0.1:99999") + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + + // This exercises the streaming and hash calculation extensively before DPU connection failure + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected connection error") } @@ -2194,7 +1949,7 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { // Test all the early validation and file handling paths extensively t.Run("DPU container path logic", func(t *testing.T) { - // Test the container path translation logic in DPU functions + // Test the container path translation logic in DPU streaming function testContent := []byte("container path test") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -2202,6 +1957,13 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { })) defer server.Close() + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/container_path_test.bin", RemoteDownload: &common.RemoteDownload{ @@ -2212,13 +1974,12 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { ctx := context.Background() - // Test both DPU functions to exercise container path logic - _, err1 := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:99999") - _, err2 := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:99999") + // Test streaming DPU function to exercise container path logic + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") - // Both should fail at gRPC connection but exercise file download/streaming - if err1 == nil || err2 == nil { - t.Fatal("Expected connection errors") + // Should fail at DPU connection but exercise file streaming + if err == nil { + t.Fatal("Expected connection error") } }) @@ -2236,6 +1997,13 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { })) defer server.Close() + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: fmt.Sprintf("/tmp/dpu_%s_test.bin", idx), RemoteDownload: &common.RemoteDownload{ @@ -2246,51 +2014,15 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { ctx := context.Background() - // This exercises metadata creation and file processing for each DPU - _, err := HandleTransferToRemoteForDPU(ctx, req, idx, "localhost:99999") + // Test streaming version with metadata creation for each DPU index + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, idx) if err == nil { t.Fatal("Expected connection error") } - - // Also test streaming version - _, err2 := HandleTransferToRemoteForDPUStreaming(ctx, req, idx, "localhost:99999") - if err2 == nil { - t.Fatal("Expected connection error") - } }) } }) - t.Run("DPU file cleanup logic", func(t *testing.T) { - // Test the cleanup logic in DPU functions - content := []byte("cleanup logic test content") - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(content) - })) - defer server.Close() - - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/cleanup_logic_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: server.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - ctx := context.Background() - - // This should exercise the defer cleanup logic - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:99999") - if err == nil { - t.Fatal("Expected connection error") - } - - // The cleanup logic should have removed the temp file - // We exercise the cleanup code path but can't test exact match - // because the filename includes timestamp - }) - t.Run("DPU streaming chunk processing", func(t *testing.T) { // Test streaming with specific chunk sizes to exercise the streaming loop chunkSizes := []int{1024, 4096, 16384, 65536} // Different chunk sizes @@ -2310,6 +2042,13 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { })) defer server.Close() + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: fmt.Sprintf("/tmp/chunk_test_%d.bin", i), RemoteDownload: &common.RemoteDownload{ @@ -2321,7 +2060,7 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { ctx := context.Background() // This exercises the streaming and chunking logic - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:99999") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected connection error") } @@ -2455,7 +2194,7 @@ func TestHandleTransferToRemote_DPU_Routing(t *testing.T) { // Mock HandleTransferToRemoteForDPUStreaming to succeed patches.ApplyFunc(HandleTransferToRemoteForDPUStreaming, - func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string, dpuAddr string) (*gnoi_file_pb.TransferToRemoteResponse, error) { + func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string) (*gnoi_file_pb.TransferToRemoteResponse, error) { return &gnoi_file_pb.TransferToRemoteResponse{}, nil }) diff --git a/pkg/interceptors/dpuproxy/proxy.go b/pkg/interceptors/dpuproxy/proxy.go index 5f450437..0645dd1b 100644 --- a/pkg/interceptors/dpuproxy/proxy.go +++ b/pkg/interceptors/dpuproxy/proxy.go @@ -75,6 +75,22 @@ var defaultForwardableMethods = []ForwardableMethod{ }, } +var defaultProxy *DPUProxy + +// SetDefaultProxy registers the DPU proxy singleton for use by handlers +// that need direct DPU connections (e.g., TransferToRemote file operations). +func SetDefaultProxy(p *DPUProxy) { defaultProxy = p } + +// GetDPUConnection returns a cached gRPC connection to the specified DPU. +// It resolves DPU info via Redis and establishes/reuses connections. +// The returned connection is cached — callers must NOT close it. +func GetDPUConnection(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + if defaultProxy == nil { + return nil, fmt.Errorf("DPU proxy not initialized") + } + return defaultProxy.GetDPUConnection(ctx, dpuIndex) +} + // DPUProxy is a gRPC interceptor that routes requests to DPU targets based on metadata. // It examines incoming gRPC metadata for x-sonic-ss-target-type and x-sonic-ss-target-index // headers and routes requests accordingly. @@ -108,6 +124,22 @@ func NewDPUProxy(resolver *DPUResolver) *DPUProxy { } } +// GetDPUConnection resolves DPU info and returns a cached gRPC connection to the DPU. +// The returned connection is cached — callers must NOT close it. +func (p *DPUProxy) GetDPUConnection(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + if p.resolver == nil { + return nil, fmt.Errorf("resolver not available") + } + dpuInfo, err := p.resolver.GetDPUInfo(ctx, dpuIndex) + if err != nil { + return nil, err + } + if !dpuInfo.Reachable { + return nil, status.Errorf(codes.Unavailable, "DPU%s not reachable", dpuIndex) + } + return p.getConnection(ctx, dpuInfo.Index, dpuInfo.IPAddress, dpuInfo.GNMIPortsToTry) +} + // getForwardingMode checks if a method is registered and returns its forwarding mode. // Returns the ForwardingMode and a boolean indicating if the method was found. func (p *DPUProxy) getForwardingMode(method string) (ForwardingMode, bool) { diff --git a/pkg/interceptors/setup.go b/pkg/interceptors/setup.go index 61c6c414..49a945c5 100644 --- a/pkg/interceptors/setup.go +++ b/pkg/interceptors/setup.go @@ -25,6 +25,7 @@ func NewServerChain() (*ServerChain, error) { // Create DPU resolver and proxy dpuResolver := dpuproxy.NewDPUResolver(stateRedisAdapter, configRedisAdapter) dpuProxy := dpuproxy.NewDPUProxy(dpuResolver) + dpuproxy.SetDefaultProxy(dpuProxy) // Create interceptor chain with DPU proxy chain := NewChain(dpuProxy)