diff --git a/testing/e2e/agent_download.go b/testing/e2e/agent_download.go new file mode 100644 index 0000000000..3a6da288bd --- /dev/null +++ b/testing/e2e/agent_download.go @@ -0,0 +1,198 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build e2e && !requirefips + +package e2e + +import ( + "context" + "crypto/sha512" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "runtime" + "strings" + "testing" +) + +// SearchResp is the response body for the artifacts search API +type SearchResp struct { + Packages map[string]Artifact `json:"packages"` +} + +// Artifact describes an elastic artifact available through the API. +type Artifact struct { + URL string `json:"url"` + //SHAURL string `json:"sha_url"` // Unused + //Type string `json:"type"` // Unused + //Architecture string `json:"architecture"` // Unused +} + +// agentCacheDir returns the directory used to cache downloaded elastic-agent archives. +func agentCacheDir() string { + return filepath.Join(os.TempDir(), "fleet-server-e2e") +} + +// downloadElasticAgent searches the artifacts API for the snapshot version +// specified by ELASTICSEARCH_VERSION and returns a ReadCloser for the +// elastic-agent archive matching the current OS and architecture. +// +// The archive is cached on disk. The remote .sha512 file is fetched first; if +// it matches the cached file's checksum the download is skipped. +func downloadElasticAgent(ctx context.Context, t *testing.T, client *http.Client) io.ReadCloser { + t.Helper() + // Use version associated with latest DRA instead of fleet-server's version to avoid breaking on fleet-server version bumps + draVersion, ok := os.LookupEnv("ELASTICSEARCH_VERSION") + if !ok || draVersion == "" { + t.Fatal("ELASTICSEARCH_VERSION is not set") + } + draSplit := strings.Split(draVersion, "-") + if len(draSplit) == 3 { + draVersion = draSplit[0] + "-" + draSplit[2] // remove hash + } else if len(draSplit) > 3 { + t.Fatalf("Unsupported ELASTICSEARCH_VERSION format, expected 3 segments got: %s", draVersion) + } + t.Logf("Using ELASTICSEARCH_VERSION=%s for agent download", draVersion) + + req, err := http.NewRequestWithContext(ctx, "GET", "https://artifacts-api.elastic.co/v1/search/"+draVersion, nil) + if err != nil { + t.Fatalf("failed to create search request: %v", err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatalf("failed to query artifacts API: %v", err) + } + + var body SearchResp + err = json.NewDecoder(resp.Body).Decode(&body) + resp.Body.Close() + if err != nil { + t.Fatalf("failed to decode artifacts API response: %v", err) + } + + fType := "tar.gz" + if runtime.GOOS == "windows" { + fType = "zip" + } + arch := runtime.GOARCH + if arch == "amd64" { + arch = "x86_64" + } + if arch == "arm64" && runtime.GOOS == "darwin" { + arch = "aarch64" + } + + fileName := fmt.Sprintf("elastic-agent-%s-%s-%s.%s", draVersion, runtime.GOOS, arch, fType) + pkg, ok := body.Packages[fileName] + if !ok { + t.Fatalf("unable to find package download for fileName=%s", fileName) + } + + cacheDir := agentCacheDir() + if err := os.MkdirAll(cacheDir, 0755); err != nil { + t.Fatalf("failed to create cache dir: %v", err) + } + cachePath := filepath.Join(cacheDir, fileName) + + // Fetch the remote SHA512 checksum (small file, always fetched). + remoteSHA := fetchRemoteSHA512(ctx, t, client, pkg.URL+".sha512") + + // If the cached file exists and matches, use it directly. + if localSHA, err := sha512OfFile(cachePath); err == nil && strings.EqualFold(localSHA, remoteSHA) { + t.Logf("Using cached elastic-agent from %s", cachePath) + f, err := os.Open(cachePath) + if err != nil { + t.Fatalf("failed to open cached elastic-agent: %v", err) + } + return f + } + + // Download to a temp file first so a partial download never poisons the cache. + t.Logf("Downloading elastic-agent from %s", pkg.URL) + tmp, err := os.CreateTemp(cacheDir, fileName+".tmp-*") + if err != nil { + t.Fatalf("failed to create temp file for download: %v", err) + } + tmpName := tmp.Name() + + req, err = http.NewRequestWithContext(ctx, "GET", pkg.URL, nil) + if err != nil { + tmp.Close() + os.Remove(tmpName) + t.Fatalf("failed to create download request: %v", err) + } + downloadResp, err := client.Do(req) + if err != nil { + tmp.Close() + os.Remove(tmpName) + t.Fatalf("failed to download elastic-agent: %v", err) + } + defer downloadResp.Body.Close() + + h := sha512.New() + if _, err := io.Copy(tmp, io.TeeReader(downloadResp.Body, h)); err != nil { + tmp.Close() + os.Remove(tmpName) + t.Fatalf("failed to write elastic-agent download: %v", err) + } + tmp.Close() + + // Verify the downloaded file's checksum before caching. + downloadedSHA := hex.EncodeToString(h.Sum(nil)) + if !strings.EqualFold(downloadedSHA, remoteSHA) { + os.Remove(tmpName) + t.Fatalf("elastic-agent checksum mismatch: got %s, want %s", downloadedSHA, remoteSHA) + } + + if err := os.Rename(tmpName, cachePath); err != nil { + os.Remove(tmpName) + t.Fatalf("failed to move downloaded file to cache: %v", err) + } + + f, err := os.Open(cachePath) + if err != nil { + t.Fatalf("failed to open cached elastic-agent after download: %v", err) + } + return f +} + +// fetchRemoteSHA512 downloads the .sha512 file at url and returns the hex checksum. +// The .sha512 file format is " " (sha512sum output), so only the +// first whitespace-delimited field is returned. +func fetchRemoteSHA512(ctx context.Context, t *testing.T, client *http.Client, url string) string { + t.Helper() + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + t.Fatalf("failed to create sha512 request: %v", err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatalf("failed to fetch sha512 file: %v", err) + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("failed to read sha512 file: %v", err) + } + return strings.Fields(string(data))[0] +} + +// sha512OfFile returns the hex-encoded SHA-512 checksum of the file at path. +func sha512OfFile(path string) (string, error) { + f, err := os.Open(path) + if err != nil { + return "", err + } + defer f.Close() + h := sha512.New() + if _, err := io.Copy(h, f); err != nil { + return "", err + } + return hex.EncodeToString(h.Sum(nil)), nil +} diff --git a/testing/e2e/agent_install_test.go b/testing/e2e/agent_install_test.go index ee07c7114b..1d2174b598 100644 --- a/testing/e2e/agent_install_test.go +++ b/testing/e2e/agent_install_test.go @@ -12,12 +12,10 @@ import ( "bytes" "compress/gzip" "context" - "encoding/json" "errors" "fmt" "html/template" "io" - "net/http" "os" "os/exec" "path/filepath" @@ -44,19 +42,6 @@ type AgentInstallSuite struct { } -// SearchResp is the response body for the artifacts search API -type SearchResp struct { - Packages map[string]Artifact `json:"packages"` -} - -// Artifact describes an elastic artifact available through the API. -type Artifact struct { - URL string `json:"url"` - //SHAURL string `json:"sha_url"` // Unused - //Type string `json:"type"` // Unused - //Architecture string `json:"architecture"` // Unused -} - func TestAgentInstallSuite(t *testing.T) { suite.Run(t, new(AgentInstallSuite)) } @@ -94,7 +79,7 @@ func (suite *AgentInstallSuite) SetupSuite() { defer cancel() // use artifacts API to download snapshot - rc := suite.downloadAgent(ctx) + rc := downloadElasticAgent(ctx, suite.T(), suite.Client) defer rc.Close() // Unarchive download in temp dir @@ -114,58 +99,6 @@ func (suite *AgentInstallSuite) SetupSuite() { suite.T().Log("Setup complete.") } -// downloadAgent will search the artifacts repo for the latest snapshot and return the stream to the download for the current OS + ARCH. -func (suite *AgentInstallSuite) downloadAgent(ctx context.Context) io.ReadCloser { - suite.T().Helper() - // Use version associated with latest DRA instead of fleet-server's version to avoid breaking on fleet-server version bumps - draVersion, ok := os.LookupEnv("ELASTICSEARCH_VERSION") - if !ok || draVersion == "" { - suite.T().Fatal("ELASTICSEARCH_VERSION is not set") - } - draSplit := strings.Split(draVersion, "-") - if len(draSplit) == 3 { - draVersion = draSplit[0] + "-" + draSplit[2] // remove hash - } else if len(draSplit) > 3 { - suite.T().Fatalf("Unsupported ELASTICSEARCH_VERSION format, expected 3 segments got: %s", draVersion) - } - suite.T().Logf("Using ELASTICSARCH_VERSION=%s", draVersion) - - req, err := http.NewRequestWithContext(ctx, "GET", "https://artifacts-api.elastic.co/v1/search/"+draVersion, nil) - suite.Require().NoError(err) - - resp, err := suite.Client.Do(req) - suite.Require().NoError(err) - - var body SearchResp - err = json.NewDecoder(resp.Body).Decode(&body) - resp.Body.Close() - suite.Require().NoError(err) - - fType := "tar.gz" - if runtime.GOOS == "windows" { - fType = "zip" - } - - arch := runtime.GOARCH - if arch == "amd64" { - arch = "x86_64" - } - if arch == "arm64" && runtime.GOOS == "darwin" { - arch = "aarch64" - } - - fileName := fmt.Sprintf("elastic-agent-%s-%s-%s.%s", draVersion, runtime.GOOS, arch, fType) - pkg, ok := body.Packages[fileName] - suite.Require().Truef(ok, "unable to find package download for fileName = %s", fileName) - - req, err = http.NewRequestWithContext(ctx, "GET", pkg.URL, nil) - suite.Require().NoError(err) - resp, err = suite.Client.Do(req) - suite.Require().NoError(err) - suite.T().Logf("Downloading elastic-agent from %s", pkg.URL) - return resp.Body -} - // extractZip treats the passed Reader as a zip stream and unarchives it to a temp dir // fleet-server binary in archive is replaced by a locally compiled version func (suite *AgentInstallSuite) extractZip(r io.Reader) { diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index ce1e3c4934..914220a02d 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -581,13 +581,13 @@ func (suite *StandAloneSuite) TestAPMInstrumentation() { cmd.Wait() } -// TestOpAMP ensures that the OpAMP endpoint in Fleet Server works as expected by installing -// an OTel Collector, configuring it with the OpAMP extension, and having it connect to Fleet -// Server using OpAMP, and verifying that Fleet Server responds to this request with an HTTP -// 200 OK status response. -func (suite *StandAloneSuite) TestOpAMP() { - // Create a config file from a template in the test temp dir - dir := suite.T().TempDir() +// startFleetServerForOpAMP creates the fleet-server config from stand-alone-opamp.tpl, +// starts the fleet-server binary, waits for it to be healthy, fetches an enrollment token +// for "dummy-policy", and enrolls a dummy agent (to ensure .fleet-agents exists before any +// OpAMP collector connects). It returns the enrollment API key. Fleet-server is stopped when +// ctx is cancelled; the caller owns the context lifetime. +func (suite *StandAloneSuite) startFleetServerForOpAMP(ctx context.Context, dir, staticTokenKey string) string { + suite.T().Helper() tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-opamp.tpl")) suite.Require().NoError(err) f, err := os.Create(filepath.Join(dir, "config.yml")) @@ -595,46 +595,68 @@ func (suite *StandAloneSuite) TestOpAMP() { err = tpl.Execute(f, map[string]interface{}{ "Hosts": suite.ESHosts, "ServiceToken": suite.ServiceToken, - "StaticTokenKey": "opamp-e2e-test-key", + "StaticTokenKey": staticTokenKey, }) f.Close() suite.Require().NoError(err) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancel() - - // Run the fleet-server binary cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml")) - cmd.Cancel = func() error { - return cmd.Process.Signal(syscall.SIGTERM) - } + cmd.Cancel = func() error { return cmd.Process.Signal(syscall.SIGTERM) } cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath} err = cmd.Start() suite.Require().NoError(err) - defer cmd.Wait() + suite.T().Cleanup(func() { cmd.Wait() }) suite.FleetServerStatusOK(ctx, "http://localhost:8220") apiKey := suite.GetEnrollmentTokenForPolicyID(ctx, "dummy-policy") + tester := api_version.NewClientAPITesterCurrent(suite.Scaffold, "http://localhost:8220", apiKey) + tester.Enroll(ctx, apiKey) + return apiKey +} + +// writeOpAMPCollectorConfig renders otelcol-opamp.tpl into dir/configFile and returns +// the full path to the written file. +func (suite *StandAloneSuite) writeOpAMPCollectorConfig(dir, configFile, instanceUID, apiKey string) string { + suite.T().Helper() + tpl, err := template.ParseFiles(filepath.Join("testdata", "otelcol-opamp.tpl")) + suite.Require().NoError(err) + path := filepath.Join(dir, configFile) + f, err := os.Create(path) + suite.Require().NoError(err) + err = tpl.Execute(f, map[string]interface{}{ + "OpAMP": map[string]string{ + "InstanceUID": instanceUID, + "APIKey": apiKey, + }, + }) + f.Close() + suite.Require().NoError(err) + return path +} + +// TestOpAMP ensures that the OpAMP endpoint in Fleet Server works as expected by installing +// an OTel Collector, configuring it with the OpAMP extension, and having it connect to Fleet +// Server using OpAMP, and verifying that Fleet Server responds to this request with an HTTP +// 200 OK status response. +func (suite *StandAloneSuite) TestOpAMPWithUpstreamCollector() { + dir := suite.T().TempDir() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() - // Make sure the OpAMP endpoint works. + apiKey := suite.startFleetServerForOpAMP(ctx, dir, "opamp-e2e-test-key") + + // Make sure the OpAMP endpoint works before proceeding to build the collector. req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:8220/v1/opamp", nil) suite.Require().NoError(err) req.Header.Set("Authorization", "ApiKey "+apiKey) req.Header.Set("Content-Type", "application/x-protobuf") - resp, err := suite.Client.Do(req) suite.Require().NoError(err) resp.Body.Close() suite.Require().Equal(http.StatusOK, resp.StatusCode) - // Enroll a dummy agent to initialize the .fleet-agents index before the OTel Collector connects. - // Without this, findEnrolledAgent fails with index_not_found_exception when the OTel Collector - // sends its first AgentToServer message, because .fleet-agents doesn't exist yet in a fresh - // standalone fleet-server environment (unlike agent-managed fleet-server which self-enrolls). - tester := api_version.NewClientAPITesterCurrent(suite.Scaffold, "http://localhost:8220", apiKey) - tester.Enroll(ctx, apiKey) - // Clone OTel Collector contrib repository (shallow clone of main branch) cloneDir := filepath.Join(dir, "opentelemetry-collector-contrib") suite.T().Logf("Cloning opentelemetry-collector-contrib (main) to %s", cloneDir) @@ -650,7 +672,10 @@ func (suite *StandAloneSuite) TestOpAMP() { suite.Require().NoError(err) // Build the OTel Collector binary + // The make target outputs to bin/ which must exist first. suite.T().Log("Building otelcol-contrib binary via make otelcontribcol") + err = os.MkdirAll(filepath.Join(cloneDir, "bin"), 0755) + suite.Require().NoError(err) makeCmd := exec.CommandContext(ctx, "make", "otelcontribcol") makeCmd.Dir = cloneDir makeCmd.Stdout = os.Stdout @@ -667,22 +692,11 @@ func (suite *StandAloneSuite) TestOpAMP() { // Configure it with the OpAMP extension instanceUID := "019b8d7a-2da8-7657-b52d-492a9de33319" suite.T().Logf("Configuring OTel Collector with OpAMP extension (instanceUID=%s)", instanceUID) - tpl, err = template.ParseFiles(filepath.Join("testdata", "otelcol-opamp.tpl")) - suite.Require().NoError(err) - f, err = os.Create(filepath.Join(dir, "otelcol.yml")) - suite.Require().NoError(err) - err = tpl.Execute(f, map[string]interface{}{ - "OpAMP": map[string]string{ - "InstanceUID": instanceUID, - "APIKey": apiKey, - }, - }) - f.Close() - suite.Require().NoError(err) + collectorConfig := suite.writeOpAMPCollectorConfig(dir, "otelcol.yml", instanceUID, apiKey) // Start OTel Collector suite.T().Log("Starting OTel Collector") - otelCmd := exec.CommandContext(ctx, otelBinaryPath, "--config", filepath.Join(dir, "otelcol.yml")) + otelCmd := exec.CommandContext(ctx, otelBinaryPath, "--config", collectorConfig) otelCmd.Cancel = func() error { return otelCmd.Process.Signal(syscall.SIGTERM) } @@ -707,3 +721,88 @@ func (suite *StandAloneSuite) TestOpAMP() { suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") } + +// TestOpAMPWithEDOTCollector ensures that the EDOT Collector can connect to Fleet Server +// over OpAMP and enroll as an agent in the .fleet-agents index. +func (suite *StandAloneSuite) TestOpAMPWithEDOTCollector() { + dir := suite.T().TempDir() + + // Download and extract the full Elastic Agent package before starting the timed + // portion of the test. The archive is cached on disk after the first run so this + // is fast on subsequent runs; extracting everything ensures all components + // (e.g. elastic-otel-collector) needed by elastic-agent otel are present. + suite.T().Log("Downloading Elastic Agent package") + agentExtractDir := filepath.Join(dir, "elastic-agent-package") + suite.Require().NoError(os.MkdirAll(agentExtractDir, 0755)) + downloadCtx, downloadCancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer downloadCancel() + rc := downloadElasticAgent(downloadCtx, suite.T(), suite.Client) + paths := extractAgentArchive(suite.T(), rc, agentExtractDir, nil, nil) + rc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + apiKey := suite.startFleetServerForOpAMP(ctx, dir, "edot-opamp-e2e-test-key") + + agentBinaryPath, ok := paths[agentName] + suite.Require().Truef(ok, "elastic-agent binary %q not found in package", agentName) + suite.T().Logf("Found elastic-agent binary at %s", agentBinaryPath) + + instanceUID := "029c9e8b-3eb9-8768-c63e-593b0ef44430" + suite.T().Logf("Configuring EDOT Collector with OpAMP extension (instanceUID=%s)", instanceUID) + collectorConfig := suite.writeOpAMPCollectorConfig(dir, "edot-otelcol.yml", instanceUID, apiKey) + + // Start the EDOT Collector via `elastic-agent otel` + suite.T().Log("Starting EDOT Collector via elastic-agent otel") + edotOutputFile, err := os.CreateTemp(dir, "edot-output-*.log") + suite.Require().NoError(err) + edotCmd := exec.CommandContext(ctx, agentBinaryPath, "otel", "--config", collectorConfig) + edotCmd.Cancel = func() error { + return edotCmd.Process.Signal(syscall.SIGTERM) + } + edotCmd.Stdout = edotOutputFile + edotCmd.Stderr = edotOutputFile + err = edotCmd.Start() + suite.Require().NoError(err) + // processExited owns the single Wait() call on edotCmd. + processExited := make(chan error, 1) + go func() { processExited <- edotCmd.Wait() }() + suite.T().Cleanup(func() { + // Wait for the process to exit (context cancellation will have killed it) + // before closing the output file and reading it. The 30s fallback handles + // the case where the early-exit path already consumed processExited. + select { + case <-processExited: + case <-time.After(30 * time.Second): + } + edotOutputFile.Close() + if out, readErr := os.ReadFile(edotOutputFile.Name()); readErr == nil { + suite.T().Logf("EDOT Collector output:\n%s", string(out)) + } + }) + // Detect immediate exit — if the process dies within 5s it's a startup failure. + select { + case exitErr := <-processExited: + edotOutputFile.Close() + if out, readErr := os.ReadFile(edotOutputFile.Name()); readErr == nil { + suite.T().Logf("EDOT Collector output:\n%s", string(out)) + } + suite.Require().NoError(exitErr, "EDOT Collector exited prematurely") + return + case <-time.After(5 * time.Second): + // Process is still running after 5s — proceed + } + + // Verify that the EDOT Collector was enrolled in Fleet by fetching its document from + // .fleet-agents and asserting on its contents. + suite.T().Logf("Waiting for EDOT agent %s to appear in .fleet-agents", instanceUID) + agentDoc := suite.WaitForAgentDoc(ctx, instanceUID) + + suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID") + suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP") + suite.Equal("elastic-otel-collector", agentDoc.Agent.Type, "expected agent.type to be elastic-otel-collector") + suite.NotEmpty(agentDoc.Agent.Version, "expected agent.version to be set") + suite.Contains(agentDoc.Tags, "elastic-otel-collector", "expected tags to contain elastic-otel-collector") + suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") +}