diff --git a/.github/actions/run-tests/action.yml b/.github/actions/run-tests/action.yml index ec2acb5..b558675 100644 --- a/.github/actions/run-tests/action.yml +++ b/.github/actions/run-tests/action.yml @@ -172,25 +172,30 @@ runs: echo "AGYN_AGENT_IMAGE=$AGYN_AGENT_IMAGE" >> "$GITHUB_ENV" fi - required_init_envs=( - AGYN_AGENT_INIT_IMAGE - CODEX_INIT_IMAGE - AGN_INIT_IMAGE - CLAUDE_INIT_IMAGE - AGN_EXPOSE_INIT_IMAGE - ) - missing_init_envs=() - for env_name in "${required_init_envs[@]}"; do - env_value=${!env_name:-} - trimmed_value=$(echo "$env_value" | xargs) - if [ -z "$trimmed_value" ]; then - missing_init_envs+=("$env_name") - fi - done - if [ "${#missing_init_envs[@]}" -gt 0 ]; then - printf 'Missing required init image environment variables: %s\n' \ - "${missing_init_envs[*]}" >&2 - exit 1 + if [ -z "${CODEX_INIT_IMAGE:-}" ]; then + CODEX_INIT_IMAGE="ghcr.io/agynio/agent-init-codex:latest" + echo "CODEX_INIT_IMAGE=$CODEX_INIT_IMAGE" >> "$GITHUB_ENV" + fi + + if [ -z "${AGYN_AGENT_INIT_IMAGE:-}" ]; then + # No agynio/agent-init image exists; default to the codex init image. + AGYN_AGENT_INIT_IMAGE="$CODEX_INIT_IMAGE" + echo "AGYN_AGENT_INIT_IMAGE=$AGYN_AGENT_INIT_IMAGE" >> "$GITHUB_ENV" + fi + + if [ -z "${AGN_INIT_IMAGE:-}" ]; then + AGN_INIT_IMAGE="ghcr.io/agynio/agent-init-agn:latest" + echo "AGN_INIT_IMAGE=$AGN_INIT_IMAGE" >> "$GITHUB_ENV" + fi + + if [ -z "${CLAUDE_INIT_IMAGE:-}" ]; then + CLAUDE_INIT_IMAGE="ghcr.io/agynio/agent-init-claude:latest" + echo "CLAUDE_INIT_IMAGE=$CLAUDE_INIT_IMAGE" >> "$GITHUB_ENV" + fi + + if [ -z "${AGN_EXPOSE_INIT_IMAGE:-}" ]; then + AGN_EXPOSE_INIT_IMAGE="ghcr.io/agynio/agent-init-agn:latest" + echo "AGN_EXPOSE_INIT_IMAGE=$AGN_EXPOSE_INIT_IMAGE" >> "$GITHUB_ENV" fi if [ -z "${AGYN_MODEL_ID:-}" ]; then diff --git a/.github/workflows/agn-cli-e2e.yml b/.github/workflows/agn-cli-e2e.yml index 136de0e..45d0868 100644 --- a/.github/workflows/agn-cli-e2e.yml +++ b/.github/workflows/agn-cli-e2e.yml @@ -11,12 +11,6 @@ jobs: timeout-minutes: 60 permissions: contents: read - env: - AGYN_AGENT_INIT_IMAGE: ghcr.io/agynio/agent-init:v1.0.0 - CODEX_INIT_IMAGE: ghcr.io/agynio/agent-init-codex:0.13 - AGN_INIT_IMAGE: ghcr.io/agynio/agent-init-agn:0.4 - CLAUDE_INIT_IMAGE: ghcr.io/agynio/agent-init-claude:0.1 - AGN_EXPOSE_INIT_IMAGE: ghcr.io/agynio/agent-init-agn:0.4 steps: - name: Checkout uses: actions/checkout@v4 @@ -64,5 +58,5 @@ jobs: with: ref: ${{ github.sha }} tag: svc_agn_cli - include-smoke: false + include_smoke: false agn-binary: agn diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 80ca943..3db82ae 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -4,6 +4,17 @@ on: pull_request: push: branches: [main] + workflow_dispatch: + inputs: + tag: + description: "Suite tag to run (e.g. svc_reminders)" + required: false + type: string + include_smoke: + description: "Include smoke suites" + required: false + default: true + type: boolean concurrency: group: e2e-${{ github.event_name == 'pull_request' && github.event.pull_request.number || github.sha }} @@ -15,12 +26,6 @@ jobs: timeout-minutes: 60 permissions: contents: read - env: - AGYN_AGENT_INIT_IMAGE: ghcr.io/agynio/agent-init:v1.0.0 - CODEX_INIT_IMAGE: ghcr.io/agynio/agent-init-codex:0.13 - AGN_INIT_IMAGE: ghcr.io/agynio/agent-init-agn:0.4 - CLAUDE_INIT_IMAGE: ghcr.io/agynio/agent-init-claude:0.1 - AGN_EXPOSE_INIT_IMAGE: ghcr.io/agynio/agent-init-agn:0.4 steps: - name: Checkout uses: actions/checkout@v4 @@ -34,3 +39,5 @@ jobs: uses: ./.github/actions/run-tests with: ref: ${{ github.sha }} + tag: ${{ inputs.tag }} + include_smoke: ${{ inputs.include_smoke }} diff --git a/README.md b/README.md index 0eac7f1..d7f877e 100644 --- a/README.md +++ b/README.md @@ -19,14 +19,11 @@ Optional domain override: - `E2E_DOMAIN` -Full-chain tests require explicit init images: - -- `AGN_INIT_IMAGE` (agn, for example `ghcr.io/agynio/agent-init-agn:0.4`) -- `CODEX_INIT_IMAGE` (codex, for example `ghcr.io/agynio/agent-init-codex:0.13`) -- `CLAUDE_INIT_IMAGE` (claude, for example `ghcr.io/agynio/agent-init-claude:0.1`) -- `AGN_EXPOSE_INIT_IMAGE` (go-core expose tests, for example `ghcr.io/agynio/agent-init-agn:0.4`) - -For exact reproducibility, pin `*_INIT_IMAGE` to a patch tag +Full-chain tests use `AGN_INIT_IMAGE` (default `ghcr.io/agynio/agent-init-agn:latest`) for agn, +`CODEX_INIT_IMAGE` (default `ghcr.io/agynio/agent-init-codex:latest`) for codex, +`CLAUDE_INIT_IMAGE` (default `ghcr.io/agynio/agent-init-claude:latest`) for claude, +and `AGN_EXPOSE_INIT_IMAGE` (default `ghcr.io/agynio/agent-init-agn:latest`) for go-core expose. +For exact reproducibility, set `*_INIT_IMAGE` to a pinned patch tag (for example, `ghcr.io/agynio/agent-init-agn:0.4.15`) or an image digest (for example, `ghcr.io/agynio/agent-init-agn@sha256:`). diff --git a/suites/go-core/buf.gen.yaml b/suites/go-core/buf.gen.yaml index 95704d9..8945097 100644 --- a/suites/go-core/buf.gen.yaml +++ b/suites/go-core/buf.gen.yaml @@ -10,6 +10,7 @@ inputs: - module: buf.build/agynio/api paths: - agynio/api/agents/v1 + - agynio/api/apps/v1 - agynio/api/authorization/v1 - agynio/api/files/v1 - agynio/api/gateway/v1/agents.proto diff --git a/suites/go-core/suite.yaml b/suites/go-core/suite.yaml index 73c267e..f225bc0 100644 --- a/suites/go-core/suite.yaml +++ b/suites/go-core/suite.yaml @@ -8,7 +8,7 @@ required_env: - CODEX_INIT_IMAGE - AGN_EXPOSE_INIT_IMAGE select: | - suite_tags=("svc_agents_orchestrator" "svc_runners" "svc_metering" "svc_k8s_runner" "svc_organizations" "svc_files" "svc_gateway" "svc_media_proxy" "smoke") + suite_tags=("svc_agents_orchestrator" "svc_runners" "svc_metering" "svc_k8s_runner" "svc_organizations" "svc_files" "svc_gateway" "svc_reminders" "svc_media_proxy" "smoke") if [ -z "${TAGS:-}" ]; then echo "smoke" diff --git a/suites/go-core/tests/diagnostics_helpers_test.go b/suites/go-core/tests/diagnostics_helpers_test.go index 4d6c41e..f88ba60 100644 --- a/suites/go-core/tests/diagnostics_helpers_test.go +++ b/suites/go-core/tests/diagnostics_helpers_test.go @@ -1,4 +1,4 @@ -//go:build e2e && (svc_agents_orchestrator || svc_runners || smoke) +//go:build e2e && (svc_agents_orchestrator || svc_runners || svc_gateway || svc_reminders || smoke) package tests diff --git a/suites/go-core/tests/expose_test.go b/suites/go-core/tests/expose_test.go index 9743321..ef20c59 100644 --- a/suites/go-core/tests/expose_test.go +++ b/suites/go-core/tests/expose_test.go @@ -4,7 +4,12 @@ package tests import ( "context" + "encoding/json" "fmt" + "io" + "net/http" + "net/url" + "strings" "testing" "time" @@ -14,13 +19,223 @@ import ( runnerv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/runner/v1" threadsv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/threads/v1" usersv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/users/v1" + zitimgmtv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/ziti_management/v1" "github.com/google/uuid" + sdk "github.com/openziti/sdk-golang" + "github.com/openziti/sdk-golang/ziti" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + exposeTestTimeout = 8 * time.Minute + exposeReachabilityTimeout = 90 * time.Second + exposeRequestTimeout = 15 * time.Second + exposeZitiRequestTimeout = 30 * time.Second + exposePort = 3000 + exposeExpectedResponse = "Hi! How are you?" +) + +type exposeWorkloadFixture struct { + ctx context.Context + podName string + containerName string +} + +type exposeAddResponse struct { + ID string `json:"id"` + Port int `json:"port"` + URL string `json:"url"` + Status string `json:"status"` +} + func TestAgentExposeListExec(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 6*time.Minute) + fixture := setupExposeTestWorkload(t) + + execCtx, execCancel := context.WithTimeout(fixture.ctx, 2*time.Minute) + defer execCancel() + + result := execPodCommand(t, execCtx, workloadNamespace(t), fixture.podName, fixture.containerName, []string{"/agyn-bin/cli/agyn", "expose", "list"}) + if result.exitCode != 0 { + t.Fatalf("expected expose list exit code 0, got %d stdout=%q stderr=%q", result.exitCode, result.stdout, result.stderr) + } +} + +func TestAgentExposeAddReachable(t *testing.T) { + fixture := setupExposeTestWorkload(t) + + body := fmt.Sprintf("expose-e2e-%s", uuid.NewString()) + serveDir := "/tmp/expose-e2e" + serveScript := fmt.Sprintf( + "set -e; mkdir -p %[1]s; printf '%%s' \"$1\" > %[1]s/index.html; "+ + "busybox httpd -f -p %[2]d -h %[1]s >/tmp/expose-httpd.log 2>&1 & "+ + "pid=$!; i=0; while [ \"$i\" -lt 20 ]; do "+ + "if output=$(busybox wget -q -O - http://127.0.0.1:%[2]d/index.html); then "+ + "if [ \"$output\" = \"$1\" ]; then exit 0; fi; fi; "+ + "if ! kill -0 \"$pid\" 2>/dev/null; then echo \"httpd exited\"; "+ + "cat /tmp/expose-httpd.log 2>/dev/null; exit 1; fi; "+ + "i=$((i+1)); sleep 0.5; done; "+ + "echo \"httpd not ready\"; cat /tmp/expose-httpd.log 2>/dev/null; exit 1", + serveDir, + exposePort, + ) + serveCommand := []string{ + "sh", + "-c", + serveScript, + "expose-httpd", + body, + } + + serveCtx, serveCancel := context.WithTimeout(fixture.ctx, 2*time.Minute) + defer serveCancel() + serveResult := execPodCommand(t, serveCtx, workloadNamespace(t), fixture.podName, fixture.containerName, serveCommand) + if serveResult.exitCode != 0 { + t.Fatalf("start http server exit code %d stdout=%q stderr=%q", serveResult.exitCode, serveResult.stdout, serveResult.stderr) + } + + addCtx, addCancel := context.WithTimeout(fixture.ctx, 2*time.Minute) + defer addCancel() + addResult := execPodCommand(t, addCtx, workloadNamespace(t), fixture.podName, fixture.containerName, []string{"/agyn-bin/cli/agyn", "--output", "json", "expose", "add", fmt.Sprintf("%d", exposePort)}) + if addResult.exitCode != 0 { + t.Fatalf("expose add exit code %d stdout=%q stderr=%q", addResult.exitCode, addResult.stdout, addResult.stderr) + } + + addResponse := parseExposeAddResponse(t, addResult.stdout) + if addResponse.ID == "" { + t.Fatal("expose add missing id") + } + if addResponse.Port != exposePort { + t.Fatalf("expose add port mismatch: got %d want %d", addResponse.Port, exposePort) + } + expectedURL := fmt.Sprintf("http://exposed-%s.ziti:%d", addResponse.ID, exposePort) + if addResponse.URL != expectedURL { + t.Fatalf("expose add url mismatch: got %q want %q", addResponse.URL, expectedURL) + } + if addResponse.Status == "" { + t.Fatal("expose add status missing") + } + + t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cleanupCancel() + removeResult := execPodCommand(t, cleanupCtx, workloadNamespace(t), fixture.podName, fixture.containerName, []string{"/agyn-bin/cli/agyn", "expose", "remove", fmt.Sprintf("%d", exposePort)}) + if removeResult.exitCode != 0 { + t.Logf("cleanup: expose remove exit code %d stdout=%q stderr=%q", removeResult.exitCode, removeResult.stdout, removeResult.stderr) + } + }) + + parsedURL, err := url.Parse(addResponse.URL) + if err != nil { + t.Fatalf("parse expose url: %v", err) + } + if parsedURL.Scheme != "http" { + t.Fatalf("expected expose url scheme http, got %q", parsedURL.Scheme) + } + if parsedURL.Port() != fmt.Sprintf("%d", exposePort) { + t.Fatalf("expected expose url port %d, got %q", exposePort, parsedURL.Port()) + } + host := strings.TrimSpace(parsedURL.Hostname()) + if host == "" { + t.Fatal("expose url host missing") + } + serviceName := strings.TrimSuffix(host, ".ziti") + if serviceName == host { + t.Fatalf("expected expose url host to end with .ziti, got %q", host) + } + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" { + t.Fatal("expose service name missing") + } + + exposedURL := fmt.Sprintf("http://%s:%d/index.html", serviceName, exposePort) + + zitiConn := dialGRPC(t, zitiManagementAddr(t)) + zitiClient := zitimgmtv1.NewZitiManagementServiceClient(zitiConn) + + zitiCtx, zitiCancel := context.WithTimeout(context.Background(), exposeZitiRequestTimeout) + defer zitiCancel() + createResp, err := zitiClient.CreateAppIdentity(zitiCtx, &zitimgmtv1.CreateAppIdentityRequest{ + IdentityId: uuid.NewString(), + Slug: fmt.Sprintf("e2e-expose-%s", uuid.NewString()), + }) + if err != nil { + t.Fatalf("create ziti identity: %v", err) + } + if createResp == nil { + t.Fatal("create ziti identity: missing response") + } + zitiIdentityID := strings.TrimSpace(createResp.GetZitiIdentityId()) + if zitiIdentityID == "" { + t.Fatal("create ziti identity: missing id") + } + if len(createResp.GetIdentityJson()) == 0 { + t.Fatal("create ziti identity: missing identity json") + } + + t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), exposeZitiRequestTimeout) + defer cleanupCancel() + if _, err := zitiClient.DeleteIdentity(cleanupCtx, &zitimgmtv1.DeleteIdentityRequest{ZitiIdentityId: zitiIdentityID}); err != nil { + t.Logf("cleanup: delete ziti identity %s: %v", zitiIdentityID, err) + } + }) + + zitiConfig := &ziti.Config{} + if err := json.Unmarshal(createResp.GetIdentityJson(), zitiConfig); err != nil { + t.Fatalf("parse ziti identity json: %v", err) + } + + zitiContext, err := ziti.NewContext(zitiConfig) + if err != nil { + t.Fatalf("create ziti context: %v", err) + } + t.Cleanup(func() { zitiContext.Close() }) + + httpClient := sdk.NewHttpClient(zitiContext, nil) + httpClient.Timeout = exposeRequestTimeout + + pollCtx, pollCancel := context.WithTimeout(fixture.ctx, exposeReachabilityTimeout) + defer pollCancel() + err = pollUntil(pollCtx, pollInterval, func(ctx context.Context) error { + requestCtx, requestCancel := context.WithTimeout(ctx, exposeRequestTimeout) + defer requestCancel() + + request, err := http.NewRequestWithContext(requestCtx, http.MethodGet, exposedURL, nil) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + + response, err := httpClient.Do(request) + if err != nil { + return fmt.Errorf("dial exposed service: %w", err) + } + defer response.Body.Close() + + bodyBytes, readErr := io.ReadAll(response.Body) + if readErr != nil { + if response.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status %d (read body error: %v)", response.StatusCode, readErr) + } + return fmt.Errorf("read response body: %w", readErr) + } + if response.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status %d: %s", response.StatusCode, strings.TrimSpace(string(bodyBytes))) + } + responseBody := string(bodyBytes) + if responseBody != body { + return fmt.Errorf("unexpected body %q (expected %q)", responseBody, body) + } + return nil + }) + if err != nil { + t.Fatalf("wait for exposed service: %v", err) + } +} + +func setupExposeTestWorkload(t *testing.T) exposeWorkloadFixture { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), exposeTestTimeout) t.Cleanup(cancel) agentsConn := dialGRPC(t, agentsAddr) @@ -36,7 +251,7 @@ func TestAgentExposeListExec(t *testing.T) { usersClient := usersv1.NewUsersServiceClient(usersConn) orgsClient := organizationsv1.NewOrganizationsServiceClient(orgsConn) runnerClient := runnerv1.NewRunnerServiceClient(runnerConn) - exposeInitImage := requireEnv("AGN_EXPOSE_INIT_IMAGE") + exposeInitImage := envOrDefault("AGN_EXPOSE_INIT_IMAGE", "ghcr.io/agynio/agent-init-agn:latest") identityID := resolveOrCreateUser(t, ctx, usersClient) threadsCtx := withIdentity(ctx, identityID) @@ -85,18 +300,17 @@ func TestAgentExposeListExec(t *testing.T) { } }) - expectedResponse := "Hi! How are you?" sentMessage := sendMessage(t, threadsCtx, threadsClient, threadID, identityID, "hi") sentMessageTime := messageCreatedAt(t, sentMessage) pollCtx, pollCancel := context.WithTimeout(threadsCtx, 5*time.Minute) defer pollCancel() - agentBody, err := pollForAgentResponse(t, pollCtx, threadsClient, runnerClient, threadID, agentID, labels, sentMessageTime, expectedResponse) + agentBody, err := pollForAgentResponse(t, pollCtx, threadsClient, runnerClient, threadID, agentID, labels, sentMessageTime, exposeExpectedResponse) if err != nil { t.Fatalf("wait for agent response: %v", err) } - if agentBody != expectedResponse { - t.Fatalf("expected agent response %q, got %q", expectedResponse, agentBody) + if agentBody != exposeExpectedResponse { + t.Fatalf("expected agent response %q, got %q", exposeExpectedResponse, agentBody) } workloadIDs, err := findWorkloadsByLabels(ctx, runnerClient, labels) @@ -113,10 +327,28 @@ func TestAgentExposeListExec(t *testing.T) { if err != nil { t.Fatalf("wait for agent container: %v", err) } - result := execPodCommand(t, execCtx, workloadNamespace(t), podName, containerName, []string{"/agyn-bin/cli/agyn", "expose", "list"}) - if result.exitCode != 0 { - t.Fatalf("expected expose list exit code 0, got %d stdout=%q stderr=%q", result.exitCode, result.stdout, result.stderr) + + return exposeWorkloadFixture{ + ctx: ctx, + podName: podName, + containerName: containerName, + } +} + +func parseExposeAddResponse(t *testing.T, output string) exposeAddResponse { + t.Helper() + trimmed := strings.TrimSpace(output) + if trimmed == "" { + t.Fatal("expose add output is empty") + } + var response exposeAddResponse + if err := json.Unmarshal([]byte(trimmed), &response); err != nil { + t.Fatalf("parse expose add output: %v stdout=%q", err, trimmed) } + response.ID = strings.TrimSpace(response.ID) + response.URL = strings.TrimSpace(response.URL) + response.Status = strings.TrimSpace(response.Status) + return response } func waitForWorkloadAgentContainerReady(t *testing.T, ctx context.Context, workloadID string) (string, string, error) { diff --git a/suites/go-core/tests/gateway_helpers_test.go b/suites/go-core/tests/gateway_helpers_test.go index 04b47e5..4ffb5a8 100644 --- a/suites/go-core/tests/gateway_helpers_test.go +++ b/suites/go-core/tests/gateway_helpers_test.go @@ -1,4 +1,4 @@ -//go:build e2e && (svc_gateway || smoke) +//go:build e2e && (svc_gateway || svc_reminders || smoke) package tests @@ -30,8 +30,6 @@ const ( gatewayAgentInitImageEnvKey = "AGYN_AGENT_INIT_IMAGE" gatewayUsersAddrEnvKey = "USERS_ADDRESS" defaultGatewayUsersAddr = "users:50051" - zitiManagementAddrEnvKey = "ZITI_MANAGEMENT_ADDRESS" - defaultZitiManagementAddr = "ziti-management:50051" zitiGatewayBaseURL = "http://gateway" gatewayRequestTimeout = 30 * time.Second ) @@ -99,11 +97,6 @@ func gatewayUsersAddr() string { return envOrDefault(gatewayUsersAddrEnvKey, defaultGatewayUsersAddr) } -func zitiManagementAddr(t *testing.T) string { - t.Helper() - return envOrDefault(zitiManagementAddrEnvKey, defaultZitiManagementAddr) -} - func gatewayEndpoint(t *testing.T, path string) string { t.Helper() endpoint, err := url.JoinPath(gatewayBaseURL(t), path) diff --git a/suites/go-core/tests/grpc.go b/suites/go-core/tests/grpc.go index 78d1e0a..0b6e7db 100644 --- a/suites/go-core/tests/grpc.go +++ b/suites/go-core/tests/grpc.go @@ -1,4 +1,4 @@ -//go:build e2e && (svc_agents_orchestrator || svc_runners || svc_metering || svc_k8s_runner || svc_organizations || svc_files || svc_gateway || svc_media_proxy || smoke) +//go:build e2e && (svc_agents_orchestrator || svc_runners || svc_metering || svc_k8s_runner || svc_organizations || svc_files || svc_gateway || svc_reminders || svc_media_proxy || smoke) package tests diff --git a/suites/go-core/tests/identity_helpers_test.go b/suites/go-core/tests/identity_context_test.go similarity index 86% rename from suites/go-core/tests/identity_helpers_test.go rename to suites/go-core/tests/identity_context_test.go index f0538d3..a7dc372 100644 --- a/suites/go-core/tests/identity_helpers_test.go +++ b/suites/go-core/tests/identity_context_test.go @@ -1,4 +1,4 @@ -//go:build e2e && svc_runners && !smoke +//go:build e2e package tests diff --git a/suites/go-core/tests/main_test.go b/suites/go-core/tests/main_test.go index b026b5d..a0ac820 100644 --- a/suites/go-core/tests/main_test.go +++ b/suites/go-core/tests/main_test.go @@ -1,4 +1,4 @@ -//go:build e2e && (svc_agents_orchestrator || svc_runners || svc_metering || svc_k8s_runner || svc_organizations || smoke) +//go:build e2e && (svc_agents_orchestrator || svc_runners || svc_metering || svc_k8s_runner || svc_organizations || svc_gateway || svc_reminders || smoke) package tests @@ -37,9 +37,6 @@ const ( tracingSummaryTimeout = 2 * time.Minute tracingStartTimeBuffer = 30 * time.Second - testLLMEndpointCodex = "https://testllm.dev/v1/org/agynio/suite/codex/responses" - testLLMEndpointAgn = "https://testllm.dev/v1/org/agynio/suite/agn/responses" - labelManagedBy = "managed-by" labelAgentID = "agent-id" labelThreadID = "thread-id" @@ -57,8 +54,8 @@ var ( runnersAddr = envOrDefault("RUNNERS_ADDRESS", "runners:50051") secretsAddr = envOrDefault("SECRETS_ADDRESS", "secrets:50051") tracingAddr = envOrDefault("TRACING_ADDRESS", "tracing:50051") - codexInitImage = requireEnv("CODEX_INIT_IMAGE") - agnInitImage = requireEnv("AGN_INIT_IMAGE") + codexInitImage = envOrDefault("CODEX_INIT_IMAGE", "ghcr.io/agynio/agent-init-codex:latest") + agnInitImage = envOrDefault("AGN_INIT_IMAGE", "ghcr.io/agynio/agent-init-agn:latest") ) type pipelineRun struct { diff --git a/suites/go-core/tests/reminders_agent_loop_test.go b/suites/go-core/tests/reminders_agent_loop_test.go new file mode 100644 index 0000000..557f616 --- /dev/null +++ b/suites/go-core/tests/reminders_agent_loop_test.go @@ -0,0 +1,165 @@ +//go:build e2e && (svc_reminders || svc_gateway) + +package tests + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + "time" + + agentsv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/agents/v1" + llmv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/llm/v1" + threadsv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/threads/v1" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +const ( + remindersWaitTimeout = 2 * time.Minute +) + +func TestRemindersAgentLoop(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + t.Cleanup(cancel) + + orgID := gatewayOrganizationID(t) + token := gatewayAPIToken(t) + caller := fetchGatewayIdentity(t, token) + callerID := caller.IdentityID + remindersAppIdentity := remindersAppIdentityID(t) + + llmConn := dialGRPC(t, llmAddr) + agentsConn := dialGRPC(t, agentsAddr) + threadsConn := dialGRPC(t, threadsAddr) + + llmClient := llmv1.NewLLMServiceClient(llmConn) + agentsClient := agentsv1.NewAgentsServiceClient(agentsConn) + threadsClient := threadsv1.NewThreadsServiceClient(threadsConn) + + provider := createLLMProvider(t, ctx, llmClient, testLLMEndpointAgn, orgID) + providerID := strings.TrimSpace(provider.GetMeta().GetId()) + if providerID == "" { + t.Fatal("create llm provider: missing id") + } + + model := createModel(t, ctx, llmClient, "e2e-reminders-model-"+uuid.NewString(), providerID, "reminders-agent-loop", orgID) + modelID := strings.TrimSpace(model.GetMeta().GetId()) + if modelID == "" { + t.Fatal("create model: missing id") + } + + agent := createAgent(t, ctx, agentsClient, fmt.Sprintf("e2e-reminders-agent-%s", uuid.NewString()), modelID, orgID, agnInitImage) + agentID := strings.TrimSpace(agent.GetMeta().GetId()) + if agentID == "" { + t.Fatal("create agent: missing id") + } + + t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cleanupCancel() + deleteAgent(t, cleanupCtx, agentsClient, agentID) + }) + + reminderNote := fmt.Sprintf("e2e reminder %s", uuid.NewString()) + createAgentEnv(t, ctx, agentsClient, agentID, "LLM_API_TOKEN", token) + createAgentEnv(t, ctx, agentsClient, agentID, "AGYN_API_TOKEN", token) + createAgentEnv(t, ctx, agentsClient, agentID, "AGYN_BASE_URL", zitiGatewayBaseURL) + createAgentEnv(t, ctx, agentsClient, agentID, "AGYN_ORGANIZATION_ID", orgID) + createAgentEnv(t, ctx, agentsClient, agentID, "REMINDER_NOTE", reminderNote) + + threadsCtx := remindersIdentityContext(ctx, callerID) + thread := createThread(t, threadsCtx, threadsClient, orgID, []string{callerID, agentID}) + threadID := strings.TrimSpace(thread.GetId()) + if threadID == "" { + t.Fatal("create thread: missing id") + } + t.Cleanup(func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cleanupCancel() + archiveThread(t, remindersIdentityContext(cleanupCtx, callerID), threadsClient, threadID) + }) + t.Cleanup(func() { + cancelPendingRemindersBestEffort(t, threadID) + }) + + sentMessage := sendMessage(t, threadsCtx, threadsClient, threadID, callerID, "Schedule a short reminder and acknowledge it when it arrives.") + sentAt := messageCreatedAt(t, sentMessage) + + expectedScheduled := "Scheduled. I will reply when the reminder arrives." + expectedReminder := fmt.Sprintf("Reminder: %s", reminderNote) + expectedAck := "Acknowledged: reminder received." + + pollCtx, pollCancel := context.WithTimeout(threadsCtx, remindersWaitTimeout) + defer pollCancel() + + var scheduledMsg *threadsv1.Message + var reminderMsg *threadsv1.Message + var ackMsg *threadsv1.Message + + err := pollUntil(pollCtx, pollInterval, func(ctx context.Context) error { + resp, err := threadsClient.GetMessages(ctx, &threadsv1.GetMessagesRequest{ + ThreadId: threadID, + PageSize: 50, + }) + if err != nil { + return fmt.Errorf("get messages: %w", err) + } + filtered := make([]*threadsv1.Message, 0, len(resp.GetMessages())) + for _, msg := range resp.GetMessages() { + if msg == nil { + return fmt.Errorf("message is nil") + } + createdAt := msg.GetCreatedAt() + if createdAt == nil { + return fmt.Errorf("message %s missing created_at", msg.GetId()) + } + if createdAt.AsTime().Before(sentAt) { + continue + } + filtered = append(filtered, msg) + } + sort.Slice(filtered, func(i, j int) bool { + return messageCreatedAt(t, filtered[i]).Before(messageCreatedAt(t, filtered[j])) + }) + + scheduledMsg = nil + reminderMsg = nil + ackMsg = nil + for _, msg := range filtered { + if scheduledMsg == nil { + if msg.GetSenderId() == agentID && msg.GetBody() == expectedScheduled { + scheduledMsg = msg + } + continue + } + if reminderMsg == nil { + if msg.GetSenderId() == remindersAppIdentity && msg.GetBody() == expectedReminder { + reminderMsg = msg + } + continue + } + if ackMsg == nil { + if msg.GetSenderId() == agentID && msg.GetBody() == expectedAck { + ackMsg = msg + } + continue + } + } + if scheduledMsg == nil || reminderMsg == nil || ackMsg == nil { + return fmt.Errorf("waiting for reminders flow messages") + } + reminderAt := messageCreatedAt(t, reminderMsg) + ackAt := messageCreatedAt(t, ackMsg) + if !ackAt.After(reminderAt) { + return fmt.Errorf("acknowledgment created_at %s not after reminder %s", ackAt, reminderAt) + } + return nil + }) + require.NoError(t, err) + require.NotNil(t, scheduledMsg) + require.NotNil(t, reminderMsg) + require.NotNil(t, ackMsg) +} diff --git a/suites/go-core/tests/reminders_test.go b/suites/go-core/tests/reminders_test.go new file mode 100644 index 0000000..5a840ad --- /dev/null +++ b/suites/go-core/tests/reminders_test.go @@ -0,0 +1,505 @@ +//go:build e2e && (svc_reminders || svc_gateway) + +package tests + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + appsv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/apps/v1" + threadsv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/threads/v1" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" +) + +const remindersAppSlug = "reminders" + +type reminderResponse struct { + ID string `json:"id"` + ThreadID string `json:"thread_id"` + IdentityID string `json:"identity_id"` + Note string `json:"note"` + Status string `json:"status"` + At string `json:"at"` + CreatedAt string `json:"created_at"` + CompletedAt *string `json:"completed_at"` + CancelledAt *string `json:"cancelled_at"` +} + +type singleReminderResponse struct { + Reminder reminderResponse `json:"reminder"` +} + +type listRemindersResponse struct { + Reminders []reminderResponse `json:"reminders"` +} + +func TestRemindersGatewayHealthz(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), gatewayRequestTimeout) + defer cancel() + + request, err := newRemindersRequest(t, ctx, http.MethodGet, "healthz", nil) + require.NoError(t, err) + + response, err := remindersGatewayClient(t).Do(request) + require.NoError(t, err) + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + body, _ := io.ReadAll(response.Body) + t.Fatalf("healthz returned %d: %s", response.StatusCode, strings.TrimSpace(string(body))) + } +} + +func TestRemindersCreateGetRoundtrip(t *testing.T) { + callerIdentity := fetchGatewayIdentity(t, gatewayAPIToken(t)).IdentityID + threadID := uuid.NewString() + note := fmt.Sprintf("roundtrip %s", uuid.NewString()) + + created := createReminder(t, threadID, 60, note) + t.Cleanup(func() { cancelReminderBestEffort(t, created.ID) }) + + require.NotEmpty(t, created.ID) + require.Equal(t, threadID, created.ThreadID) + require.Equal(t, callerIdentity, created.IdentityID) + require.Equal(t, note, created.Note) + require.Equal(t, "pending", created.Status) + require.NotEmpty(t, created.At) + require.NotEmpty(t, created.CreatedAt) + require.Nil(t, created.CompletedAt) + require.Nil(t, created.CancelledAt) + + fetched := getReminder(t, created.ID) + require.Equal(t, created, fetched) +} + +func TestRemindersListFilters(t *testing.T) { + threadID := uuid.NewString() + pending := createReminder(t, threadID, 3600, "pending "+uuid.NewString()) + cancelled := createReminder(t, threadID, 3600, "cancelled "+uuid.NewString()) + t.Cleanup(func() { cancelReminderBestEffort(t, pending.ID) }) + t.Cleanup(func() { cancelReminderBestEffort(t, cancelled.ID) }) + + cancelReminder := cancelReminderStrict(t, cancelled.ID) + require.Equal(t, "cancelled", cancelReminder.Status) + + defaultList := listReminders(t, threadID, "") + require.ElementsMatch(t, []string{pending.ID}, reminderIDs(defaultList.Reminders)) + + pendingList := listReminders(t, threadID, "pending") + require.ElementsMatch(t, []string{pending.ID}, reminderIDs(pendingList.Reminders)) + + cancelledList := listReminders(t, threadID, "cancelled") + require.ElementsMatch(t, []string{cancelled.ID}, reminderIDs(cancelledList.Reminders)) + + allList := listReminders(t, threadID, "all") + require.ElementsMatch(t, []string{pending.ID, cancelled.ID}, reminderIDs(allList.Reminders)) +} + +func TestRemindersCancelSemantics(t *testing.T) { + threadID := uuid.NewString() + reminder := createReminder(t, threadID, 3600, "cancel "+uuid.NewString()) + t.Cleanup(func() { cancelReminderBestEffort(t, reminder.ID) }) + + cancelled := cancelReminderStrict(t, reminder.ID) + require.Equal(t, "cancelled", cancelled.Status) + require.NotNil(t, cancelled.CancelledAt) + require.Nil(t, cancelled.CompletedAt) + + secondResp := postRemindersJSON(t, "cancel-reminder", map[string]string{"reminder_id": reminder.ID}) + require.Equal(t, http.StatusConflict, secondResp.StatusCode) + secondResp.Body.Close() + + notFoundResp := postRemindersJSON(t, "cancel-reminder", map[string]string{"reminder_id": uuid.NewString()}) + require.Equal(t, http.StatusNotFound, notFoundResp.StatusCode) + notFoundResp.Body.Close() +} + +func TestRemindersDeliveryHappyPath(t *testing.T) { + callerIdentity := fetchGatewayIdentity(t, gatewayAPIToken(t)).IdentityID + threadsClient := newThreadsClient(t) + appIdentity := remindersAppIdentityID(t) + + threadID := createThreadWithAppParticipant(t, threadsClient, callerIdentity, appIdentity) + t.Cleanup(func() { archiveThreadBestEffort(t, threadsClient, callerIdentity, threadID) }) + + note := "delivery " + uuid.NewString() + created := createReminder(t, threadID, 0, note) + t.Cleanup(func() { cancelReminderBestEffort(t, created.ID) }) + + completed := pollReminderStatus(t, created.ID, "completed", 30*time.Second) + require.NotNil(t, completed.CompletedAt) + + messages := pollThreadMessages(t, threadsClient, callerIdentity, threadID, 1, 30*time.Second) + require.Len(t, messages, 1) + require.Equal(t, "Reminder: "+note, messages[0].GetBody()) + require.Equal(t, appIdentity, messages[0].GetSenderId()) +} + +func TestRemindersDeliveryFailurePending(t *testing.T) { + threadID := uuid.NewString() + created := createReminder(t, threadID, 0, "missing thread "+uuid.NewString()) + t.Cleanup(func() { cancelReminderBestEffort(t, created.ID) }) + + assertReminderStaysPending(t, created.ID, 10*time.Second) +} + +func remindersAppsAddr() string { + return envOrDefault("APPS_ADDRESS", "apps:50051") +} + +func remindersThreadsAddr() string { + return envOrDefault("THREADS_ADDRESS", "threads:50051") +} + +func remindersGatewayClient(t *testing.T) *http.Client { + t.Helper() + return newGatewayAuthenticatedClient(t, gatewayAPIToken(t)) +} + +func remindersGatewayEndpoint(t *testing.T, path string) string { + t.Helper() + trimmed := strings.TrimPrefix(path, "/") + if trimmed == "" { + return gatewayEndpoint(t, "apps/"+remindersAppSlug) + } + return gatewayEndpoint(t, "apps/"+remindersAppSlug+"/"+trimmed) +} + +func newRemindersRequest(t *testing.T, ctx context.Context, method, path string, body io.Reader) (*http.Request, error) { + t.Helper() + req, err := http.NewRequestWithContext(ctx, method, remindersGatewayEndpoint(t, path), body) + if err != nil { + return nil, err + } + req.Header.Set("x-organization-id", gatewayOrganizationID(t)) + if method == http.MethodPost { + req.Header.Set("Content-Type", "application/json") + } + return req, nil +} + +func postRemindersJSON(t *testing.T, path string, payload any) *http.Response { + t.Helper() + body, err := json.Marshal(payload) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), gatewayRequestTimeout) + defer cancel() + + req, err := newRemindersRequest(t, ctx, http.MethodPost, path, bytes.NewReader(body)) + require.NoError(t, err) + + resp, err := remindersGatewayClient(t).Do(req) + require.NoError(t, err) + return resp +} + +func decodeGatewayResponse[T any](t *testing.T, resp *http.Response) T { + t.Helper() + defer resp.Body.Close() + decoder := json.NewDecoder(resp.Body) + var payload T + require.NoError(t, decoder.Decode(&payload)) + return payload +} + +func createReminder(t *testing.T, threadID string, delaySeconds int64, note string) reminderResponse { + t.Helper() + resp := postRemindersJSON(t, "create-reminder", map[string]any{ + "thread_id": threadID, + "delay_seconds": delaySeconds, + "note": note, + }) + if resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + t.Fatalf("create reminder failed: status %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + created := decodeGatewayResponse[singleReminderResponse](t, resp) + return created.Reminder +} + +func getReminder(t *testing.T, reminderID string) reminderResponse { + t.Helper() + resp := postRemindersJSON(t, "get-reminder", map[string]string{"reminder_id": reminderID}) + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + t.Fatalf("get reminder failed: status %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + payload := decodeGatewayResponse[singleReminderResponse](t, resp) + return payload.Reminder +} + +func listReminders(t *testing.T, threadID, status string) listRemindersResponse { + t.Helper() + body := map[string]any{"thread_id": threadID} + if status != "" { + body["status"] = status + } + resp := postRemindersJSON(t, "list-reminders", body) + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + resp.Body.Close() + t.Fatalf("list reminders failed: status %d: %s", resp.StatusCode, strings.TrimSpace(string(bodyBytes))) + } + payload := decodeGatewayResponse[listRemindersResponse](t, resp) + return payload +} + +func cancelReminderStrict(t *testing.T, reminderID string) reminderResponse { + t.Helper() + resp := postRemindersJSON(t, "cancel-reminder", map[string]string{"reminder_id": reminderID}) + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + t.Fatalf("cancel reminder failed: status %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + payload := decodeGatewayResponse[singleReminderResponse](t, resp) + return payload.Reminder +} + +func cancelReminderBestEffort(t *testing.T, reminderID string) { + t.Helper() + resp := postRemindersJSON(t, "cancel-reminder", map[string]string{"reminder_id": reminderID}) + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusOK, http.StatusConflict, http.StatusNotFound: + return + default: + body, _ := io.ReadAll(resp.Body) + t.Logf("cleanup: cancel reminder %s: status %d: %s", reminderID, resp.StatusCode, strings.TrimSpace(string(body))) + } +} + +func cancelPendingRemindersBestEffort(t *testing.T, threadID string) { + t.Helper() + reminders := listRemindersBestEffort(t, threadID, "pending") + for _, reminder := range reminders { + reminderID := strings.TrimSpace(reminder.ID) + if reminderID == "" { + continue + } + cancelReminderBestEffort(t, reminderID) + } +} + +func listRemindersBestEffort(t *testing.T, threadID, status string) []reminderResponse { + t.Helper() + payload := map[string]any{"thread_id": threadID} + if status != "" { + payload["status"] = status + } + body, err := json.Marshal(payload) + if err != nil { + t.Logf("cleanup: marshal reminders list: %v", err) + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), gatewayRequestTimeout) + defer cancel() + + req, err := newRemindersRequest(t, ctx, http.MethodPost, "list-reminders", bytes.NewReader(body)) + if err != nil { + t.Logf("cleanup: list reminders request: %v", err) + return nil + } + + resp, err := remindersGatewayClient(t).Do(req) + if err != nil { + t.Logf("cleanup: list reminders request: %v", err) + return nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + t.Logf("cleanup: list reminders status %d: %s", resp.StatusCode, strings.TrimSpace(string(bodyBytes))) + return nil + } + + decoder := json.NewDecoder(resp.Body) + var payloadResponse listRemindersResponse + if err := decoder.Decode(&payloadResponse); err != nil { + t.Logf("cleanup: decode reminders list: %v", err) + return nil + } + + return payloadResponse.Reminders +} + +func reminderIDs(reminders []reminderResponse) []string { + ids := make([]string, 0, len(reminders)) + for _, reminder := range reminders { + ids = append(ids, reminder.ID) + } + return ids +} + +func newThreadsClient(t *testing.T) threadsv1.ThreadsServiceClient { + t.Helper() + conn := dialGRPC(t, remindersThreadsAddr()) + return threadsv1.NewThreadsServiceClient(conn) +} + +func newAppsClient(t *testing.T) appsv1.AppsServiceClient { + t.Helper() + conn := dialGRPC(t, remindersAppsAddr()) + return appsv1.NewAppsServiceClient(conn) +} + +func remindersAppIdentityID(t *testing.T) string { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + caller := fetchGatewayIdentity(t, gatewayAPIToken(t)) + callCtx := remindersIdentityContext(ctx, caller.IdentityID) + orgID := gatewayOrganizationID(t) + resp, err := newAppsClient(t).GetAppBySlug(callCtx, &appsv1.GetAppBySlugRequest{ + Slug: remindersAppSlug, + OrganizationId: orgID, + }) + require.NoError(t, err) + if resp == nil || resp.GetApp() == nil { + t.Fatal("get app by slug: missing app") + } + identityID := strings.TrimSpace(resp.GetApp().GetIdentityId()) + if identityID == "" { + t.Fatal("get app by slug: missing identity id") + } + return identityID +} + +func createThreadWithAppParticipant( + t *testing.T, + client threadsv1.ThreadsServiceClient, + callerIdentityID string, + appIdentityID string, +) string { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + callCtx := remindersIdentityContext(ctx, callerIdentityID) + orgID := gatewayOrganizationID(t) + resp, err := client.CreateThread(callCtx, &threadsv1.CreateThreadRequest{ + ParticipantIds: []string{callerIdentityID, appIdentityID}, + OrganizationId: &orgID, + }) + require.NoError(t, err) + thread := resp.GetThread() + if thread == nil { + t.Fatal("create thread: missing thread") + } + threadID := strings.TrimSpace(thread.GetId()) + if threadID == "" { + t.Fatal("create thread: missing id") + } + return threadID +} + +func archiveThreadBestEffort(t *testing.T, client threadsv1.ThreadsServiceClient, identityID, threadID string) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + callCtx := remindersIdentityContext(ctx, identityID) + _, err := client.ArchiveThread(callCtx, &threadsv1.ArchiveThreadRequest{ThreadId: threadID}) + if err != nil { + t.Logf("cleanup: archive thread %s: %v", threadID, err) + } +} + +func pollReminderStatus(t *testing.T, reminderID, targetStatus string, timeout time.Duration) reminderResponse { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + reminder := getReminder(t, reminderID) + if reminder.Status == targetStatus { + return reminder + } + time.Sleep(500 * time.Millisecond) + } + t.Fatalf("reminder %s did not reach status %q within %s", reminderID, targetStatus, timeout) + return reminderResponse{} +} + +func assertReminderStaysPending(t *testing.T, reminderID string, duration time.Duration) { + t.Helper() + deadline := time.Now().Add(duration) + for time.Now().Before(deadline) { + reminder := getReminder(t, reminderID) + if reminder.Status != "pending" { + t.Fatalf("expected reminder %s to stay pending, got %q", reminderID, reminder.Status) + } + if reminder.CompletedAt != nil { + t.Fatalf("expected reminder %s completed_at to stay nil", reminderID) + } + time.Sleep(500 * time.Millisecond) + } +} + +func pollThreadMessages( + t *testing.T, + client threadsv1.ThreadsServiceClient, + identityID string, + threadID string, + expectedCount int, + timeout time.Duration, +) []*threadsv1.Message { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + messages := getThreadMessages(t, client, identityID, threadID) + if len(messages) >= expectedCount { + return messages + } + time.Sleep(500 * time.Millisecond) + } + t.Fatalf("expected %d messages for thread %s within %s", expectedCount, threadID, timeout) + return nil +} + +func getThreadMessages( + t *testing.T, + client threadsv1.ThreadsServiceClient, + identityID string, + threadID string, +) []*threadsv1.Message { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + callCtx := remindersIdentityContext(ctx, identityID) + var all []*threadsv1.Message + pageToken := "" + for { + resp, err := client.GetMessages(callCtx, &threadsv1.GetMessagesRequest{ + ThreadId: threadID, + PageSize: 100, + PageToken: pageToken, + }) + require.NoError(t, err) + all = append(all, resp.GetMessages()...) + pageToken = resp.GetNextPageToken() + if pageToken == "" { + break + } + } + return all +} + +func remindersIdentityContext(ctx context.Context, identityID string) context.Context { + md := metadata.New(map[string]string{"x-identity-id": identityID}) + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/suites/go-core/tests/setup_test.go b/suites/go-core/tests/setup_test.go index 7c7df36..176b606 100644 --- a/suites/go-core/tests/setup_test.go +++ b/suites/go-core/tests/setup_test.go @@ -12,7 +12,6 @@ import ( organizationsv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/organizations/v1" usersv1 "github.com/agynio/e2e/suites/go-core/.gen/go/agynio/api/users/v1" "github.com/google/uuid" - "google.golang.org/grpc/metadata" ) const ( @@ -20,11 +19,6 @@ const ( apiTokenName = "e2e-orchestrator" ) -func withIdentity(ctx context.Context, identityID string) context.Context { - md := metadata.New(map[string]string{"x-identity-id": identityID}) - return metadata.NewOutgoingContext(ctx, md) -} - func resolveOrCreateUser(t *testing.T, ctx context.Context, client usersv1.UsersServiceClient) string { t.Helper() callCtx, cancel := context.WithTimeout(ctx, setupTimeout) diff --git a/suites/go-core/tests/testllm_endpoints_test.go b/suites/go-core/tests/testllm_endpoints_test.go new file mode 100644 index 0000000..8cae65a --- /dev/null +++ b/suites/go-core/tests/testllm_endpoints_test.go @@ -0,0 +1,8 @@ +//go:build e2e + +package tests + +const ( + testLLMEndpointCodex = "https://testllm.dev/v1/org/agynio/suite/codex/responses" + testLLMEndpointAgn = "https://testllm.dev/v1/org/agynio/suite/agn/responses" +) diff --git a/suites/go-core/tests/ziti_helpers_test.go b/suites/go-core/tests/ziti_helpers_test.go new file mode 100644 index 0000000..1ddccd5 --- /dev/null +++ b/suites/go-core/tests/ziti_helpers_test.go @@ -0,0 +1,15 @@ +//go:build e2e + +package tests + +import "testing" + +const ( + zitiManagementAddrEnvKey = "ZITI_MANAGEMENT_ADDRESS" + defaultZitiManagementAddr = "ziti-management:50051" +) + +func zitiManagementAddr(t *testing.T) string { + t.Helper() + return envOrDefault(zitiManagementAddrEnvKey, defaultZitiManagementAddr) +} diff --git a/suites/playwright-chat-app/test/e2e/chat-api.ts b/suites/playwright-chat-app/test/e2e/chat-api.ts index 0f85a4b..669979d 100644 --- a/suites/playwright-chat-app/test/e2e/chat-api.ts +++ b/suites/playwright-chat-app/test/e2e/chat-api.ts @@ -9,6 +9,10 @@ const LLM_GATEWAY_PATH = '/api/agynio.api.gateway.v1.LLMGateway'; const ORGS_GATEWAY_PATH = '/api/agynio.api.gateway.v1.OrganizationsGateway'; const USERS_GATEWAY_PATH = '/api/agynio.api.gateway.v1.UsersGateway'; +export const DEFAULT_TEST_INIT_IMAGE = + process.env.E2E_AGENT_INIT_IMAGE?.trim() || + process.env.CODEX_INIT_IMAGE?.trim() || + 'ghcr.io/agynio/agent-init-codex:latest'; export const DEFAULT_TEST_AGENT_IMAGE = 'alpine:3.21'; const CONNECT_HEADERS = { @@ -24,11 +28,11 @@ export function resolveCodexInitImage(override?: string): string { } return trimmed; } - const value = process.env.CODEX_INIT_IMAGE?.trim() ?? ''; - if (!value) { - throw new Error('CODEX_INIT_IMAGE is required to create chat agents.'); + const value = process.env.CODEX_INIT_IMAGE?.trim(); + if (value) { + return value; } - return value; + return DEFAULT_TEST_INIT_IMAGE; } type CreateChatResponseWire = { @@ -47,6 +51,10 @@ type CreateAPITokenResponseWire = { plaintextToken?: string; }; +type GetMeResponseWire = { + user?: { meta?: { id?: string } }; +}; + type ListAccessibleOrganizationsResponseWire = { organizations?: Array<{ id: string; name: string }>; }; @@ -200,24 +208,12 @@ async function storeChatOrganization(page: Page, chatId: string, organizationId: } export async function resolveIdentityId(page: Page): Promise { - const session = await readOidcSession(page); - const token = session?.accessToken ?? null; - const headers: Record = token - ? { Authorization: `Bearer ${token}` } - : {}; - - const baseUrl = resolveBaseUrl(); - const response = await page.context().request.get(`${baseUrl}/api/me`, { headers }); - if (!response.ok()) { - const body = await response.text(); - throw new Error(`GET /api/me failed with status ${response.status()}: ${body}`); - } - - const payload = (await response.json()) as { identity_id?: string }; - if (!payload.identity_id) { - throw new Error('/api/me response missing identity_id'); + const response = await postConnect(page, USERS_GATEWAY_PATH, 'GetMe', {}); + const identityId = response.user?.meta?.id ?? ''; + if (!identityId) { + throw new Error('UsersGateway.GetMe response missing identity id.'); } - return payload.identity_id; + return identityId; } export async function resolveUserLabel(page: Page, identityId: string): Promise { diff --git a/suites/playwright-tracing-app/test/e2e/tracing-run.ts b/suites/playwright-tracing-app/test/e2e/tracing-run.ts index f3f1c2c..272e88e 100644 --- a/suites/playwright-tracing-app/test/e2e/tracing-run.ts +++ b/suites/playwright-tracing-app/test/e2e/tracing-run.ts @@ -33,6 +33,11 @@ const TEST_LLM_TOKEN = 'test-token'; const TEST_LLM_MODEL = 'mcp-tools-test'; const AGENT_IMAGE = 'alpine:3.21'; const MCP_IMAGE = 'node:22-slim'; +const DEFAULT_INIT_IMAGES: Record = { + agn: 'ghcr.io/agynio/agent-init-agn:latest', + codex: 'ghcr.io/agynio/agent-init-codex:latest', + claude: 'ghcr.io/agynio/agent-init-claude:latest', +}; const INIT_IMAGE_ENV_VARS: Record = { agn: 'AGN_INIT_IMAGE', codex: 'CODEX_INIT_IMAGE', @@ -72,11 +77,11 @@ type FullChainRunOptions = { function resolveInitImage(sdk: TraceSdk): string { const envVar = INIT_IMAGE_ENV_VARS[sdk]; - const value = process.env[envVar]?.trim() ?? ''; - if (!value) { - throw new Error(`${envVar} is required to run tracing full-chain tests.`); + const value = process.env[envVar]?.trim(); + if (value) { + return value; } - return value; + return DEFAULT_INIT_IMAGES[sdk]; } function resolveLlmEndpoint(sdk: TraceSdk): string {