diff --git a/gnmi_server/gnoi_file_test.go b/gnmi_server/gnoi_file_test.go index bfdc7f38..17c314ac 100644 --- a/gnmi_server/gnoi_file_test.go +++ b/gnmi_server/gnoi_file_test.go @@ -240,7 +240,7 @@ func TestGnoiFileServer(t *testing.T) { // Mock HandleTransferToRemoteForDPUStreaming to succeed patches.ApplyFunc(gnoifile.HandleTransferToRemoteForDPUStreaming, - func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string, dpuAddr string) (*gnoi_file_pb.TransferToRemoteResponse, error) { + func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string) (*gnoi_file_pb.TransferToRemoteResponse, error) { return &gnoi_file_pb.TransferToRemoteResponse{}, nil }) diff --git a/pkg/gnoi/file/dpu_monkey_test.go b/pkg/gnoi/file/dpu_monkey_test.go index 3895f2c6..7ff35aac 100644 --- a/pkg/gnoi/file/dpu_monkey_test.go +++ b/pkg/gnoi/file/dpu_monkey_test.go @@ -2,18 +2,16 @@ package file import ( "context" - "crypto/md5" "io" "os" "strings" "testing" - "time" "github.com/agiledragon/gomonkey/v2" "github.com/openconfig/gnoi/common" gnoi_file_pb "github.com/openconfig/gnoi/file" - "github.com/openconfig/gnoi/types" "github.com/sonic-net/sonic-gnmi/internal/download" + "github.com/sonic-net/sonic-gnmi/pkg/interceptors/dpuproxy" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -22,44 +20,19 @@ func TestDPU_SuccessPathMocking(t *testing.T) { patches := gomonkey.NewPatches() defer patches.Reset() - // 1. Mock HandleTransferToRemote to simulate successful download - patches.ApplyFunc(HandleTransferToRemote, func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest) (*gnoi_file_pb.TransferToRemoteResponse, error) { - // Create actual temp file so the rest of the logic can read it - testData := []byte("mock downloaded data") - if err := os.WriteFile(req.LocalPath, testData, 0644); err == nil { - // Also create the /mnt/host version to test container path logic - hostPath := "/mnt/host" + req.LocalPath - os.MkdirAll("/mnt/host/tmp", 0755) - os.WriteFile(hostPath, testData, 0644) - } - - hash := md5.Sum(testData) - return &gnoi_file_pb.TransferToRemoteResponse{ - Hash: &types.HashType{ - Method: types.HashType_MD5, - Hash: hash[:], - }, - }, nil - }) - - // 2. Mock grpc.Dial to return a connection, and mock its Close method - patches.ApplyFunc(grpc.Dial, func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // 1. Mock dpuproxy.GetDPUConnection to return a connection + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return &grpc.ClientConn{}, nil }) - // Mock ClientConn.Close to avoid nil pointer panic - patches.ApplyMethod(&grpc.ClientConn{}, "Close", func(*grpc.ClientConn) error { - return nil - }) - - // 3. Mock gnoi_file_pb.NewFileClient - patches.ApplyFunc(gnoi_file_pb.NewFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { + // 2. Mock newFileClient package variable (avoids inlining issues with gomonkey) + patches.ApplyGlobalVar(&newFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { return &mockSuccessFileClient{} }) - // 4. Mock os.Remove to always succeed (for cleanup testing) - patches.ApplyFunc(os.Remove, func(name string) error { - return nil + // 3. Mock the download package's DownloadHTTPStreaming + patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { + return io.NopCloser(strings.NewReader("streaming test data")), 18, nil }) req := &gnoi_file_pb.TransferToRemoteRequest{ @@ -70,57 +43,30 @@ func TestDPU_SuccessPathMocking(t *testing.T) { }, } - // Test HandleTransferToRemoteForDPU success path - resp, err := HandleTransferToRemoteForDPU(context.Background(), req, "dpu1", "localhost:8080") - if err != nil { - t.Logf("DPU function returned error: %v", err) - } else { - t.Logf("DPU function success: %v", resp) - } - // Test HandleTransferToRemoteForDPUStreaming success path - resp2, err2 := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "dpu2", "localhost:8080") - if err2 != nil { - t.Logf("DPU streaming returned error: %v", err2) + resp, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "dpu2") + if err != nil { + t.Logf("DPU streaming returned error: %v", err) } else { - t.Logf("DPU streaming success: %v", resp2) + t.Logf("DPU streaming success: %v", resp) } } func TestDPU_ContainerPathBranches(t *testing.T) { - // Test specifically to hit container path logic without causing infinite recursion patches := gomonkey.NewPatches() defer patches.Reset() - // Mock HandleTransferToRemote to succeed and create files - patches.ApplyFunc(HandleTransferToRemote, func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest) (*gnoi_file_pb.TransferToRemoteResponse, error) { - testData := []byte("container test data") - hash := md5.Sum(testData) - return &gnoi_file_pb.TransferToRemoteResponse{ - Hash: &types.HashType{Hash: hash[:]}, - }, nil - }) - - // Mock os.ReadFile to return success (simulate file exists) - patches.ApplyFunc(os.ReadFile, func(name string) ([]byte, error) { - return []byte("mock file content"), nil - }) - - // Mock grpc.Dial and related functions to succeed but fail at Send - patches.ApplyFunc(grpc.Dial, func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Mock dpuproxy.GetDPUConnection + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return &grpc.ClientConn{}, nil }) - patches.ApplyMethod(&grpc.ClientConn{}, "Close", func(*grpc.ClientConn) error { - return nil - }) - - patches.ApplyFunc(gnoi_file_pb.NewFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { + patches.ApplyGlobalVar(&newFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { return &mockFailureFileClient{} }) - patches.ApplyFunc(os.Remove, func(name string) error { - return nil + patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { + return io.NopCloser(strings.NewReader("container test data")), 18, nil }) req := &gnoi_file_pb.TransferToRemoteRequest{ @@ -132,20 +78,23 @@ func TestDPU_ContainerPathBranches(t *testing.T) { } // This should exercise the DPU path with mocked components - _, err := HandleTransferToRemoteForDPU(context.Background(), req, "container-test", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "container-test") t.Logf("Container path test result: %v", err) } func TestDPU_ErrorPaths(t *testing.T) { - // Test various error conditions to get better coverage patches := gomonkey.NewPatches() defer patches.Reset() - // Test with HandleTransferToRemote failure - patches.ApplyFunc(HandleTransferToRemote, func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest) (*gnoi_file_pb.TransferToRemoteResponse, error) { + // Mock dpuproxy.GetDPUConnection to fail + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return nil, os.ErrNotExist }) + patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { + return io.NopCloser(strings.NewReader("error test data")), 14, nil + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/error_test.bin", RemoteDownload: &common.RemoteDownload{ @@ -154,32 +103,26 @@ func TestDPU_ErrorPaths(t *testing.T) { }, } - _, err := HandleTransferToRemoteForDPU(context.Background(), req, "error-test", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "error-test") t.Logf("Error path test result: %v", err) } func TestDPU_StreamingSuccess(t *testing.T) { - // Test the streaming DPU function to increase coverage patches := gomonkey.NewPatches() defer patches.Reset() // Mock the download package's DownloadHTTPStreaming patches.ApplyFunc(download.DownloadHTTPStreaming, func(ctx context.Context, url string, maxFileSize int64) (io.ReadCloser, int64, error) { - // Return a mock reader that simulates HTTP response body return io.NopCloser(strings.NewReader("streaming test data")), 18, nil }) - // Mock grpc.Dial - patches.ApplyFunc(grpc.Dial, func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Mock dpuproxy.GetDPUConnection + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { return &grpc.ClientConn{}, nil }) - patches.ApplyMethod(&grpc.ClientConn{}, "Close", func(*grpc.ClientConn) error { - return nil - }) - // Mock file client to return success - patches.ApplyFunc(gnoi_file_pb.NewFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { + patches.ApplyGlobalVar(&newFileClient, func(cc grpc.ClientConnInterface) gnoi_file_pb.FileClient { return &mockSuccessFileClient{} }) @@ -191,7 +134,7 @@ func TestDPU_StreamingSuccess(t *testing.T) { }, } - resp, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-test", "localhost:8080") + resp, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-test") if err != nil { t.Logf("Streaming test returned error: %v", err) } else { @@ -200,7 +143,6 @@ func TestDPU_StreamingSuccess(t *testing.T) { } func TestDPU_StreamingError(t *testing.T) { - // Test streaming function with DownloadHTTPStreaming failure patches := gomonkey.NewPatches() defer patches.Reset() @@ -216,27 +158,11 @@ func TestDPU_StreamingError(t *testing.T) { }, } - _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-error-test", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(context.Background(), req, "stream-error-test") t.Logf("Streaming error test result: %v", err) } // Mock implementations -type mockFileInfo struct { - name string - isDir bool -} - -func (m *mockFileInfo) Name() string { return m.name } -func (m *mockFileInfo) Size() int64 { return 0 } -func (m *mockFileInfo) Mode() os.FileMode { - if m.isDir { - return os.ModeDir - } - return 0644 -} -func (m *mockFileInfo) ModTime() time.Time { return time.Now() } -func (m *mockFileInfo) IsDir() bool { return m.isDir } -func (m *mockFileInfo) Sys() interface{} { return nil } type mockSuccessFileClient struct{} diff --git a/pkg/gnoi/file/file.go b/pkg/gnoi/file/file.go index b6b09866..0eab32c9 100644 --- a/pkg/gnoi/file/file.go +++ b/pkg/gnoi/file/file.go @@ -20,9 +20,8 @@ import ( "github.com/openconfig/gnoi/types" "github.com/sonic-net/sonic-gnmi/internal/download" "github.com/sonic-net/sonic-gnmi/internal/hash" - "google.golang.org/grpc" + "github.com/sonic-net/sonic-gnmi/pkg/interceptors/dpuproxy" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -35,6 +34,10 @@ const ( maxFileSize = 4 * 1024 * 1024 * 1024 // 4GB in bytes ) +// newFileClient wraps gnoi_file_pb.NewFileClient to allow test patching +// (the generated function is tiny and gets inlined, defeating gomonkey). +var newFileClient = gnoi_file_pb.NewFileClient + // HandleTransferToRemote implements the complete logic for the TransferToRemote RPC. // It validates the request, checks for DPU metadata, and routes accordingly. // @@ -68,7 +71,7 @@ func HandleTransferToRemote( // If DPU headers are present, handle DPU transfer logic using efficient streaming if targetType == "dpu" && targetIndex != "" { log.Infof("[TransferToRemote] DPU routing detected: target-type=%s, target-index=%s", targetType, targetIndex) - return HandleTransferToRemoteForDPUStreaming(ctx, req, targetIndex, "localhost:8080") + return HandleTransferToRemoteForDPUStreaming(ctx, req, targetIndex) } } @@ -359,160 +362,16 @@ func HandlePut(stream gnoi_file_pb.File_PutServer) error { return stream.SendAndClose(&gnoi_file_pb.PutResponse{}) } -// HandleTransferToRemoteForDPU handles TransferToRemote when DPU headers are present. -// It downloads the file to NPU first, then uploads it to the specified DPU using File.Put. -// -// This function implements the complete DPU transfer workflow: -// - Downloads file to a temporary location on NPU -// - Reads the downloaded file with container path translation -// - Creates a gRPC connection with DPU routing metadata -// - Uploads the file to DPU using File.Put streaming RPC -// - Cleans up temporary files -// -// Parameters: -// - ctx: Context for the operation -// - req: Original TransferToRemoteRequest with target DPU path -// - dpuIndex: Target DPU index (e.g., "0", "1") -// - proxyAddress: Address of the gNOI proxy server (e.g., "localhost:8080") -// -// Returns: -// - TransferToRemoteResponse with MD5 hash on success -// - Error with appropriate gRPC status code on failure -func HandleTransferToRemoteForDPU( - ctx context.Context, - req *gnoi_file_pb.TransferToRemoteRequest, - dpuIndex string, - proxyAddress string, -) (*gnoi_file_pb.TransferToRemoteResponse, error) { - // Validate inputs - if req == nil { - return nil, status.Error(codes.InvalidArgument, "request cannot be nil") - } - if dpuIndex == "" { - return nil, status.Error(codes.InvalidArgument, "dpuIndex cannot be empty") - } - if proxyAddress == "" { - return nil, status.Error(codes.InvalidArgument, "proxyAddress cannot be empty") - } - - // Step 1: Download file to NPU temp location - tempPath := fmt.Sprintf("/tmp/dpu_transfer_%s_%d", dpuIndex, time.Now().UnixNano()) - - // Create a modified request with temp path - npuReq := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: tempPath, - RemoteDownload: req.GetRemoteDownload(), - } - - // Download to NPU - resp, err := HandleTransferToRemote(ctx, npuReq) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to download to NPU: %v", err) - } - - // Ensure cleanup of temp file (use the same path translation as reading) - defer func() { - cleanupPath := tempPath - if _, err := os.Stat("/mnt/host"); err == nil { - cleanupPath = "/mnt/host" + tempPath - } - if err := os.Remove(cleanupPath); err != nil { - // Log warning but don't fail the operation - } - }() - - // Step 2: Read the downloaded file (apply container path translation) - // The file was downloaded to host filesystem via HandleTransferToRemote, so we need to read from /mnt/host prefix - translatedTempPath := tempPath - if _, err := os.Stat("/mnt/host"); err == nil { - translatedTempPath = "/mnt/host" + tempPath - } - - fileData, err := os.ReadFile(translatedTempPath) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to read downloaded file: %v", err) - } - - // Step 3: Connect to DPU via proxy - // Create metadata for DPU routing - md := metadata.New(map[string]string{ - "x-sonic-ss-target-type": "dpu", - "x-sonic-ss-target-index": dpuIndex, - }) - outCtx := metadata.NewOutgoingContext(ctx, md) - - // Connect to local gNOI server which will proxy to DPU - conn, err := grpc.Dial(proxyAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to connect to proxy: %v", err) - } - defer conn.Close() - - fileClient := gnoi_file_pb.NewFileClient(conn) - - // Step 4: Use File.Put to upload to DPU - putClient, err := fileClient.Put(outCtx) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to create Put client: %v", err) - } - - // Send Open request - openReq := &gnoi_file_pb.PutRequest{ - Request: &gnoi_file_pb.PutRequest_Open{ - Open: &gnoi_file_pb.PutRequest_Details{ - RemoteFile: req.GetLocalPath(), // Use original target path - Permissions: 0644, - }, - }, - } - if err := putClient.Send(openReq); err != nil { - return nil, status.Errorf(codes.Internal, "failed to send open request: %v", err) - } - - // Send file contents in chunks - chunkSize := 64 * 1024 // 64KB chunks - for i := 0; i < len(fileData); i += chunkSize { - end := i + chunkSize - if end > len(fileData) { - end = len(fileData) - } - - contentReq := &gnoi_file_pb.PutRequest{ - Request: &gnoi_file_pb.PutRequest_Contents{ - Contents: fileData[i:end], - }, - } - if err := putClient.Send(contentReq); err != nil { - return nil, status.Errorf(codes.Internal, "failed to send content: %v", err) - } - } - - // Send hash (reuse the hash from download response) - hashReq := &gnoi_file_pb.PutRequest{ - Request: &gnoi_file_pb.PutRequest_Hash{ - Hash: resp.GetHash(), - }, - } - if err := putClient.Send(hashReq); err != nil { - return nil, status.Errorf(codes.Internal, "failed to send hash: %v", err) - } - - // Close and get response - if _, err := putClient.CloseAndRecv(); err != nil { - return nil, status.Errorf(codes.Internal, "failed to complete Put to DPU: %v", err) - } - - return resp, nil -} - // HandleTransferToRemoteForDPUStreaming implements efficient streaming proxy for DPU file transfers. // This function streams data directly from HTTP source to DPU without intermediate disk storage // or loading the entire file into memory. It calculates MD5 hash concurrently during streaming. +// +// The DPU connection is obtained directly via dpuproxy.GetDPUConnection, which returns a cached +// connection managed by the DPU proxy infrastructure. Callers must NOT close the connection. func HandleTransferToRemoteForDPUStreaming( ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string, - proxyAddress string, ) (*gnoi_file_pb.TransferToRemoteResponse, error) { // Validate inputs if req == nil { @@ -521,9 +380,6 @@ func HandleTransferToRemoteForDPUStreaming( if dpuIndex == "" { return nil, status.Error(codes.InvalidArgument, "dpuIndex cannot be empty") } - if proxyAddress == "" { - return nil, status.Error(codes.InvalidArgument, "proxyAddress cannot be empty") - } remoteDownload := req.GetRemoteDownload() if remoteDownload == nil { @@ -558,23 +414,16 @@ func HandleTransferToRemoteForDPUStreaming( } defer httpStream.Close() - // Step 2: Connect to DPU via proxy - md := metadata.New(map[string]string{ - "x-sonic-ss-target-type": "dpu", - "x-sonic-ss-target-index": dpuIndex, - }) - outCtx := metadata.NewOutgoingContext(streamCtx, md) - - conn, err := grpc.Dial(proxyAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + // Step 2: Get direct connection to DPU (cached, do NOT close) + conn, err := dpuproxy.GetDPUConnection(streamCtx, dpuIndex) if err != nil { - return nil, status.Errorf(codes.Internal, "failed to connect to proxy: %v", err) + return nil, status.Errorf(codes.Internal, "failed to get DPU connection: %v", err) } - defer conn.Close() - fileClient := gnoi_file_pb.NewFileClient(conn) + fileClient := newFileClient(conn) // Step 3: Create DPU Put stream - putClient, err := fileClient.Put(outCtx) + putClient, err := fileClient.Put(streamCtx) if err != nil { return nil, status.Errorf(codes.Internal, "failed to create Put client: %v", err) } diff --git a/pkg/gnoi/file/file_test.go b/pkg/gnoi/file/file_test.go index 064b2a7a..e5916f59 100644 --- a/pkg/gnoi/file/file_test.go +++ b/pkg/gnoi/file/file_test.go @@ -19,6 +19,8 @@ import ( gnoi_file_pb "github.com/openconfig/gnoi/file" "github.com/openconfig/gnoi/types" "github.com/sonic-net/sonic-gnmi/internal/download" + "github.com/sonic-net/sonic-gnmi/pkg/interceptors/dpuproxy" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -984,116 +986,9 @@ func TestHandlePut_InvalidMessageType(t *testing.T) { } } -func TestHandleTransferToRemoteForDPU_NilRequest(t *testing.T) { - ctx := context.Background() - _, err := HandleTransferToRemoteForDPU(ctx, nil, "0", "localhost:8080") - if err == nil { - t.Fatal("Expected error for nil request, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "request cannot be nil") { - t.Errorf("Error message = %q, want substring 'request cannot be nil'", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_EmptyDpuIndex(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://example.com/file", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "", "localhost:8080") - if err == nil { - t.Fatal("Expected error for empty dpuIndex, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "dpuIndex cannot be empty") { - t.Errorf("Error message = %q, want substring 'dpuIndex cannot be empty'", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_EmptyProxyAddress(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://example.com/file", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "") - if err == nil { - t.Fatal("Expected error for empty proxyAddress, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "proxyAddress cannot be empty") { - t.Errorf("Error message = %q, want substring 'proxyAddress cannot be empty'", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_InvalidRemoteDownload(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - // Missing RemoteDownload - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:8080") - if err == nil { - t.Fatal("Expected error for missing remote download, got nil") - } - - // The error should come from HandleTransferToRemote validation - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error (from failed download), got %v", err) - } -} - -func TestHandleTransferToRemoteForDPU_DownloadFailure(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://invalid-url-that-does-not-exist.example", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:8080") - if err == nil { - t.Fatal("Expected error for download failure, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error, got %v", err) - } - - if !strings.Contains(st.Message(), "failed to download to NPU") { - t.Errorf("Error message = %q, want substring 'failed to download to NPU'", st.Message()) - } -} - func TestHandleTransferToRemoteForDPUStreaming_NilRequest(t *testing.T) { ctx := context.Background() - _, err := HandleTransferToRemoteForDPUStreaming(ctx, nil, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, nil, "0") if err == nil { t.Fatal("Expected error for nil request, got nil") } @@ -1117,7 +1012,7 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyDpuIndex(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "") if err == nil { t.Fatal("Expected error for empty dpuIndex, got nil") } @@ -1132,37 +1027,13 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyDpuIndex(t *testing.T) { } } -func TestHandleTransferToRemoteForDPUStreaming_EmptyProxyAddress(t *testing.T) { - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: "http://example.com/file", - Protocol: common.RemoteDownload_HTTP, - }, - } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "") - if err == nil { - t.Fatal("Expected error for empty proxyAddress, got nil") - } - - st, ok := status.FromError(err) - if !ok || st.Code() != codes.InvalidArgument { - t.Errorf("Expected InvalidArgument error, got %v", err) - } - - if !strings.Contains(st.Message(), "proxyAddress cannot be empty") { - t.Errorf("Error message = %q, want substring 'proxyAddress cannot be empty'", st.Message()) - } -} - func TestHandleTransferToRemoteForDPUStreaming_NilRemoteDownload(t *testing.T) { ctx := context.Background() req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/test.bin", // Missing RemoteDownload } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for nil remote download, got nil") } @@ -1186,7 +1057,7 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyLocalPath(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for empty local path, got nil") } @@ -1210,7 +1081,7 @@ func TestHandleTransferToRemoteForDPUStreaming_UnsupportedProtocol(t *testing.T) Protocol: common.RemoteDownload_HTTPS, // Unsupported }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for unsupported protocol, got nil") } @@ -1234,7 +1105,7 @@ func TestHandleTransferToRemoteForDPUStreaming_EmptyURL(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for empty URL, got nil") } @@ -1258,7 +1129,7 @@ func TestHandleTransferToRemoteForDPUStreaming_InvalidURL(t *testing.T) { Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for invalid URL, got nil") } @@ -1284,7 +1155,7 @@ func TestHandleTransferToRemoteForDPUStreaming_ContextCancellation(t *testing.T) Protocol: common.RemoteDownload_HTTP, }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for cancelled context, got nil") } @@ -1296,83 +1167,10 @@ func TestHandleTransferToRemoteForDPUStreaming_ContextCancellation(t *testing.T) } } -func TestHandleTransferToRemoteForDPU_SuccessPath(t *testing.T) { - // Test successful DPU transfer with mock server - testContent := []byte("test DPU firmware content") - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(testContent) - })) - defer httpServer.Close() - - // Create a mock gRPC server to simulate DPU proxy - grpcServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Simulate successful gRPC response - w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) - })) - defer grpcServer.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/dpu_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: httpServer.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // This will fail at the gRPC connection step, but we test the HTTP download part - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "invalid-proxy:8080") - if err == nil { - t.Fatal("Expected error due to invalid proxy address, got nil") - } - - // Verify the error is from the proxy connection, not from earlier validation - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error from proxy connection, got %v", err) - } - - if !strings.Contains(st.Message(), "failed to create Put client") && !strings.Contains(st.Message(), "failed to connect to proxy") { - t.Errorf("Error message = %q, want substring about connection failure", st.Message()) - } -} - -func TestHandleTransferToRemoteForDPU_FileReadFailure(t *testing.T) { - // Test scenario where file download succeeds but reading fails - testContent := []byte("test content") - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(testContent) - })) - defer httpServer.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/read_fail_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: httpServer.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // This test exercises the file download and read path - // It will fail at gRPC connection but exercises more code - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "127.0.0.1:99999") // Invalid port - if err == nil { - t.Fatal("Expected error, got nil") - } - - // Should be an internal error from proxy connection failure - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error, got %v", err) - } -} - func TestHandleTransferToRemoteForDPUStreaming_SuccessPath(t *testing.T) { // Test successful streaming transfer with chunked data + // HTTP streaming works, but DPU connection is mocked to return error + // to validate the HTTP streaming part before the DPU connection step. testContent := make([]byte, 200*1024) // 200KB test data for i := range testContent { testContent[i] = byte(i % 256) @@ -1394,6 +1192,14 @@ func TestHandleTransferToRemoteForDPUStreaming_SuccessPath(t *testing.T) { })) defer httpServer.Close() + // Mock dpuproxy.GetDPUConnection to return error so the test validates + // HTTP streaming works before the DPU connection step fails. + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + ctx := context.Background() req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/streaming_test.bin", @@ -1403,21 +1209,17 @@ func TestHandleTransferToRemoteForDPUStreaming_SuccessPath(t *testing.T) { }, } - // This will fail at gRPC connection but exercises the HTTP streaming and hash calculation - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "invalid-proxy:8080") + // This will fail at DPU connection but exercises the HTTP streaming and hash calculation + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { - t.Fatal("Expected error due to invalid proxy, got nil") + t.Fatal("Expected error due to DPU connection failure, got nil") } - // Should fail at proxy connection step + // Should fail at DPU connection step st, ok := status.FromError(err) if !ok || st.Code() != codes.Internal { t.Errorf("Expected Internal error, got %v", err) } - - if !strings.Contains(st.Message(), "failed to create Put client") && !strings.Contains(st.Message(), "failed to connect to proxy") { - t.Errorf("Error message = %q, want substring about connection failure", st.Message()) - } } func TestHandleTransferToRemoteForDPUStreaming_NetworkError(t *testing.T) { @@ -1431,7 +1233,7 @@ func TestHandleTransferToRemoteForDPUStreaming_NetworkError(t *testing.T) { }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected error for network failure, got nil") } @@ -1469,7 +1271,7 @@ func TestHandleTransferToRemoteForDPUStreaming_TimeoutDuringStream(t *testing.T) }, } - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:8080") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected timeout error, got nil") } @@ -1795,38 +1597,6 @@ func TestTranslatePathForContainer_Coverage(t *testing.T) { } } -// Test coverage for DPU functions by creating scenario where HTTP succeeds but gRPC fails -func TestHandleTransferToRemoteForDPU_HTTPSuccessGRPCFail(t *testing.T) { - // Create HTTP server that works - testContent := []byte("DPU transfer test content with enough data to test the full download and read path") - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(testContent) - })) - defer httpServer.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/dpu_http_success.bin", - RemoteDownload: &common.RemoteDownload{ - Path: httpServer.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // Use invalid proxy to force gRPC failure AFTER HTTP download succeeds - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:99999") - if err == nil { - t.Fatal("Expected error from gRPC connection failure") - } - - // Should be connection error to proxy - st, ok := status.FromError(err) - if !ok || st.Code() != codes.Internal { - t.Errorf("Expected Internal error, got %v", err) - } -} - func TestHandleTransferToRemoteForDPUStreaming_HTTPSuccessGRPCFail(t *testing.T) { // Create large HTTP content to test streaming path largeContent := make([]byte, 512*1024) // 512KB @@ -1850,6 +1620,13 @@ func TestHandleTransferToRemoteForDPUStreaming_HTTPSuccessGRPCFail(t *testing.T) })) defer httpServer.Close() + // Mock dpuproxy.GetDPUConnection to return error to force gRPC failure + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + ctx := context.Background() req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/dpu_streaming_test.bin", @@ -1859,13 +1636,13 @@ func TestHandleTransferToRemoteForDPUStreaming_HTTPSuccessGRPCFail(t *testing.T) }, } - // Use invalid proxy to force gRPC failure AFTER HTTP streaming succeeds - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:99999") + // DPU connection failure AFTER HTTP streaming succeeds + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { - t.Fatal("Expected error from gRPC connection failure") + t.Fatal("Expected error from DPU connection failure") } - // Should be connection error to proxy + // Should be connection error from DPU connection st, ok := status.FromError(err) if !ok || st.Code() != codes.Internal { t.Errorf("Expected Internal error, got %v", err) @@ -2120,35 +1897,6 @@ func TestValidatePath_HighCoverage(t *testing.T) { // Additional DPU function tests with more coverage func TestDPUFunctions_MoreCoverage(t *testing.T) { - t.Run("DPU with large file and connection failure", func(t *testing.T) { - // Test with larger file to exercise more of the DPU code paths - largeContent := make([]byte, 2*1024*1024) // 2MB - for i := range largeContent { - largeContent[i] = byte(i % 256) - } - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(largeContent) - })) - defer server.Close() - - ctx := context.Background() - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/large_dpu_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: server.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - // This exercises more of the file reading logic before gRPC fails - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "127.0.0.1:99999") - if err == nil { - t.Fatal("Expected connection error") - } - }) - t.Run("DPU streaming with very large content", func(t *testing.T) { // Test streaming with large content to exercise chunk processing hugeContent := make([]byte, 4*1024*1024) // 4MB @@ -2181,8 +1929,15 @@ func TestDPUFunctions_MoreCoverage(t *testing.T) { }, } - // This exercises the streaming and hash calculation extensively before gRPC failure - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "127.0.0.1:99999") + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + + // This exercises the streaming and hash calculation extensively before DPU connection failure + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected connection error") } @@ -2194,7 +1949,7 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { // Test all the early validation and file handling paths extensively t.Run("DPU container path logic", func(t *testing.T) { - // Test the container path translation logic in DPU functions + // Test the container path translation logic in DPU streaming function testContent := []byte("container path test") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -2202,6 +1957,13 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { })) defer server.Close() + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: "/tmp/container_path_test.bin", RemoteDownload: &common.RemoteDownload{ @@ -2212,13 +1974,12 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { ctx := context.Background() - // Test both DPU functions to exercise container path logic - _, err1 := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:99999") - _, err2 := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:99999") + // Test streaming DPU function to exercise container path logic + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") - // Both should fail at gRPC connection but exercise file download/streaming - if err1 == nil || err2 == nil { - t.Fatal("Expected connection errors") + // Should fail at DPU connection but exercise file streaming + if err == nil { + t.Fatal("Expected connection error") } }) @@ -2236,6 +1997,13 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { })) defer server.Close() + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: fmt.Sprintf("/tmp/dpu_%s_test.bin", idx), RemoteDownload: &common.RemoteDownload{ @@ -2246,51 +2014,15 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { ctx := context.Background() - // This exercises metadata creation and file processing for each DPU - _, err := HandleTransferToRemoteForDPU(ctx, req, idx, "localhost:99999") + // Test streaming version with metadata creation for each DPU index + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, idx) if err == nil { t.Fatal("Expected connection error") } - - // Also test streaming version - _, err2 := HandleTransferToRemoteForDPUStreaming(ctx, req, idx, "localhost:99999") - if err2 == nil { - t.Fatal("Expected connection error") - } }) } }) - t.Run("DPU file cleanup logic", func(t *testing.T) { - // Test the cleanup logic in DPU functions - content := []byte("cleanup logic test content") - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write(content) - })) - defer server.Close() - - req := &gnoi_file_pb.TransferToRemoteRequest{ - LocalPath: "/tmp/cleanup_logic_test.bin", - RemoteDownload: &common.RemoteDownload{ - Path: server.URL, - Protocol: common.RemoteDownload_HTTP, - }, - } - - ctx := context.Background() - - // This should exercise the defer cleanup logic - _, err := HandleTransferToRemoteForDPU(ctx, req, "0", "localhost:99999") - if err == nil { - t.Fatal("Expected connection error") - } - - // The cleanup logic should have removed the temp file - // We exercise the cleanup code path but can't test exact match - // because the filename includes timestamp - }) - t.Run("DPU streaming chunk processing", func(t *testing.T) { // Test streaming with specific chunk sizes to exercise the streaming loop chunkSizes := []int{1024, 4096, 16384, 65536} // Different chunk sizes @@ -2310,6 +2042,13 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { })) defer server.Close() + // Mock dpuproxy.GetDPUConnection to return error + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(dpuproxy.GetDPUConnection, func(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + return nil, fmt.Errorf("mock DPU connection failure") + }) + req := &gnoi_file_pb.TransferToRemoteRequest{ LocalPath: fmt.Sprintf("/tmp/chunk_test_%d.bin", i), RemoteDownload: &common.RemoteDownload{ @@ -2321,7 +2060,7 @@ func TestDPUFunctions_AggressiveCoverage(t *testing.T) { ctx := context.Background() // This exercises the streaming and chunking logic - _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0", "localhost:99999") + _, err := HandleTransferToRemoteForDPUStreaming(ctx, req, "0") if err == nil { t.Fatal("Expected connection error") } @@ -2455,7 +2194,7 @@ func TestHandleTransferToRemote_DPU_Routing(t *testing.T) { // Mock HandleTransferToRemoteForDPUStreaming to succeed patches.ApplyFunc(HandleTransferToRemoteForDPUStreaming, - func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string, dpuAddr string) (*gnoi_file_pb.TransferToRemoteResponse, error) { + func(ctx context.Context, req *gnoi_file_pb.TransferToRemoteRequest, dpuIndex string) (*gnoi_file_pb.TransferToRemoteResponse, error) { return &gnoi_file_pb.TransferToRemoteResponse{}, nil }) diff --git a/pkg/interceptors/dpuproxy/proxy.go b/pkg/interceptors/dpuproxy/proxy.go index 5f450437..0645dd1b 100644 --- a/pkg/interceptors/dpuproxy/proxy.go +++ b/pkg/interceptors/dpuproxy/proxy.go @@ -75,6 +75,22 @@ var defaultForwardableMethods = []ForwardableMethod{ }, } +var defaultProxy *DPUProxy + +// SetDefaultProxy registers the DPU proxy singleton for use by handlers +// that need direct DPU connections (e.g., TransferToRemote file operations). +func SetDefaultProxy(p *DPUProxy) { defaultProxy = p } + +// GetDPUConnection returns a cached gRPC connection to the specified DPU. +// It resolves DPU info via Redis and establishes/reuses connections. +// The returned connection is cached — callers must NOT close it. +func GetDPUConnection(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + if defaultProxy == nil { + return nil, fmt.Errorf("DPU proxy not initialized") + } + return defaultProxy.GetDPUConnection(ctx, dpuIndex) +} + // DPUProxy is a gRPC interceptor that routes requests to DPU targets based on metadata. // It examines incoming gRPC metadata for x-sonic-ss-target-type and x-sonic-ss-target-index // headers and routes requests accordingly. @@ -108,6 +124,22 @@ func NewDPUProxy(resolver *DPUResolver) *DPUProxy { } } +// GetDPUConnection resolves DPU info and returns a cached gRPC connection to the DPU. +// The returned connection is cached — callers must NOT close it. +func (p *DPUProxy) GetDPUConnection(ctx context.Context, dpuIndex string) (*grpc.ClientConn, error) { + if p.resolver == nil { + return nil, fmt.Errorf("resolver not available") + } + dpuInfo, err := p.resolver.GetDPUInfo(ctx, dpuIndex) + if err != nil { + return nil, err + } + if !dpuInfo.Reachable { + return nil, status.Errorf(codes.Unavailable, "DPU%s not reachable", dpuIndex) + } + return p.getConnection(ctx, dpuInfo.Index, dpuInfo.IPAddress, dpuInfo.GNMIPortsToTry) +} + // getForwardingMode checks if a method is registered and returns its forwarding mode. // Returns the ForwardingMode and a boolean indicating if the method was found. func (p *DPUProxy) getForwardingMode(method string) (ForwardingMode, bool) { diff --git a/pkg/interceptors/setup.go b/pkg/interceptors/setup.go index 61c6c414..49a945c5 100644 --- a/pkg/interceptors/setup.go +++ b/pkg/interceptors/setup.go @@ -25,6 +25,7 @@ func NewServerChain() (*ServerChain, error) { // Create DPU resolver and proxy dpuResolver := dpuproxy.NewDPUResolver(stateRedisAdapter, configRedisAdapter) dpuProxy := dpuproxy.NewDPUProxy(dpuResolver) + dpuproxy.SetDefaultProxy(dpuProxy) // Create interceptor chain with DPU proxy chain := NewChain(dpuProxy)