From 2ebc4dc4583fe0461c4363590a9f1fdea201f126 Mon Sep 17 00:00:00 2001 From: Predrag Knezevic Date: Mon, 18 May 2026 18:04:47 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8C=B1=20Deduplicate=20metrics=20service?= =?UTF-8?q?=20lookup=20and=20port-forward=20in=20e2e=20steps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SendMetricsRequest and withMetricsPortForward duplicated service endpoint discovery and port-forward lifecycle code. Extract shared helpers (getMetricsService, metricsPort, portForward) and rewrite both callers to use them. Port-forward cleanup is deferred to avoid process leaks if waitFor panics. Co-Authored-By: Claude Opus 4.6 --- test/e2e/steps/steps.go | 83 +++++++++++++------------------------ test/e2e/steps/tls_steps.go | 72 ++++++++++++++++++++++---------- 2 files changed, 80 insertions(+), 75 deletions(-) diff --git a/test/e2e/steps/steps.go b/test/e2e/steps/steps.go index 08b5a17a48..a38332f58a 100644 --- a/test/e2e/steps/steps.go +++ b/test/e2e/steps/steps.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "io" - "net" "net/http" "os" "os/exec" @@ -1394,36 +1393,24 @@ func httpGet(url string, token string) (*http.Response, error) { return resp, nil } -func randomAvailablePort() (int, error) { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, err - } - defer l.Close() - return l.Addr().(*net.TCPAddr).Port, nil -} - // SendMetricsRequest sets up port-forwarding to the controller's service pods and waits for the metrics endpoint // to return a successful response. Stores the response body per pod in the scenario context. Polls with timeout. func SendMetricsRequest(ctx context.Context, serviceAccount string, endpoint string, controllerName string) error { sc := scenarioCtx(ctx) - serviceNs, err := k8sClient("get", "service", "-A", "-o", fmt.Sprintf(`jsonpath={.items[?(@.metadata.name=="%s-service")].metadata.namespace}`, controllerName)) + svc, err := getMetricsService(controllerName) if err != nil { return err } - v, err := k8sClient("get", "service", "-n", serviceNs, fmt.Sprintf("%s-service", controllerName), "-o", "json") + mPort, err := metricsPort(svc) if err != nil { return err } - var service corev1.Service - if err := json.Unmarshal([]byte(v), &service); err != nil { - return err - } - podNameCmd := []string{"get", "pod", "-n", olmNamespace, "-o", "jsonpath={.items}"} - for k, v := range service.Spec.Selector { + + podNameCmd := []string{"get", "pod", "-n", svc.Namespace, "-o", "jsonpath={.items}"} + for k, v := range svc.Spec.Selector { podNameCmd = append(podNameCmd, fmt.Sprintf("--selector=%s=%s", k, v)) } - v, err = k8sClient(podNameCmd...) + v, err := k8sClient(podNameCmd...) if err != nil { return err } @@ -1436,51 +1423,39 @@ func SendMetricsRequest(ctx context.Context, serviceAccount string, endpoint str if err != nil { return err } - var metricsPort int32 - for _, p := range service.Spec.Ports { - if p.Name == "metrics" { - metricsPort = p.Port - break - } - } + sc.metricsResponse = make(map[string]string) for _, p := range pods { - port, err := randomAvailablePort() - if err != nil { - return err - } - portForwardCmd := exec.Command(k8sCli, "port-forward", "-n", p.Namespace, fmt.Sprintf("pod/%s", p.Name), fmt.Sprintf("%d:%d", port, metricsPort)) //nolint:gosec // perfectly safe to start port-forwarder for provided controller name - logger.V(1).Info("starting port-forward", "command", strings.Join(portForwardCmd.Args, " ")) - if err := portForwardCmd.Start(); err != nil { - logger.Error(err, fmt.Sprintf("failed to start port-forward for pod %s", p.Name)) - return err - } - waitFor(ctx, func() bool { - resp, err := httpGet(fmt.Sprintf("https://localhost:%d%s", port, endpoint), token) + if err := func() error { + addr, cleanup, err := portForward(p.Namespace, fmt.Sprintf("pod/%s", p.Name), mPort) if err != nil { - return false + return err } - defer resp.Body.Close() + defer cleanup() + waitFor(ctx, func() bool { + resp, err := httpGet(fmt.Sprintf("https://%s%s", addr, endpoint), token) + if err != nil { + return false + } + defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { + if resp.StatusCode == http.StatusOK { + b, err := io.ReadAll(resp.Body) + if err != nil { + return false + } + sc.metricsResponse[p.Name] = string(b) + return true + } b, err := io.ReadAll(resp.Body) if err != nil { return false } - sc.metricsResponse[p.Name] = string(b) - return true - } - b, err := io.ReadAll(resp.Body) - if err != nil { + logger.V(1).Info("failed to get metrics", "pod", p.Name, "response", string(b)) return false - } - logger.V(1).Info("failed to get metrics", "pod", p.Name, "response", string(b)) - return false - }) - if err := portForwardCmd.Process.Kill(); err != nil { - return err - } - if _, err := portForwardCmd.Process.Wait(); err != nil { + }) + return nil + }(); err != nil { return err } } diff --git a/test/e2e/steps/tls_steps.go b/test/e2e/steps/tls_steps.go index 3842c9eacc..4fad45d83e 100644 --- a/test/e2e/steps/tls_steps.go +++ b/test/e2e/steps/tls_steps.go @@ -53,65 +53,95 @@ var curveIDByName = map[string]tls.CurveID{ "secp521r1": tls.CurveP521, } -// getMetricsServiceEndpoint returns the namespace and metrics port for the named component service. -func getMetricsServiceEndpoint(component string) (string, int32, error) { +// getMetricsService returns the full Service object for the named component's metrics service. +// The namespace is available via svc.Namespace. +func getMetricsService(component string) (*corev1.Service, error) { serviceName := fmt.Sprintf("%s-service", component) serviceNs, err := k8sClient("get", "service", "-A", "-o", fmt.Sprintf(`jsonpath={.items[?(@.metadata.name=="%s")].metadata.namespace}`, serviceName)) if err != nil { - return "", 0, fmt.Errorf("failed to find namespace for service %s: %w", serviceName, err) + return nil, fmt.Errorf("failed to find namespace for service %s: %w", serviceName, err) } serviceNs = strings.TrimSpace(serviceNs) if serviceNs == "" { - return "", 0, fmt.Errorf("service %s not found in any namespace", serviceName) + return nil, fmt.Errorf("service %s not found in any namespace", serviceName) } raw, err := k8sClient("get", "service", "-n", serviceNs, serviceName, "-o", "json") if err != nil { - return "", 0, fmt.Errorf("failed to get service %s: %w", serviceName, err) + return nil, fmt.Errorf("failed to get service %s: %w", serviceName, err) } var svc corev1.Service if err := json.Unmarshal([]byte(raw), &svc); err != nil { - return "", 0, fmt.Errorf("failed to unmarshal service %s: %w", serviceName, err) + return nil, fmt.Errorf("failed to unmarshal service %s: %w", serviceName, err) } + return &svc, nil +} + +// metricsPort returns the port number of the port named "metrics" on the given service. +func metricsPort(svc *corev1.Service) (int32, error) { for _, p := range svc.Spec.Ports { if p.Name == "metrics" { - return serviceNs, p.Port, nil + return p.Port, nil } } - return "", 0, fmt.Errorf("no port named 'metrics' found on service %s", serviceName) + return 0, fmt.Errorf("no port named 'metrics' found on service %s", svc.Name) } -// withMetricsPortForward starts a kubectl port-forward to the component's metrics service, -// waits until a basic TLS connection succeeds (confirming the port-forward is ready), -// then calls fn with the local address. The port-forward is torn down when fn returns. -func withMetricsPortForward(ctx context.Context, component string, fn func(addr string) error) error { - ns, metricsPort, err := getMetricsServiceEndpoint(component) +func randomAvailablePort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - return err + return 0, err } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil +} +// portForward starts a kubectl port-forward to target (e.g. "service/foo" or "pod/bar") +// in the given namespace, mapping a random local port to remotePort. It returns the +// local address and a cleanup function. The caller is responsible for calling cleanup. +func portForward(ns, target string, remotePort int32) (string, func(), error) { localPort, err := randomAvailablePort() if err != nil { - return fmt.Errorf("failed to find a free local port: %w", err) + return "", nil, fmt.Errorf("failed to find a free local port: %w", err) } - serviceName := fmt.Sprintf("%s-service", component) pfCmd := exec.Command(k8sCli, "port-forward", "-n", ns, //nolint:gosec - fmt.Sprintf("service/%s", serviceName), - fmt.Sprintf("%d:%d", localPort, metricsPort)) + target, fmt.Sprintf("%d:%d", localPort, remotePort)) pfCmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath)) if err := pfCmd.Start(); err != nil { - return fmt.Errorf("failed to start port-forward to %s: %w", serviceName, err) + return "", nil, fmt.Errorf("failed to start port-forward to %s: %w", target, err) } - defer func() { + + cleanup := func() { if p := pfCmd.Process; p != nil { _ = p.Kill() _ = pfCmd.Wait() } - }() + } addr := fmt.Sprintf("127.0.0.1:%d", localPort) + return addr, cleanup, nil +} + +// withMetricsPortForward starts a kubectl port-forward to the component's metrics service, +// waits until a basic TLS connection succeeds (confirming the port-forward is ready), +// then calls fn with the local address. The port-forward is torn down when fn returns. +func withMetricsPortForward(ctx context.Context, component string, fn func(addr string) error) error { + svc, err := getMetricsService(component) + if err != nil { + return err + } + port, err := metricsPort(svc) + if err != nil { + return err + } + + addr, cleanup, err := portForward(svc.Namespace, fmt.Sprintf("service/%s", svc.Name), port) + if err != nil { + return err + } + defer cleanup() // Wait until the port-forward is accepting connections. A plain TLS dial (no version // restrictions) serves as the readiness probe; any successful TLS handshake confirms