From 965f2ac8872a7fdfc9761f02567f68bfca0085da Mon Sep 17 00:00:00 2001 From: whoisasx Date: Sun, 31 May 2026 00:55:42 +0530 Subject: [PATCH] feat: add neutral scm observability --- .../internal/adapters/scm/github/client.go | 167 +++++++-- .../internal/adapters/scm/github/commands.go | 4 +- .../internal/adapters/scm/github/provider.go | 60 ++-- .../adapters/scm/github/provider_test.go | 161 +++++++++ backend/internal/scm/command/service.go | 152 +++++++- backend/internal/scm/command/service_test.go | 174 ++++++++- backend/internal/scm/logging/logging.go | 337 ++++++++++++++++++ backend/internal/scm/observer/observer.go | 146 ++++++-- .../internal/scm/observer/observer_test.go | 119 +++++++ 9 files changed, 1209 insertions(+), 111 deletions(-) create mode 100644 backend/internal/scm/logging/logging.go diff --git a/backend/internal/adapters/scm/github/client.go b/backend/internal/adapters/scm/github/client.go index 66f3bb59..3d04b1d3 100644 --- a/backend/internal/adapters/scm/github/client.go +++ b/backend/internal/adapters/scm/github/client.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "io" + "log/slog" "net" "net/http" "net/url" @@ -14,6 +15,7 @@ import ( "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" ) const ( @@ -28,6 +30,7 @@ type Client struct { restBase string graphqlURL string userAgent string + logger *slog.Logger } type ClientOptions struct { @@ -36,10 +39,11 @@ type ClientOptions struct { RESTBase string GraphQLURL string UserAgent string + Logger *slog.Logger } func NewClient(opts ClientOptions) *Client { - c := &Client{httpClient: opts.HTTPClient, tokens: opts.Token, restBase: opts.RESTBase, graphqlURL: opts.GraphQLURL, userAgent: opts.UserAgent} + c := &Client{httpClient: opts.HTTPClient, tokens: opts.Token, restBase: opts.RESTBase, graphqlURL: opts.GraphQLURL, userAgent: opts.UserAgent, logger: opts.Logger} if c.httpClient == nil { c.httpClient = http.DefaultClient } @@ -76,22 +80,31 @@ type RESTResponse struct { } func (c *Client) DoREST(ctx context.Context, method, path string, q url.Values, body any, etag string, operation string) (RESTResponse, error) { + ctx, _ = scmlog.EnsureCorrelationID(ctx) started := time.Now() + logger := scmlog.Logger(c.logger) + endpoint := githubEndpointTemplate(path) var rdr io.Reader if body != nil { b, err := json.Marshal(body) if err != nil { - return RESTResponse{}, &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + se := &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + logTransportFailure(ctx, logger, operation, method, endpoint, started, 0, nil, false, false, se) + return RESTResponse{}, se } rdr = bytes.NewReader(b) } u, err := c.restURL(path, q) if err != nil { - return RESTResponse{}, &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + se := &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + logTransportFailure(ctx, logger, operation, method, endpoint, started, 0, nil, false, etag != "", se) + return RESTResponse{}, se } req, err := http.NewRequestWithContext(ctx, method, u, rdr) if err != nil { - return RESTResponse{}, &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + se := &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + logTransportFailure(ctx, logger, operation, method, endpoint, started, 0, nil, false, etag != "", se) + return RESTResponse{}, se } if body != nil { req.Header.Set("Content-Type", "application/json") @@ -102,60 +115,89 @@ func (c *Client) DoREST(ctx context.Context, method, path string, q url.Values, if etag != "" { req.Header.Set("If-None-Match", etag) } + logTransportRequest(ctx, logger, operation, method, endpoint, etag != "") if err := c.authorize(ctx, req); err != nil { + logTransportFailure(ctx, logger, operation, method, endpoint, started, 0, nil, false, etag != "", err) return RESTResponse{}, err } resp, err := c.httpClient.Do(req) if err != nil { - return RESTResponse{}, normalizeHTTPError(operation, err) + se := normalizeHTTPError(operation, err) + logTransportFailure(ctx, logger, operation, method, endpoint, started, 0, nil, false, etag != "", se) + return RESTResponse{}, se } defer resp.Body.Close() b, readErr := io.ReadAll(resp.Body) if readErr != nil { - return RESTResponse{}, &domain.SCMError{Kind: domain.SCMErrorNetwork, Operation: operation, Message: readErr.Error(), Cause: readErr} + se := &domain.SCMError{Kind: domain.SCMErrorNetwork, Operation: operation, Message: readErr.Error(), Cause: readErr} + logTransportFailure(ctx, logger, operation, method, endpoint, started, resp.StatusCode, rateLimitFromHeaders(resp.Header), false, etag != "", se) + return RESTResponse{}, se } rl := rateLimitFromHeaders(resp.Header) out := RESTResponse{StatusCode: resp.StatusCode, NotModified: resp.StatusCode == http.StatusNotModified, ETag: resp.Header.Get("ETag"), Body: b, RateLimit: rl, Diagnostic: domain.SCMDiagnostic{Operation: operation, StatusCode: resp.StatusCode, ETag: resp.Header.Get("ETag"), CacheHit: resp.StatusCode == http.StatusNotModified, StartedAt: started, DurationMS: time.Since(started).Milliseconds()}} if resp.StatusCode == http.StatusNotModified { + logTransportResponse(ctx, logger, operation, method, endpoint, started, resp.StatusCode, rl, out.NotModified, out.ETag != "") return out, nil } if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return out, githubStatusError(operation, resp.StatusCode, b, rl) + se := githubStatusError(operation, resp.StatusCode, b, rl) + out.Diagnostic.ErrorKind = se.Kind + out.Diagnostic.Message = scmlog.SafeDiagnosticMessage(se) + logTransportFailure(ctx, logger, operation, method, endpoint, started, resp.StatusCode, rl, false, out.ETag != "" || etag != "", se) + return out, se } + logTransportResponse(ctx, logger, operation, method, endpoint, started, resp.StatusCode, rl, out.NotModified, out.ETag != "") return out, nil } func (c *Client) DoGraphQL(ctx context.Context, query string, variables map[string]any, operation string) (map[string]any, *domain.SCMRateLimit, domain.SCMDiagnostic, error) { + ctx, _ = scmlog.EnsureCorrelationID(ctx) started := time.Now() + logger := scmlog.Logger(c.logger) + const endpoint = "/graphql" body := map[string]any{"query": query, "variables": variables} b, err := json.Marshal(body) if err != nil { - return nil, nil, domain.SCMDiagnostic{}, &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + se := &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, 0, nil, false, false, se) + return nil, nil, domain.SCMDiagnostic{}, se } req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.graphqlURL, bytes.NewReader(b)) if err != nil { - return nil, nil, domain.SCMDiagnostic{}, &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + se := &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, 0, nil, false, false, se) + return nil, nil, domain.SCMDiagnostic{}, se } req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") req.Header.Set("User-Agent", c.userAgent) + logTransportRequest(ctx, logger, operation, http.MethodPost, endpoint, false) if err := c.authorize(ctx, req); err != nil { + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, 0, nil, false, false, err) return nil, nil, domain.SCMDiagnostic{}, err } resp, err := c.httpClient.Do(req) if err != nil { - return nil, nil, domain.SCMDiagnostic{}, normalizeHTTPError(operation, err) + se := normalizeHTTPError(operation, err) + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, 0, nil, false, false, se) + return nil, nil, domain.SCMDiagnostic{}, se } defer resp.Body.Close() respBody, readErr := io.ReadAll(resp.Body) if readErr != nil { - return nil, nil, domain.SCMDiagnostic{}, &domain.SCMError{Kind: domain.SCMErrorNetwork, Operation: operation, Message: readErr.Error(), Cause: readErr} + se := &domain.SCMError{Kind: domain.SCMErrorNetwork, Operation: operation, Message: readErr.Error(), Cause: readErr} + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, resp.StatusCode, rateLimitFromHeaders(resp.Header), false, false, se) + return nil, nil, domain.SCMDiagnostic{}, se } rl := rateLimitFromHeaders(resp.Header) diag := domain.SCMDiagnostic{Operation: operation, StatusCode: resp.StatusCode, StartedAt: started, DurationMS: time.Since(started).Milliseconds()} if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, rl, diag, githubStatusError(operation, resp.StatusCode, respBody, rl) + se := githubStatusError(operation, resp.StatusCode, respBody, rl) + diag.ErrorKind = se.Kind + diag.Message = scmlog.SafeDiagnosticMessage(se) + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, resp.StatusCode, rl, false, false, se) + return nil, rl, diag, se } var decoded struct { Data map[string]any `json:"data"` @@ -165,7 +207,11 @@ func (c *Client) DoGraphQL(ctx context.Context, query string, variables map[stri } `json:"errors"` } if err := json.Unmarshal(respBody, &decoded); err != nil { - return nil, rl, diag, &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + se := &domain.SCMError{Kind: domain.SCMErrorParse, Operation: operation, Message: err.Error(), Cause: err} + diag.ErrorKind = se.Kind + diag.Message = scmlog.SafeDiagnosticMessage(se) + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, resp.StatusCode, rl, false, false, se) + return nil, rl, diag, se } if len(decoded.Errors) > 0 { kind := domain.SCMErrorUnavailable @@ -173,9 +219,16 @@ func (c *Client) DoGraphQL(ctx context.Context, query string, variables map[stri if strings.Contains(strings.ToLower(msg), "rate limit") { kind = domain.SCMErrorRateLimited } - return decoded.Data, graphqlRateLimit(decoded.Data, rl), diag, &domain.SCMError{Kind: kind, Operation: operation, Message: msg} - } - return decoded.Data, graphqlRateLimit(decoded.Data, rl), diag, nil + rl = graphqlRateLimit(decoded.Data, rl) + se := &domain.SCMError{Kind: kind, Operation: operation, Message: scmlog.SafeDiagnosticMessage(&domain.SCMError{Kind: kind, Message: msg})} + diag.ErrorKind = se.Kind + diag.Message = scmlog.SafeDiagnosticMessage(se) + logTransportFailure(ctx, logger, operation, http.MethodPost, endpoint, started, resp.StatusCode, rl, false, false, se) + return decoded.Data, rl, diag, se + } + rl = graphqlRateLimit(decoded.Data, rl) + logTransportResponse(ctx, logger, operation, http.MethodPost, endpoint, started, resp.StatusCode, rl, false, false) + return decoded.Data, rl, diag, nil } func (c *Client) authorize(ctx context.Context, req *http.Request) error { @@ -211,7 +264,7 @@ func normalizeHTTPError(operation string, err error) error { return &domain.SCMError{Kind: kind, Operation: operation, Message: err.Error(), Cause: err} } -func githubStatusError(operation string, status int, body []byte, rl *domain.SCMRateLimit) error { +func githubStatusError(operation string, status int, body []byte, rl *domain.SCMRateLimit) *domain.SCMError { kind := domain.SCMErrorUnavailable switch status { case http.StatusUnauthorized, http.StatusForbidden: @@ -225,13 +278,11 @@ func githubStatusError(operation string, status int, body []byte, rl *domain.SCM case http.StatusTooManyRequests: kind = domain.SCMErrorRateLimited } - msg := strings.TrimSpace(string(body)) var gh struct { Message string `json:"message"` } - if json.Unmarshal(body, &gh) == nil && gh.Message != "" { - msg = gh.Message - } + _ = json.Unmarshal(body, &gh) + msg := scmlog.StatusMessage(status, body, gh.Message) se := &domain.SCMError{Kind: kind, Operation: operation, StatusCode: status, Message: msg} if kind == domain.SCMErrorRateLimited && rl != nil { se.RetryAfter = rl.ResetAt @@ -239,6 +290,80 @@ func githubStatusError(operation string, status int, body []byte, rl *domain.SCM return se } +func logTransportRequest(ctx context.Context, logger *slog.Logger, operation, method, endpoint string, etagPresent bool) { + attrs := transportAttrs(ctx, operation, method, endpoint, + slog.Bool(scmlog.FieldETagPresent, etagPresent), + ) + logger.Debug(scmlog.EventTransportRequest, scmlog.Args(attrs)...) +} + +func logTransportResponse(ctx context.Context, logger *slog.Logger, operation, method, endpoint string, started time.Time, status int, rl *domain.SCMRateLimit, cacheHit, etagPresent bool) { + attrs := transportAttrs(ctx, operation, method, endpoint, + slog.Int(scmlog.FieldStatusCode, status), + slog.Int64(scmlog.FieldDurationMS, scmlog.DurationMS(started)), + slog.Bool(scmlog.FieldCacheHit, cacheHit), + slog.Bool(scmlog.FieldETagPresent, etagPresent), + ) + attrs = append(attrs, scmlog.RateLimitAttrs(rl)...) + logger.Debug(scmlog.EventTransportResponse, scmlog.Args(attrs)...) +} + +func logTransportFailure(ctx context.Context, logger *slog.Logger, operation, method, endpoint string, started time.Time, status int, rl *domain.SCMRateLimit, cacheHit, etagPresent bool, err error) { + attrs := transportAttrs(ctx, operation, method, endpoint, + slog.Int64(scmlog.FieldDurationMS, scmlog.DurationMS(started)), + slog.Bool(scmlog.FieldCacheHit, cacheHit), + slog.Bool(scmlog.FieldETagPresent, etagPresent), + ) + if status != 0 { + attrs = append(attrs, slog.Int(scmlog.FieldStatusCode, status)) + } + attrs = append(attrs, scmlog.RateLimitAttrs(rl)...) + attrs = append(attrs, scmlog.ErrorAttrs(err)...) + if scmlog.ErrorKind(err) == domain.SCMErrorRateLimited { + logger.Warn(scmlog.EventTransportRateLimited, scmlog.Args(attrs)...) + return + } + logger.Warn(scmlog.EventTransportFailed, scmlog.Args(attrs)...) +} + +func transportAttrs(ctx context.Context, operation, method, endpoint string, attrs ...slog.Attr) []slog.Attr { + base := []slog.Attr{ + scmlog.CorrelationAttr(ctx), + slog.String(scmlog.FieldProvider, string(domain.SCMProviderGitHub)), + slog.String(scmlog.FieldOperation, operation), + slog.String(scmlog.FieldMethod, method), + slog.String(scmlog.FieldEndpointTemplate, endpoint), + } + return append(base, attrs...) +} + +func githubEndpointTemplate(p string) string { + if p == "" { + return "" + } + parts := strings.Split(strings.Trim(p, "/"), "/") + if len(parts) >= 3 && parts[0] == "repos" { + out := []string{"repos", "{owner}", "{repo}"} + for i := 3; i < len(parts); i++ { + part := parts[i] + prev := "" + if i > 0 { + prev = parts[i-1] + } + switch { + case prev == "pulls" || prev == "issues": + out = append(out, "{number}") + case prev == "commits": + out = append(out, "{ref}") + default: + out = append(out, part) + } + } + return "/" + strings.Join(out, "/") + } + return "/" + strings.Join(parts, "/") +} + func rateLimitFromHeaders(h http.Header) *domain.SCMRateLimit { if h == nil || h.Get("X-RateLimit-Limit") == "" { return nil diff --git a/backend/internal/adapters/scm/github/commands.go b/backend/internal/adapters/scm/github/commands.go index d42fc5ac..15a7b1a3 100644 --- a/backend/internal/adapters/scm/github/commands.go +++ b/backend/internal/adapters/scm/github/commands.go @@ -137,7 +137,7 @@ func (p *Provider) Checkout(ctx context.Context, req ports.SCMCommandRequest) (p res.Message = "checked out with gh pr checkout" return res, nil } else { - return res, &domain.SCMError{Kind: domain.SCMErrorCommand, Operation: "github.command.checkout", Message: fmt.Sprintf("git checkout failed: %v; gh fallback failed: %v", err, ghErr), Cause: err} + return res, &domain.SCMError{Kind: domain.SCMErrorCommand, Operation: "github.command.checkout", Message: "checkout command failed", Cause: fmt.Errorf("git checkout failed: %w; gh fallback failed: %v", err, ghErr)} } } @@ -187,7 +187,7 @@ func (p *Provider) normalizeCommandRequest(ctx context.Context, req ports.SCMCom func commandResult(req ports.SCMCommandRequest, diag domain.SCMDiagnostic) ports.SCMCommandResult { res := ports.SCMCommandResult{Provider: domain.SCMProviderGitHub, Command: req.Command, ChangeRequest: req.ChangeRequest, PerformedAt: req.Now} - if !diag.StartedAt.IsZero() || diag.Operation != "" { + if durableDiagnostic(diag) { res.Diagnostics = []domain.SCMDiagnostic{diag} } return res diff --git a/backend/internal/adapters/scm/github/provider.go b/backend/internal/adapters/scm/github/provider.go index d2765f85..f97c1219 100644 --- a/backend/internal/adapters/scm/github/provider.go +++ b/backend/internal/adapters/scm/github/provider.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "net/http" "net/url" "path" @@ -13,6 +14,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" ) const ( @@ -52,12 +54,13 @@ type ProviderOptions struct { RESTBase string GraphQLURL string Host string + Logger *slog.Logger } func NewProvider(opts ProviderOptions) *Provider { p := &Provider{client: opts.Client, host: opts.Host} if p.client == nil { - p.client = NewClient(ClientOptions{HTTPClient: opts.HTTPClient, Token: opts.Token, RESTBase: opts.RESTBase, GraphQLURL: opts.GraphQLURL}) + p.client = NewClient(ClientOptions{HTTPClient: opts.HTTPClient, Token: opts.Token, RESTBase: opts.RESTBase, GraphQLURL: opts.GraphQLURL, Logger: opts.Logger}) } if p.host == "" { p.host = defaultHost @@ -68,6 +71,7 @@ func NewProvider(opts ProviderOptions) *Provider { func (p *Provider) Provider() domain.SCMProvider { return domain.SCMProviderGitHub } func (p *Provider) ObserveSessions(ctx context.Context, req ports.SCMObserveRequest, cache ports.SCMProviderCache) (ports.SCMObserveResult, error) { + ctx, _ = scmlog.EnsureCorrelationID(ctx) now := req.Now if now.IsZero() { now = time.Now() @@ -113,7 +117,7 @@ func (p *Provider) ObserveSessions(ctx context.Context, req ports.SCMObserveRequ st.ConsecutiveFail++ st.LastFailureAt = now st.BackoffUntil = now.Add(30 * time.Second) - if se, ok := groupErr.(*domain.SCMError); ok { + if se, ok := scmlog.SCMError(groupErr); ok { st.LastError = se if se.Kind == domain.SCMErrorRateLimited && !se.RetryAfter.IsZero() { st.RateLimitUntil = se.RetryAfter @@ -358,13 +362,18 @@ func chunkSubjects(subjects []domain.SCMSubject, size int) [][]domain.SCMSubject } func diagnosticFromError(operation string, err error) domain.SCMDiagnostic { - d := domain.SCMDiagnostic{Operation: operation, ErrorKind: domain.SCMErrorUnavailable, Message: fmt.Sprint(err)} - if se, ok := err.(*domain.SCMError); ok { - d.Operation = firstNonEmpty(se.Operation, operation) - d.ErrorKind = se.Kind - d.StatusCode = se.StatusCode + return scmlog.DiagnosticFromError(operation, err) +} + +func appendDiagnostic(diags []domain.SCMDiagnostic, diag domain.SCMDiagnostic) []domain.SCMDiagnostic { + if durableDiagnostic(diag) { + return append(diags, diag) } - return d + return diags +} + +func durableDiagnostic(diag domain.SCMDiagnostic) bool { + return diag.Operation != "" && (diag.ErrorKind != "" || diag.Message != "" || diag.StatusCode >= 400) } func shouldFetchCheckRuns(snap domain.SCMSnapshot, prData map[string]any) bool { @@ -429,9 +438,7 @@ func (p *Provider) observeKnownPRs(ctx context.Context, subjects []domain.SCMSub changed, prListDiag, commit, err := p.checkOpenPullListChanged(ctx, cache, scope, owner, repo, now) prListChanged = changed prListCommit = commit - if prListDiag.Operation != "" { - diags = append(diags, prListDiag) - } + diags = appendDiagnostic(diags, prListDiag) if err != nil { // PR-list guard failures should not be terminal truth. Fall through to // GraphQL so one failed optimization guard does not hide real PR changes. @@ -466,9 +473,7 @@ func (p *Provider) observeKnownPRs(ctx context.Context, subjects []domain.SCMSub continue } changed, diag, commit, err := p.checkRunsChanged(ctx, cache, subj, prev.PR.HeadSHA, now) - if diag.Operation != "" { - diags = append(diags, diag) - } + diags = appendDiagnostic(diags, diag) if err != nil || changed { if err == nil && changed { checkGuardCommits[subj.SessionID] = commit @@ -494,9 +499,7 @@ func (p *Provider) observeKnownPRs(ctx context.Context, subjects []domain.SCMSub mainFetchOK := true for _, batch := range chunkSubjects(toFetch, maxGraphQLBatchSize) { data, rl, diag, err := p.fetchPRBatch(ctx, owner, repo, batch) - if diag.Operation != "" { - diags = append(diags, diag) - } + diags = appendDiagnostic(diags, diag) if rl != nil { lastRL = rl } @@ -507,12 +510,12 @@ func (p *Provider) observeKnownPRs(ctx context.Context, subjects []domain.SCMSub } for _, subj := range batch { if terminal, ok, fallbackDiag, fallbackErr := p.fetchTerminalPRFallback(ctx, subj, now); ok { - if fallbackDiag.Operation != "" { + if durableDiagnostic(fallbackDiag) { terminal.Diagnostics = append(terminal.Diagnostics, fallbackDiag) } snaps = append(snaps, terminal) continue - } else if fallbackDiag.Operation != "" { + } else if durableDiagnostic(fallbackDiag) { diags = append(diags, fallbackDiag) } else if fallbackErr != nil { diags = append(diags, diagnosticFromError("github.rest_pr_fallback", fallbackErr)) @@ -547,9 +550,7 @@ func (p *Provider) observeKnownPRs(ctx context.Context, subjects []domain.SCMSub prev, havePrev := latest[subj.SessionID] if shouldFetchCheckRuns(snap, prData) { checks, checkDiag, err := p.fetchCheckRuns(ctx, cache, subj, snap, now) - if checkDiag.Operation != "" { - snap.Diagnostics = append(snap.Diagnostics, checkDiag) - } + snap.Diagnostics = appendDiagnostic(snap.Diagnostics, checkDiag) if err == nil { snap.CI.Checks = checks snap.CI.Summary = summarizeChecks(checks) @@ -918,9 +919,7 @@ func (p *Provider) fetchReviewThreads(ctx context.Context, cache ports.SCMProvid owner, repo := subj.Repository().OwnerName() resp, err := p.client.DoREST(ctx, http.MethodGet, repoPath(owner, repo, "pulls", strconv.Itoa(subj.PRNumber), "comments"), nil, nil, entry.ETag, "github.review_comments") diags := []domain.SCMDiagnostic{} - if resp.Diagnostic.Operation != "" { - diags = append(diags, resp.Diagnostic) - } + diags = appendDiagnostic(diags, resp.Diagnostic) if err != nil { return nil, diags, false, err } @@ -949,9 +948,7 @@ func (p *Provider) fetchReviewThreadsGraphQL(ctx context.Context, subj domain.SC query := `query($owner:String!,$repo:String!,$number:Int!){ repository(owner:$owner,name:$repo){ pullRequest(number:$number){ reviewThreads(last:100){ nodes{ id isResolved comments(first:100){ nodes{ id author{ login __typename } body path line url } } } } } } rateLimit{ limit remaining resetAt } }` data, _, diag, err := p.client.DoGraphQL(ctx, query, map[string]any{"owner": owner, "repo": repo, "number": subj.PRNumber}, "github.review_threads") diags := []domain.SCMDiagnostic{} - if diag.Operation != "" { - diags = append(diags, diag) - } + diags = appendDiagnostic(diags, diag) if err != nil { return nil, diags, err } @@ -1347,12 +1344,7 @@ func findPullForBranch(pulls []restPull, branch string) (branchMapping, bool) { } func unavailableSnapshot(subj domain.SCMSubject, now time.Time, err error) domain.SCMSnapshot { - d := domain.SCMDiagnostic{Operation: "github.observe", ErrorKind: domain.SCMErrorUnavailable, Message: fmt.Sprint(err)} - if se, ok := err.(*domain.SCMError); ok { - d.Operation = se.Operation - d.ErrorKind = se.Kind - d.StatusCode = se.StatusCode - } + d := scmlog.DiagnosticFromError("github.observe", err) return domain.SCMSnapshot{SessionID: subj.SessionID, Subject: subj, Freshness: domain.SCMFreshnessUnavailable, ObservedAt: now, Diagnostics: []domain.SCMDiagnostic{d}} } diff --git a/backend/internal/adapters/scm/github/provider_test.go b/backend/internal/adapters/scm/github/provider_test.go index 5facc151..b5995dac 100644 --- a/backend/internal/adapters/scm/github/provider_test.go +++ b/backend/internal/adapters/scm/github/provider_test.go @@ -1,10 +1,12 @@ package github import ( + "bytes" "context" "encoding/json" "errors" "fmt" + "log/slog" "net/http" "net/http/httptest" "strconv" @@ -15,6 +17,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" "github.com/aoagents/agent-orchestrator/backend/internal/scm/store" ) @@ -47,6 +50,107 @@ func TestRESTETag200And304(t *testing.T) { } } +func TestRESTTransportLogsUseNeutralFieldsAndDoNotLeakPayloads(t *testing.T) { + var logs bytes.Buffer + secretToken := "secret-token" + secretBody := "SECRET_COMMENT_BODY" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if got := r.Header.Get("Authorization"); got != "Bearer "+secretToken { + t.Fatalf("auth header %q", got) + } + w.Header().Set("ETag", `"comment"`) + w.Header().Set("X-RateLimit-Limit", "5000") + w.Header().Set("X-RateLimit-Remaining", "4999") + w.Header().Set("X-RateLimit-Reset", "1780000000") + w.WriteHeader(http.StatusCreated) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + defer ts.Close() + c := NewClient(ClientOptions{RESTBase: ts.URL, GraphQLURL: ts.URL + "/graphql", Token: StaticTokenSource(secretToken), Logger: githubJSONLogger(&logs)}) + ctx := scmlog.WithCorrelationID(context.Background(), "corr-rest") + resp, err := c.DoREST(ctx, http.MethodPost, "/repos/o/r/issues/45/comments", nil, map[string]any{"body": secretBody}, "", "github.command.comment") + if err != nil || resp.StatusCode != http.StatusCreated { + t.Fatalf("resp=%+v err=%v", resp, err) + } + rawLogs := logs.String() + for _, secret := range []string{secretToken, "Authorization", secretBody} { + if strings.Contains(rawLogs, secret) { + t.Fatalf("transport logs leaked %q: %s", secret, rawLogs) + } + } + records := decodeGithubLogRecords(t, rawLogs) + request := findGithubLogRecord(t, records, scmlog.EventTransportRequest) + assertGithubLogField(t, request, scmlog.FieldCorrelationID, "corr-rest") + assertGithubLogField(t, request, scmlog.FieldProvider, "github") + assertGithubLogField(t, request, scmlog.FieldOperation, "github.command.comment") + assertGithubLogField(t, request, scmlog.FieldMethod, http.MethodPost) + assertGithubLogField(t, request, scmlog.FieldEndpointTemplate, "/repos/{owner}/{repo}/issues/{number}/comments") + response := findGithubLogRecord(t, records, scmlog.EventTransportResponse) + assertGithubLogNumber(t, response, scmlog.FieldStatusCode, http.StatusCreated) + assertGithubLogNumber(t, response, scmlog.FieldRateLimitRemaining, 4999) + assertGithubLogBool(t, response, scmlog.FieldETagPresent, true) + if _, ok := response["url"]; ok { + t.Fatalf("transport log should not include full url: %+v", response) + } +} + +func TestGraphQLTransportLogsDoNotLeakQuery(t *testing.T) { + var logs bytes.Buffer + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/graphql" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + _ = json.NewEncoder(w).Encode(map[string]any{"data": map[string]any{"viewer": map[string]any{"login": "bot"}}}) + })) + defer ts.Close() + c := NewClient(ClientOptions{RESTBase: ts.URL, GraphQLURL: ts.URL + "/graphql", Token: StaticTokenSource("token"), Logger: githubJSONLogger(&logs)}) + secretQueryName := "SecretGraphQLQueryName" + if _, _, _, err := c.DoGraphQL(scmlog.WithCorrelationID(context.Background(), "corr-graphql"), "query "+secretQueryName+" { viewer { login } }", nil, "github.graphql_test"); err != nil { + t.Fatal(err) + } + if strings.Contains(logs.String(), secretQueryName) { + t.Fatalf("GraphQL query leaked into transport logs: %s", logs.String()) + } + response := findGithubLogRecord(t, decodeGithubLogRecords(t, logs.String()), scmlog.EventTransportResponse) + assertGithubLogField(t, response, scmlog.FieldEndpointTemplate, "/graphql") + assertGithubLogField(t, response, scmlog.FieldMethod, http.MethodPost) + assertGithubLogField(t, response, scmlog.FieldCorrelationID, "corr-graphql") +} + +func TestTransportRateLimitLogsNormalizedKindAndHidesResponseBody(t *testing.T) { + var logs bytes.Buffer + secretTail := "CI_LOG_TAIL_SECRET" + reset := time.Date(2026, 5, 28, 13, 0, 0, 0, time.UTC).Unix() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-RateLimit-Limit", "5000") + w.Header().Set("X-RateLimit-Remaining", "0") + w.Header().Set("X-RateLimit-Reset", strconv.FormatInt(reset, 10)) + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write([]byte(secretTail + "\nline 1\nline 2")) + })) + defer ts.Close() + c := NewClient(ClientOptions{RESTBase: ts.URL, GraphQLURL: ts.URL + "/graphql", Token: StaticTokenSource("token"), Logger: githubJSONLogger(&logs)}) + resp, err := c.DoREST(scmlog.WithCorrelationID(context.Background(), "corr-rate"), http.MethodGet, "/repos/o/r/pulls/45", nil, nil, "", "github.pr_get") + if err == nil { + t.Fatal("expected rate limit error") + } + var scmErr *domain.SCMError + if !errors.As(err, &scmErr) || scmErr.Kind != domain.SCMErrorRateLimited || scmErr.Message != http.StatusText(http.StatusForbidden) { + t.Fatalf("err=%T %[1]v", err) + } + if strings.Contains(resp.Diagnostic.Message, secretTail) { + t.Fatalf("transport diagnostic leaked response body: %+v", resp.Diagnostic) + } + if strings.Contains(logs.String(), secretTail) { + t.Fatalf("response body leaked into transport logs: %s", logs.String()) + } + rateLimited := findGithubLogRecord(t, decodeGithubLogRecords(t, logs.String()), scmlog.EventTransportRateLimited) + assertGithubLogField(t, rateLimited, scmlog.FieldCorrelationID, "corr-rate") + assertGithubLogField(t, rateLimited, scmlog.FieldErrorKind, string(domain.SCMErrorRateLimited)) + assertGithubLogNumber(t, rateLimited, scmlog.FieldStatusCode, http.StatusForbidden) + assertGithubLogNumber(t, rateLimited, scmlog.FieldRateLimitRemaining, 0) +} + func TestGHTokenSourceMemoizesAndIgnoresGithubTokenEnv(t *testing.T) { t.Setenv("GITHUB_TOKEN", "env-token") var calls atomic.Int32 @@ -300,6 +404,9 @@ func TestGraphQLBatchNormalizationReviewAndMergeability(t *testing.T) { if !s.Mergeability.Mergeable { t.Fatalf("expected mergeable: %+v", s.Mergeability) } + if len(s.Diagnostics) != 0 { + t.Fatalf("successful snapshot should not persist transport diagnostics: %+v", s.Diagnostics) + } } func TestCompleteGraphQLCIContextsAvoidFullCheckRunsAndJobLogs(t *testing.T) { @@ -1243,3 +1350,57 @@ func numberedLines(n int) string { } return b.String() } + +func githubJSONLogger(buf *bytes.Buffer) *slog.Logger { + return slog.New(slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +func decodeGithubLogRecords(t *testing.T, raw string) []map[string]any { + t.Helper() + lines := bytes.Split([]byte(raw), []byte("\n")) + records := make([]map[string]any, 0, len(lines)) + for _, line := range lines { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + var rec map[string]any + if err := json.Unmarshal(line, &rec); err != nil { + t.Fatalf("decode log %s: %v", line, err) + } + records = append(records, rec) + } + return records +} + +func findGithubLogRecord(t *testing.T, records []map[string]any, msg string) map[string]any { + t.Helper() + for _, rec := range records { + if rec["msg"] == msg { + return rec + } + } + t.Fatalf("missing log %q in %+v", msg, records) + return nil +} + +func assertGithubLogField(t *testing.T, rec map[string]any, key, want string) { + t.Helper() + if got, _ := rec[key].(string); got != want { + t.Fatalf("%s=%q want %q in %+v", key, got, want, rec) + } +} + +func assertGithubLogNumber(t *testing.T, rec map[string]any, key string, want float64) { + t.Helper() + if got, _ := rec[key].(float64); got != want { + t.Fatalf("%s=%v want %v in %+v", key, rec[key], want, rec) + } +} + +func assertGithubLogBool(t *testing.T, rec map[string]any, key string, want bool) { + t.Helper() + if got, _ := rec[key].(bool); got != want { + t.Fatalf("%s=%v want %v in %+v", key, rec[key], want, rec) + } +} diff --git a/backend/internal/scm/command/service.go b/backend/internal/scm/command/service.go index 994ba977..175571ee 100644 --- a/backend/internal/scm/command/service.go +++ b/backend/internal/scm/command/service.go @@ -3,10 +3,12 @@ package command import ( "context" "fmt" + "log/slog" "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" ) // AuditSink records command attempts. It can be backed by the future durable @@ -32,6 +34,7 @@ type Service struct { Audit AuditSink Refresh Refresher Clock func() time.Time + Logger *slog.Logger } func New(store ports.SCMStore, refresh Refresher, providers ...ports.SCMCommandProvider) *Service { @@ -65,94 +68,205 @@ func (s *Service) CheckoutChangeRequest(ctx context.Context, sessionID domain.Se } func (s *Service) run(ctx context.Context, sessionID domain.SessionID, cmd ports.SCMCommand, req ports.SCMCommandRequest) (ports.SCMCommandResult, error) { + ctx, _ = scmlog.EnsureCorrelationID(ctx) + logger := scmlog.Logger(s.Logger) + started := time.Now() if s.Store == nil { - return ports.SCMCommandResult{}, fmt.Errorf("scm command: nil store") + err := fmt.Errorf("scm command: nil store") + logCommandFailed(ctx, logger, ports.SCMCommandRequest{Command: cmd}, scmlog.DurationMS(started), err) + return ports.SCMCommandResult{}, err } if s.Clock == nil { s.Clock = time.Now } subj, ok, err := s.Store.GetSubject(ctx, sessionID) if err != nil { + logCommandFailed(ctx, logger, ports.SCMCommandRequest{Command: cmd, Subject: domain.SCMSubject{SessionID: sessionID}}, scmlog.DurationMS(started), err) return ports.SCMCommandResult{}, err } if !ok { - return ports.SCMCommandResult{}, fmt.Errorf("scm command: subject %s not found", sessionID) + err := fmt.Errorf("scm command: subject %s not found", sessionID) + logCommandFailed(ctx, logger, ports.SCMCommandRequest{Command: cmd, Subject: domain.SCMSubject{SessionID: sessionID}}, scmlog.DurationMS(started), err) + return ports.SCMCommandResult{}, err } + req.Subject = subj + req.ChangeRequest = subj.ChangeRequestID() + req.Command = cmd + req.Now = s.Clock() if subj.PRNumber == 0 { - return ports.SCMCommandResult{}, &domain.SCMError{Kind: domain.SCMErrorNotFound, Operation: string(cmd), Message: "no change request bound to session"} + err := &domain.SCMError{Kind: domain.SCMErrorNotFound, Operation: string(cmd), Message: "no change request bound to session"} + res := commandResultForError(req, err) + logCommandFailed(ctx, logger, req, scmlog.DurationMS(started), err) + return res, err } provider := s.Providers[subj.Provider] if provider == nil { - return ports.SCMCommandResult{}, &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: string(cmd), Message: fmt.Sprintf("provider %q not registered", subj.Provider)} + err := &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: string(cmd), Message: fmt.Sprintf("provider %q not registered", subj.Provider)} + res := commandResultForError(req, err) + logCommandFailed(ctx, logger, req, scmlog.DurationMS(started), err) + return res, err } - req.Subject = subj - req.ChangeRequest = subj.ChangeRequestID() - req.Command = cmd - req.Now = s.Clock() + logger.Info(scmlog.EventCommandStarted, scmlog.Args(scmlog.Add(scmlog.CommandAttrs(req), scmlog.CorrelationAttr(ctx)))...) var res ports.SCMCommandResult switch cmd { case ports.SCMCommandMerge: if !provider.Capabilities().Merge { - return res, capabilityError(cmd) + err = capabilityError(cmd) + res = commandResultForError(req, err) + break } res, err = provider.Merge(ctx, req) case ports.SCMCommandClose: if !provider.Capabilities().Close { - return res, capabilityError(cmd) + err = capabilityError(cmd) + res = commandResultForError(req, err) + break } res, err = provider.Close(ctx, req) case ports.SCMCommandComment: if !provider.Capabilities().Comment { - return res, capabilityError(cmd) + err = capabilityError(cmd) + res = commandResultForError(req, err) + break } res, err = provider.Comment(ctx, req) case ports.SCMCommandAssign: if !provider.Capabilities().Assign { - return res, capabilityError(cmd) + err = capabilityError(cmd) + res = commandResultForError(req, err) + break } res, err = provider.Assign(ctx, req) case ports.SCMCommandCheckout: if !provider.Capabilities().Checkout { - return res, capabilityError(cmd) + err = capabilityError(cmd) + res = commandResultForError(req, err) + break } res, err = provider.Checkout(ctx, req) default: err = capabilityError(cmd) + res = commandResultForError(req, err) + } + res = normalizeResult(res, req) + if err != nil { + res = attachErrorDiagnostic(res, req, err) } if s.Audit != nil { - _ = s.Audit.RecordSCMCommand(ctx, res, err) + if auditErr := s.Audit.RecordSCMCommand(ctx, res, err); auditErr != nil { + attrs := scmlog.Add(scmlog.CommandAttrs(req), scmlog.CorrelationAttr(ctx)) + attrs = append(attrs, scmlog.ErrorAttrs(auditErr)...) + logger.Warn(scmlog.EventCommandAuditFailed, scmlog.Args(attrs)...) + } } if err != nil { + logCommandFailed(ctx, logger, req, scmlog.DurationMS(started), err) return res, err } + logCommandCompleted(ctx, logger, req, scmlog.DurationMS(started)) if cmd == ports.SCMCommandCheckout { return res, nil } - if err := s.invalidateAfterCommand(ctx, provider, subj, cmd); err != nil { + if invalidated, err := s.invalidateAfterCommand(ctx, provider, subj, cmd); err != nil { + logCommandFailed(ctx, logger, req, scmlog.DurationMS(started), err) return res, err + } else if invalidated > 0 { + attrs := scmlog.Add(scmlog.CommandAttrs(req), + scmlog.CorrelationAttr(ctx), + slog.Int("cache_prefix_count", invalidated), + ) + logger.Debug(scmlog.EventCommandCacheInvalid, scmlog.Args(attrs)...) } if s.Refresh != nil { if err := s.Refresh.Refresh(ctx, []domain.SCMSubject{subj}); err != nil { + attrs := scmlog.Add(scmlog.CommandAttrs(req), + scmlog.CorrelationAttr(ctx), + slog.Int64(scmlog.FieldDurationMS, scmlog.DurationMS(started)), + ) + attrs = append(attrs, scmlog.ErrorAttrs(err)...) + logger.Warn(scmlog.EventCommandRefreshFailed, scmlog.Args(attrs)...) return res, err } } return res, nil } -func (s *Service) invalidateAfterCommand(ctx context.Context, provider ports.SCMCommandProvider, subj domain.SCMSubject, cmd ports.SCMCommand) error { +func (s *Service) invalidateAfterCommand(ctx context.Context, provider ports.SCMCommandProvider, subj domain.SCMSubject, cmd ports.SCMCommand) (int, error) { invalidator, ok := provider.(cacheInvalidationProvider) if !ok { - return nil + return 0, nil } prefixes := invalidator.CacheInvalidationPrefixes(subj, cmd) for _, p := range prefixes { if err := s.Store.DeleteProviderCache(ctx, p); err != nil { - return err + return 0, err } } - return nil + return len(prefixes), nil } func capabilityError(cmd ports.SCMCommand) error { return &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: string(cmd), Message: "command unsupported by provider"} } + +func normalizeResult(res ports.SCMCommandResult, req ports.SCMCommandRequest) ports.SCMCommandResult { + if res.Provider == "" { + res.Provider = req.Subject.Provider + } + if res.Command == "" { + res.Command = req.Command + } + if res.ChangeRequest.Number == 0 { + res.ChangeRequest = req.ChangeRequest + } + if res.PerformedAt.IsZero() { + res.PerformedAt = req.Now + } + return res +} + +func commandResultForError(req ports.SCMCommandRequest, err error) ports.SCMCommandResult { + return attachErrorDiagnostic(normalizeResult(ports.SCMCommandResult{}, req), req, err) +} + +func attachErrorDiagnostic(res ports.SCMCommandResult, req ports.SCMCommandRequest, err error) ports.SCMCommandResult { + if len(res.Diagnostics) == 0 { + res.Diagnostics = append(res.Diagnostics, scmlog.DiagnosticFromError(string(req.Command), err)) + return res + } + for i := range res.Diagnostics { + if res.Diagnostics[i].ErrorKind == "" { + res.Diagnostics[i].ErrorKind = scmlog.ErrorKind(err) + } + if res.Diagnostics[i].StatusCode == 0 { + if se, ok := scmlog.SCMError(err); ok { + res.Diagnostics[i].StatusCode = se.StatusCode + } + } + if res.Diagnostics[i].Message == "" { + res.Diagnostics[i].Message = scmlog.SafeDiagnosticMessage(err) + } + } + return res +} + +func logCommandCompleted(ctx context.Context, logger *slog.Logger, req ports.SCMCommandRequest, durationMS int64) { + attrs := scmlog.Add(scmlog.CommandAttrs(req), + scmlog.CorrelationAttr(ctx), + slog.Int64(scmlog.FieldDurationMS, durationMS), + ) + logger.Info(scmlog.EventCommandCompleted, scmlog.Args(attrs)...) +} + +func logCommandFailed(ctx context.Context, logger *slog.Logger, req ports.SCMCommandRequest, durationMS int64, err error) { + attrs := scmlog.Add(scmlog.CommandAttrs(req), + scmlog.CorrelationAttr(ctx), + slog.Int64(scmlog.FieldDurationMS, durationMS), + ) + attrs = append(attrs, scmlog.ErrorAttrs(err)...) + if _, ok := scmlog.SCMError(err); ok { + logger.Warn(scmlog.EventCommandFailed, scmlog.Args(attrs)...) + return + } + logger.Error(scmlog.EventCommandFailed, scmlog.Args(attrs)...) +} diff --git a/backend/internal/scm/command/service_test.go b/backend/internal/scm/command/service_test.go index bca553b9..6e4b25ec 100644 --- a/backend/internal/scm/command/service_test.go +++ b/backend/internal/scm/command/service_test.go @@ -1,18 +1,25 @@ package command import ( + "bytes" "context" + "encoding/json" "errors" + "fmt" + "log/slog" + "strings" "testing" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" "github.com/aoagents/agent-orchestrator/backend/internal/scm/store" ) type fakeCommandProvider struct { called ports.SCMCommand invalidations []domain.SCMProviderCachePrefix + err error } func (f *fakeCommandProvider) Provider() domain.SCMProvider { return domain.SCMProviderGitHub } @@ -21,33 +28,46 @@ func (f *fakeCommandProvider) Capabilities() ports.SCMCommandCapabilities { } func (f *fakeCommandProvider) Merge(_ context.Context, r ports.SCMCommandRequest) (ports.SCMCommandResult, error) { f.called = ports.SCMCommandMerge - return ports.SCMCommandResult{Provider: domain.SCMProviderGitHub, Command: r.Command, ChangeRequest: r.ChangeRequest}, nil + return ports.SCMCommandResult{Provider: domain.SCMProviderGitHub, Command: r.Command, ChangeRequest: r.ChangeRequest}, f.err } func (f *fakeCommandProvider) Close(context.Context, ports.SCMCommandRequest) (ports.SCMCommandResult, error) { f.called = ports.SCMCommandClose - return ports.SCMCommandResult{}, nil + return ports.SCMCommandResult{}, f.err } func (f *fakeCommandProvider) Comment(context.Context, ports.SCMCommandRequest) (ports.SCMCommandResult, error) { f.called = ports.SCMCommandComment - return ports.SCMCommandResult{}, nil + return ports.SCMCommandResult{}, f.err } func (f *fakeCommandProvider) Assign(context.Context, ports.SCMCommandRequest) (ports.SCMCommandResult, error) { f.called = ports.SCMCommandAssign - return ports.SCMCommandResult{}, nil + return ports.SCMCommandResult{}, f.err } func (f *fakeCommandProvider) Checkout(_ context.Context, r ports.SCMCommandRequest) (ports.SCMCommandResult, error) { f.called = ports.SCMCommandCheckout - return ports.SCMCommandResult{Provider: domain.SCMProviderGitHub, Command: r.Command, ChangeRequest: r.ChangeRequest}, nil + return ports.SCMCommandResult{Provider: domain.SCMProviderGitHub, Command: r.Command, ChangeRequest: r.ChangeRequest}, f.err } func (f *fakeCommandProvider) CacheInvalidationPrefixes(domain.SCMSubject, ports.SCMCommand) []domain.SCMProviderCachePrefix { return f.invalidations } -type fakeRefresh struct{ called bool } +type fakeRefresh struct { + called bool + err error +} func (f *fakeRefresh) Refresh(context.Context, []domain.SCMSubject) error { f.called = true - return nil + return f.err +} + +type fakeAudit struct { + called bool + err error +} + +func (f *fakeAudit) RecordSCMCommand(context.Context, ports.SCMCommandResult, error) error { + f.called = true + return f.err } func TestMergeInvalidatesProviderCacheAndRefreshes(t *testing.T) { @@ -79,6 +99,99 @@ func TestMergeInvalidatesProviderCacheAndRefreshes(t *testing.T) { } } +func TestCommandServiceLogsStartedCompletedAndCacheInvalidated(t *testing.T) { + ctx := scmlog.WithCorrelationID(context.Background(), "corr-command") + st := store.NewMemoryStore() + subj := domain.SCMSubject{SessionID: "s1", ProjectID: "p1", Provider: domain.SCMProviderGitHub, Host: "github.com", Repo: "o/r", Branch: "feat/45", PRNumber: 45, CredentialHash: "cred"} + if err := st.UpsertSubject(ctx, subj); err != nil { + t.Fatal(err) + } + key := domain.SCMProviderCacheKey{SCMProviderCacheScope: subj.CacheScope(), Namespace: "provider-checks", Key: "sha"} + if err := st.PutProviderCache(ctx, domain.SCMProviderCacheEntry{Key: key, ETag: "etag"}); err != nil { + t.Fatal(err) + } + var logs bytes.Buffer + provider := &fakeCommandProvider{invalidations: []domain.SCMProviderCachePrefix{{SCMProviderCacheScope: subj.CacheScope(), Namespace: "provider-checks"}}} + svc := New(st, nil, provider) + svc.Logger = jsonLogger(&logs) + if _, err := svc.MergeChangeRequest(ctx, "s1", ports.SCMCommandRequest{Actor: "ao"}); err != nil { + t.Fatal(err) + } + records := decodeLogRecords(t, logs.String()) + started := findLogRecord(t, records, scmlog.EventCommandStarted) + assertLogField(t, started, scmlog.FieldCorrelationID, "corr-command") + assertLogField(t, started, scmlog.FieldProvider, "github") + assertLogField(t, started, scmlog.FieldRepo, "o/r") + assertLogField(t, started, scmlog.FieldCommand, string(ports.SCMCommandMerge)) + assertLogField(t, started, scmlog.FieldActor, "ao") + assertLogNumber(t, started, scmlog.FieldChangeRequestNumber, 45) + if _, ok := started["pr_number"]; ok { + t.Fatal("command log used provider-specific pr_number field") + } + completed := findLogRecord(t, records, scmlog.EventCommandCompleted) + assertLogNumber(t, completed, scmlog.FieldChangeRequestNumber, 45) + findLogRecord(t, records, scmlog.EventCommandCacheInvalid) +} + +func TestCommandServiceLogsFailuresWithoutCommentBodyAndAttachesDiagnostic(t *testing.T) { + ctx := scmlog.WithCorrelationID(context.Background(), "corr-command-fail") + st := store.NewMemoryStore() + subj := domain.SCMSubject{SessionID: "s1", ProjectID: "p1", Provider: domain.SCMProviderGitHub, Host: "github.com", Repo: "o/r", Branch: "feat/45", PRNumber: 45} + if err := st.UpsertSubject(ctx, subj); err != nil { + t.Fatal(err) + } + commentBody := "SECRET_COMMENT_BODY" + providerErr := &domain.SCMError{Kind: domain.SCMErrorAuthFailed, Operation: "github.command.comment", StatusCode: 401, Message: "bad credentials"} + var logs bytes.Buffer + svc := New(st, nil, &fakeCommandProvider{err: providerErr}) + svc.Logger = jsonLogger(&logs) + res, err := svc.CommentOnChangeRequest(ctx, "s1", commentBody) + if err == nil { + t.Fatal("expected command error") + } + records := decodeLogRecords(t, logs.String()) + failed := findLogRecord(t, records, scmlog.EventCommandFailed) + assertLogField(t, failed, scmlog.FieldCorrelationID, "corr-command-fail") + assertLogField(t, failed, scmlog.FieldErrorKind, string(domain.SCMErrorAuthFailed)) + assertLogNumber(t, failed, scmlog.FieldStatusCode, 401) + if strings.Contains(logs.String(), commentBody) { + t.Fatalf("comment body leaked into command logs: %s", logs.String()) + } + if len(res.Diagnostics) != 1 || res.Diagnostics[0].ErrorKind != domain.SCMErrorAuthFailed || res.Diagnostics[0].StatusCode != 401 { + t.Fatalf("bad command diagnostics: %+v", res.Diagnostics) + } +} + +func TestCommandServiceLogsAuditAndRefreshFailuresSeparately(t *testing.T) { + ctx := scmlog.WithCorrelationID(context.Background(), "corr-command-after") + st := store.NewMemoryStore() + subj := domain.SCMSubject{SessionID: "s1", ProjectID: "p1", Provider: domain.SCMProviderGitHub, Host: "github.com", Repo: "o/r", Branch: "feat/45", PRNumber: 45} + if err := st.UpsertSubject(ctx, subj); err != nil { + t.Fatal(err) + } + var logs bytes.Buffer + audit := &fakeAudit{err: fmt.Errorf("audit down")} + refresh := &fakeRefresh{err: &domain.SCMError{Kind: domain.SCMErrorNetwork, Operation: "observe", Message: "refresh down"}} + svc := New(st, refresh, &fakeCommandProvider{}) + svc.Audit = audit + svc.Logger = jsonLogger(&logs) + res, err := svc.MergeChangeRequest(ctx, "s1", ports.SCMCommandRequest{}) + if err == nil { + t.Fatal("expected refresh error") + } + if !audit.called || !refresh.called { + t.Fatalf("audit=%v refresh=%v", audit.called, refresh.called) + } + if res.Command != ports.SCMCommandMerge || res.ChangeRequest.Number != 45 { + t.Fatalf("command result hidden by refresh failure: %+v", res) + } + records := decodeLogRecords(t, logs.String()) + findLogRecord(t, records, scmlog.EventCommandCompleted) + findLogRecord(t, records, scmlog.EventCommandAuditFailed) + refreshFailed := findLogRecord(t, records, scmlog.EventCommandRefreshFailed) + assertLogField(t, refreshFailed, scmlog.FieldErrorKind, string(domain.SCMErrorNetwork)) +} + func TestCheckoutDoesNotInvalidateOrRefreshProviderCache(t *testing.T) { ctx := context.Background() st := store.NewMemoryStore() @@ -156,3 +269,50 @@ func TestCommandRejectsSessionWithoutBoundChangeRequest(t *testing.T) { t.Fatalf("provider should not be called, got %s", provider.called) } } + +func jsonLogger(buf *bytes.Buffer) *slog.Logger { + return slog.New(slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +func decodeLogRecords(t *testing.T, raw string) []map[string]any { + t.Helper() + lines := bytes.Split([]byte(raw), []byte("\n")) + records := make([]map[string]any, 0, len(lines)) + for _, line := range lines { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + var rec map[string]any + if err := json.Unmarshal(line, &rec); err != nil { + t.Fatalf("decode log %s: %v", line, err) + } + records = append(records, rec) + } + return records +} + +func findLogRecord(t *testing.T, records []map[string]any, msg string) map[string]any { + t.Helper() + for _, rec := range records { + if rec["msg"] == msg { + return rec + } + } + t.Fatalf("missing log %q in %+v", msg, records) + return nil +} + +func assertLogField(t *testing.T, rec map[string]any, key, want string) { + t.Helper() + if got, _ := rec[key].(string); got != want { + t.Fatalf("%s=%q want %q in %+v", key, got, want, rec) + } +} + +func assertLogNumber(t *testing.T, rec map[string]any, key string, want float64) { + t.Helper() + if got, _ := rec[key].(float64); got != want { + t.Fatalf("%s=%v want %v in %+v", key, rec[key], want, rec) + } +} diff --git a/backend/internal/scm/logging/logging.go b/backend/internal/scm/logging/logging.go new file mode 100644 index 00000000..6ff1a15b --- /dev/null +++ b/backend/internal/scm/logging/logging.go @@ -0,0 +1,337 @@ +package logging + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "log/slog" + "net/http" + "strings" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +const ( + FieldCorrelationID = "correlation_id" + FieldProvider = "provider" + FieldHost = "host" + FieldRepo = "repo" + FieldProjectID = "project_id" + FieldSessionID = "session_id" + FieldSessionCount = "session_count" + FieldSnapshotCount = "snapshot_count" + FieldChangedCount = "changed_count" + FieldChangeRequestNumber = "change_request_number" + FieldOperation = "operation" + FieldCommand = "command" + FieldActor = "actor" + FieldFreshness = "freshness" + FieldDurationMS = "duration_ms" + FieldMethod = "method" + FieldEndpointTemplate = "endpoint_template" + FieldStatusCode = "status_code" + FieldErrorKind = "error_kind" + FieldCacheHit = "cache_hit" + FieldETagPresent = "etag_present" + FieldRateLimitLimit = "rate_limit_limit" + FieldRateLimitRemaining = "rate_limit_remaining" + FieldRateLimitReset = "rate_limit_reset" + FieldBackoffUntil = "backoff_until" + FieldRateLimitUntil = "rate_limit_until" +) + +const ( + EventObserveStarted = "scm.observe.started" + EventObserveCompleted = "scm.observe.completed" + EventObserveFailed = "scm.observe.failed" + EventSnapshotSaved = "scm.snapshot.saved" + EventSnapshotUnchanged = "scm.snapshot.unchanged" + EventSnapshotUnavailable = "scm.snapshot.unavailable" + EventCommandStarted = "scm.command.started" + EventCommandCompleted = "scm.command.completed" + EventCommandFailed = "scm.command.failed" + EventCommandCacheInvalid = "scm.command.cache_invalidated" + EventCommandRefreshFailed = "scm.command.refresh_failed" + EventCommandAuditFailed = "scm.command.audit_failed" + EventTransportRequest = "scm.transport.request" + EventTransportResponse = "scm.transport.response" + EventTransportFailed = "scm.transport.failed" + EventTransportRateLimited = "scm.transport.rate_limited" +) + +const maxDiagnosticMessage = 300 + +type correlationKey struct{} + +// Logger returns the supplied logger or slog.Default when nil. +func Logger(logger *slog.Logger) *slog.Logger { + if logger != nil { + return logger + } + return slog.Default() +} + +// WithCorrelationID stores an already-safe correlation id in ctx. +func WithCorrelationID(ctx context.Context, id string) context.Context { + id = strings.TrimSpace(id) + if id == "" { + return ctx + } + return context.WithValue(ctx, correlationKey{}, id) +} + +// CorrelationID returns the SCM correlation id from ctx, if any. +func CorrelationID(ctx context.Context) string { + if ctx == nil { + return "" + } + id, _ := ctx.Value(correlationKey{}).(string) + return id +} + +// EnsureCorrelationID returns a context carrying a correlation id and that id. +func EnsureCorrelationID(ctx context.Context) (context.Context, string) { + if ctx == nil { + ctx = context.Background() + } + if id := CorrelationID(ctx); id != "" { + return ctx, id + } + id := newCorrelationID() + return WithCorrelationID(ctx, id), id +} + +func newCorrelationID() string { + var b [16]byte + if _, err := rand.Read(b[:]); err == nil { + return hex.EncodeToString(b[:]) + } + return hex.EncodeToString([]byte(time.Now().UTC().Format("20060102150405.000000000"))) +} + +func CorrelationAttr(ctx context.Context) slog.Attr { + return slog.String(FieldCorrelationID, CorrelationID(ctx)) +} + +func SubjectAttrs(subj domain.SCMSubject) []slog.Attr { + attrs := []slog.Attr{ + slog.String(FieldProvider, string(subj.Provider)), + slog.String(FieldHost, subj.Host), + slog.String(FieldRepo, subj.Repo), + slog.String(FieldProjectID, string(subj.ProjectID)), + } + if subj.SessionID != "" { + attrs = append(attrs, slog.String(FieldSessionID, string(subj.SessionID))) + } + if subj.PRNumber > 0 { + attrs = append(attrs, slog.Int(FieldChangeRequestNumber, subj.PRNumber)) + } + return attrs +} + +func RepositoryAttrs(provider domain.SCMProvider, host, repo string, projectID domain.ProjectID) []slog.Attr { + return []slog.Attr{ + slog.String(FieldProvider, string(provider)), + slog.String(FieldHost, host), + slog.String(FieldRepo, repo), + slog.String(FieldProjectID, string(projectID)), + } +} + +func SnapshotAttrs(snap domain.SCMSnapshot) []slog.Attr { + attrs := SubjectAttrs(snap.Subject) + if snap.SessionID != "" && snap.Subject.SessionID == "" { + attrs = append(attrs, slog.String(FieldSessionID, string(snap.SessionID))) + } + if snap.PR != nil && snap.PR.Number > 0 && snap.Subject.PRNumber == 0 { + attrs = append(attrs, slog.Int(FieldChangeRequestNumber, snap.PR.Number)) + } + if snap.Freshness != "" { + attrs = append(attrs, slog.String(FieldFreshness, string(snap.Freshness))) + } + return attrs +} + +func CommandAttrs(req ports.SCMCommandRequest) []slog.Attr { + attrs := SubjectAttrs(req.Subject) + attrs = append(attrs, slog.String(FieldCommand, string(req.Command))) + if req.Actor != "" { + attrs = append(attrs, slog.String(FieldActor, req.Actor)) + } + if req.ChangeRequest.Number > 0 && req.Subject.PRNumber != req.ChangeRequest.Number { + attrs = append(attrs, slog.Int(FieldChangeRequestNumber, req.ChangeRequest.Number)) + } + return attrs +} + +func RateLimitAttrs(rl *domain.SCMRateLimit) []slog.Attr { + if rl == nil { + return nil + } + attrs := []slog.Attr{ + slog.Int(FieldRateLimitLimit, rl.Limit), + slog.Int(FieldRateLimitRemaining, rl.Remaining), + } + if !rl.ResetAt.IsZero() { + attrs = append(attrs, slog.Time(FieldRateLimitReset, rl.ResetAt)) + } + return attrs +} + +func PollStateAttrs(states []domain.SCMPollState) []slog.Attr { + var backoff time.Time + var rateLimit time.Time + for _, st := range states { + if !st.BackoffUntil.IsZero() && (backoff.IsZero() || st.BackoffUntil.Before(backoff)) { + backoff = st.BackoffUntil + } + if !st.RateLimitUntil.IsZero() && (rateLimit.IsZero() || st.RateLimitUntil.Before(rateLimit)) { + rateLimit = st.RateLimitUntil + } + } + attrs := []slog.Attr{} + if !backoff.IsZero() { + attrs = append(attrs, slog.Time(FieldBackoffUntil, backoff)) + } + if !rateLimit.IsZero() { + attrs = append(attrs, slog.Time(FieldRateLimitUntil, rateLimit)) + } + return attrs +} + +func ErrorAttrs(err error) []slog.Attr { + kind := ErrorKind(err) + attrs := []slog.Attr{} + if kind != "" { + attrs = append(attrs, slog.String(FieldErrorKind, string(kind))) + } + var se *domain.SCMError + if errors.As(err, &se) { + if se.StatusCode != 0 { + attrs = append(attrs, slog.Int(FieldStatusCode, se.StatusCode)) + } + if !se.RetryAfter.IsZero() { + attrs = append(attrs, slog.Time(FieldRateLimitUntil, se.RetryAfter)) + } + } + return attrs +} + +func ErrorKind(err error) domain.SCMErrorKind { + if err == nil { + return "" + } + var se *domain.SCMError + if errors.As(err, &se) && se.Kind != "" { + return se.Kind + } + return domain.SCMErrorUnavailable +} + +func SCMError(err error) (*domain.SCMError, bool) { + var se *domain.SCMError + if errors.As(err, &se) { + return se, true + } + return nil, false +} + +func DurationMS(started time.Time) int64 { + if started.IsZero() { + return 0 + } + return time.Since(started).Milliseconds() +} + +func Freshness(snapshots []domain.SCMSnapshot, unavailable bool) domain.SCMFreshness { + if unavailable { + return domain.SCMFreshnessUnavailable + } + if len(snapshots) == 0 { + return "" + } + freshness := snapshots[0].Freshness + for _, snap := range snapshots[1:] { + if snap.Freshness != freshness { + return "mixed" + } + } + return freshness +} + +func DiagnosticFromError(operation string, err error) domain.SCMDiagnostic { + d := domain.SCMDiagnostic{Operation: operation, ErrorKind: domain.SCMErrorUnavailable, Message: SafeDiagnosticMessage(err)} + var se *domain.SCMError + if errors.As(err, &se) { + d.Operation = firstNonEmpty(se.Operation, operation) + d.ErrorKind = se.Kind + d.StatusCode = se.StatusCode + d.Message = SafeDiagnosticMessage(se) + } + return d +} + +func SafeDiagnosticMessage(err error) string { + if err == nil { + return "" + } + var se *domain.SCMError + if errors.As(err, &se) { + msg := se.Message + if msg == "" && se.StatusCode != 0 { + msg = http.StatusText(se.StatusCode) + } + return truncateOneLine(msg, maxDiagnosticMessage) + } + return "operation failed" +} + +func StatusMessage(status int, _ []byte, jsonMessage string) string { + if strings.TrimSpace(jsonMessage) != "" { + return truncateOneLine(jsonMessage, maxDiagnosticMessage) + } + if text := http.StatusText(status); text != "" { + return text + } + return "provider request failed" +} + +func Add(attrs []slog.Attr, more ...slog.Attr) []slog.Attr { + return append(attrs, more...) +} + +func Args(attrs []slog.Attr) []any { + args := make([]any, 0, len(attrs)) + for _, attr := range attrs { + if attr.Key == "" { + continue + } + args = append(args, attr) + } + return args +} + +func truncateOneLine(s string, limit int) string { + s = strings.TrimSpace(s) + s = strings.Join(strings.Fields(s), " ") + if limit <= 0 || len(s) <= limit { + return s + } + if limit <= 1 { + return s[:limit] + } + if limit <= 3 { + return s[:limit] + } + return s[:limit-3] + "..." +} + +func firstNonEmpty(a, b string) string { + if a != "" { + return a + } + return b +} diff --git a/backend/internal/scm/observer/observer.go b/backend/internal/scm/observer/observer.go index eaa341ca..5eced1f2 100644 --- a/backend/internal/scm/observer/observer.go +++ b/backend/internal/scm/observer/observer.go @@ -3,6 +3,7 @@ package observer import ( "context" "fmt" + "log/slog" "net/url" "strconv" "strings" @@ -10,6 +11,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" ) // Observer coordinates provider polling, durable snapshot writes, event fanout @@ -19,6 +21,7 @@ type Observer struct { LCM ports.LifecycleManager Providers map[domain.SCMProvider]ports.SCMProvider Clock func() time.Time + Logger *slog.Logger // OnSnapshot is the first-pass in-process fanout hook for API/dashboard live // updates. Durable outbox/replay can replace it later without changing @@ -58,13 +61,17 @@ func (o *Observer) Invalidate(ctx context.Context, subject domain.SCMSubject, re } func (o *Observer) Refresh(ctx context.Context, subjects []domain.SCMSubject) error { + ctx, _ = scmlog.EnsureCorrelationID(ctx) + logger := scmlog.Logger(o.Logger) if o.Store == nil { - return fmt.Errorf("scm observer: nil store") + err := fmt.Errorf("scm observer: nil store") + logger.Error(scmlog.EventObserveFailed, scmlog.Args(scmlog.Add(scmlog.ErrorAttrs(err), scmlog.CorrelationAttr(ctx)))...) + return err } if o.Clock == nil { o.Clock = time.Now } - byProvider := map[domain.SCMProvider][]domain.SCMSubject{} + byGroup := map[observeGroupKey][]domain.SCMSubject{} defaultProvider, hasDefaultProvider := o.singleProvider() var firstErr error for _, subj := range subjects { @@ -75,30 +82,44 @@ func (o *Observer) Refresh(ctx context.Context, subjects []domain.SCMSubject) er subj.Provider = defaultProvider } if subj.Provider == "" { + err := &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: "observe", Message: "subject provider is required when observer has zero or multiple providers"} + logObserveFailure(ctx, logger, subj, 1, 0, 0, 0, nil, err) if firstErr == nil { - firstErr = &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: "observe", Message: "subject provider is required when observer has zero or multiple providers"} + firstErr = err } continue } - byProvider[subj.Provider] = append(byProvider[subj.Provider], subj) + key := observeGroupKey{Provider: subj.Provider, Host: subj.Host, Repo: subj.Repo, ProjectID: subj.ProjectID} + byGroup[key] = append(byGroup[key], subj) } - for providerID, group := range byProvider { - provider := o.Providers[providerID] + for key, group := range byGroup { + provider := o.Providers[key.Provider] + started := time.Now() + startAttrs := scmlog.Add(scmlog.RepositoryAttrs(key.Provider, key.Host, key.Repo, key.ProjectID), + scmlog.CorrelationAttr(ctx), + slog.Int(scmlog.FieldSessionCount, len(group)), + ) + logger.Info(scmlog.EventObserveStarted, scmlog.Args(startAttrs)...) if provider == nil { + err := &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: "observe", Message: fmt.Sprintf("provider %q not registered", key.Provider)} + logObserveFailure(ctx, logger, group[0], len(group), 0, 0, scmlog.DurationMS(started), nil, err) if firstErr == nil { - firstErr = &domain.SCMError{Kind: domain.SCMErrorUnsupported, Operation: "observe", Message: fmt.Sprintf("provider %q not registered", providerID)} + firstErr = err } continue } res, err := provider.ObserveSessions(ctx, ports.SCMObserveRequest{Subjects: group, Now: o.Clock()}, o.Store) + changedCount := 0 for _, subj := range res.Subjects { if upsertErr := o.Store.UpsertSubject(ctx, subj); upsertErr != nil { + logObserveFailure(ctx, logger, group[0], len(group), len(res.Snapshots), changedCount, scmlog.DurationMS(started), nil, upsertErr) return upsertErr } } for _, st := range res.PollStates { if pollErr := o.Store.PutPollState(ctx, st); pollErr != nil { + logObserveFailure(ctx, logger, group[0], len(group), len(res.Snapshots), changedCount, scmlog.DurationMS(started), nil, pollErr) return pollErr } } @@ -108,8 +129,17 @@ func (o *Observer) Refresh(ctx context.Context, subjects []domain.SCMSubject) er } saved := map[domain.SessionID]bool{} for _, snap := range res.Snapshots { - if saveErr := o.saveAndApply(ctx, snap); saveErr != nil && firstErr == nil { - firstErr = saveErr + changed, saveErr := o.saveAndApply(ctx, snap) + if saveErr != nil { + logObserveFailure(ctx, logger, snap.Subject, len(group), len(res.Snapshots), changedCount, scmlog.DurationMS(started), res.PollStates, saveErr) + if firstErr == nil { + firstErr = saveErr + } + } else { + if changed { + changedCount++ + } + logSnapshot(ctx, logger, snap, changed) } saved[snap.SessionID] = true } @@ -117,21 +147,58 @@ func (o *Observer) Refresh(ctx context.Context, subjects []domain.SCMSubject) er if saved[subj.SessionID] { continue } - if saveErr := o.saveAndApply(ctx, unavailableSnapshot(subj, o.Clock(), err)); saveErr != nil && firstErr == nil { - firstErr = saveErr + snap := unavailableSnapshot(subj, o.Clock(), err) + changed, saveErr := o.saveAndApply(ctx, snap) + if saveErr != nil { + logObserveFailure(ctx, logger, subj, len(group), len(res.Snapshots), changedCount, scmlog.DurationMS(started), res.PollStates, saveErr) + if firstErr == nil { + firstErr = saveErr + } + } else { + if changed { + changedCount++ + } + logSnapshot(ctx, logger, snap, changed) } } + logObserveFailure(ctx, logger, group[0], len(group), len(res.Snapshots), changedCount, scmlog.DurationMS(started), res.PollStates, err) continue } for _, snap := range res.Snapshots { - if saveErr := o.saveAndApply(ctx, snap); saveErr != nil { + changed, saveErr := o.saveAndApply(ctx, snap) + if saveErr != nil { + logObserveFailure(ctx, logger, snap.Subject, len(group), len(res.Snapshots), changedCount, scmlog.DurationMS(started), res.PollStates, saveErr) return saveErr } + if changed { + changedCount++ + } + logSnapshot(ctx, logger, snap, changed) } + attrs := scmlog.Add(scmlog.RepositoryAttrs(key.Provider, key.Host, key.Repo, key.ProjectID), + scmlog.CorrelationAttr(ctx), + slog.Int(scmlog.FieldSessionCount, len(group)), + slog.Int(scmlog.FieldSnapshotCount, len(res.Snapshots)), + slog.Int(scmlog.FieldChangedCount, changedCount), + slog.Int64(scmlog.FieldDurationMS, scmlog.DurationMS(started)), + ) + if freshness := scmlog.Freshness(res.Snapshots, res.Unavailable); freshness != "" { + attrs = append(attrs, slog.String(scmlog.FieldFreshness, string(freshness))) + } + attrs = append(attrs, scmlog.RateLimitAttrs(res.RateLimit)...) + attrs = append(attrs, scmlog.PollStateAttrs(res.PollStates)...) + logger.Info(scmlog.EventObserveCompleted, scmlog.Args(attrs)...) } return firstErr } +type observeGroupKey struct { + Provider domain.SCMProvider + Host string + Repo string + ProjectID domain.ProjectID +} + func (o *Observer) singleProvider() (domain.SCMProvider, bool) { if len(o.Providers) != 1 { return "", false @@ -142,42 +209,65 @@ func (o *Observer) singleProvider() (domain.SCMProvider, bool) { return "", false } -func (o *Observer) saveAndApply(ctx context.Context, snap domain.SCMSnapshot) error { +func (o *Observer) saveAndApply(ctx context.Context, snap domain.SCMSnapshot) (bool, error) { saved, changed, err := o.Store.SaveSnapshot(ctx, snap) if err != nil { - return err + return false, err } if !changed { - return nil + return false, nil } if o.OnSnapshot != nil { if err := o.OnSnapshot(ctx, saved); err != nil { - return err + return true, err } } if o.LCM != nil { if err := o.LCM.ApplySCMObservation(ctx, saved.SessionID, FactsFromSnapshot(saved)); err != nil { - return err + return true, err } } - return nil + return true, nil } func unavailableSnapshot(subj domain.SCMSubject, now time.Time, err error) domain.SCMSnapshot { - d := domain.SCMDiagnostic{Operation: "observe", ErrorKind: domain.SCMErrorUnavailable, Message: errString(err)} - if se, ok := err.(*domain.SCMError); ok { - d.ErrorKind = se.Kind - d.StatusCode = se.StatusCode - d.Operation = se.Operation - } + d := scmlog.DiagnosticFromError("observe", err) return domain.SCMSnapshot{SessionID: subj.SessionID, Subject: subj, Freshness: domain.SCMFreshnessUnavailable, ObservedAt: now, Diagnostics: []domain.SCMDiagnostic{d}} } -func errString(err error) string { - if err == nil { - return "" +func logObserveFailure(ctx context.Context, logger *slog.Logger, subj domain.SCMSubject, sessionCount, snapshotCount, changedCount int, durationMS int64, pollStates []domain.SCMPollState, err error) { + attrs := scmlog.Add(scmlog.SubjectAttrs(subj), + scmlog.CorrelationAttr(ctx), + slog.Int(scmlog.FieldSessionCount, sessionCount), + slog.Int(scmlog.FieldSnapshotCount, snapshotCount), + slog.Int(scmlog.FieldChangedCount, changedCount), + slog.Int64(scmlog.FieldDurationMS, durationMS), + ) + attrs = append(attrs, scmlog.ErrorAttrs(err)...) + attrs = append(attrs, scmlog.PollStateAttrs(pollStates)...) + if _, ok := scmlog.SCMError(err); ok { + logger.Warn(scmlog.EventObserveFailed, scmlog.Args(attrs)...) + return + } + logger.Error(scmlog.EventObserveFailed, scmlog.Args(attrs)...) +} + +func logSnapshot(ctx context.Context, logger *slog.Logger, snap domain.SCMSnapshot, changed bool) { + attrs := scmlog.Add(scmlog.SnapshotAttrs(snap), scmlog.CorrelationAttr(ctx)) + for _, diag := range snap.Diagnostics { + if diag.ErrorKind != "" { + attrs = append(attrs, slog.String(scmlog.FieldErrorKind, string(diag.ErrorKind))) + break + } + } + switch { + case snap.Freshness == domain.SCMFreshnessUnavailable: + logger.Warn(scmlog.EventSnapshotUnavailable, scmlog.Args(attrs)...) + case changed: + logger.Debug(scmlog.EventSnapshotSaved, scmlog.Args(attrs)...) + default: + logger.Debug(scmlog.EventSnapshotUnchanged, scmlog.Args(attrs)...) } - return err.Error() } // FactsFromSnapshot projects normalized SCM snapshots into the existing LCM DTO. diff --git a/backend/internal/scm/observer/observer_test.go b/backend/internal/scm/observer/observer_test.go index 161d6bc8..752ef7e9 100644 --- a/backend/internal/scm/observer/observer_test.go +++ b/backend/internal/scm/observer/observer_test.go @@ -1,12 +1,16 @@ package observer import ( + "bytes" "context" + "encoding/json" + "log/slog" "testing" "time" "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/ports" + scmlog "github.com/aoagents/agent-orchestrator/backend/internal/scm/logging" "github.com/aoagents/agent-orchestrator/backend/internal/scm/store" ) @@ -112,6 +116,74 @@ func TestObserverProjectsDraftAndFailingCIToLCMFacts(t *testing.T) { } } +func TestObserverLogsProviderNeutralObserveEvents(t *testing.T) { + ctx := scmlog.WithCorrelationID(context.Background(), "corr-observe") + now := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + subj := domain.SCMSubject{SessionID: "s1", ProjectID: "p1", Provider: domain.SCMProviderGitHub, Host: "github.com", Repo: "o/r", Branch: "feat/45", PRNumber: 45} + snap := domain.SCMSnapshot{SessionID: "s1", Subject: subj, Freshness: domain.SCMFreshnessFresh, ObservedAt: now, PR: &domain.SCMPullRequest{Number: 45, URL: "https://github.com/o/r/pull/45", State: domain.PROpen}} + var logs bytes.Buffer + st := store.NewMemoryStore() + o := New(st, nil, fakeProvider{res: ports.SCMObserveResult{ProviderName: domain.SCMProviderGitHub, Subjects: []domain.SCMSubject{subj}, Snapshots: []domain.SCMSnapshot{snap}}}) + o.Clock = func() time.Time { return now } + o.Logger = jsonLogger(&logs) + if err := o.Refresh(ctx, []domain.SCMSubject{subj}); err != nil { + t.Fatal(err) + } + records := decodeLogRecords(t, logs.String()) + started := findLogRecord(t, records, scmlog.EventObserveStarted) + assertLogField(t, started, scmlog.FieldCorrelationID, "corr-observe") + assertLogField(t, started, scmlog.FieldProvider, "github") + assertLogField(t, started, scmlog.FieldHost, "github.com") + assertLogField(t, started, scmlog.FieldRepo, "o/r") + assertLogField(t, started, scmlog.FieldProjectID, "p1") + assertLogNumber(t, started, scmlog.FieldSessionCount, 1) + if _, ok := started["pr_number"]; ok { + t.Fatal("observer log used provider-specific pr_number field") + } + completed := findLogRecord(t, records, scmlog.EventObserveCompleted) + assertLogField(t, completed, scmlog.FieldCorrelationID, "corr-observe") + assertLogField(t, completed, scmlog.FieldFreshness, "fresh") + assertLogNumber(t, completed, scmlog.FieldSnapshotCount, 1) + assertLogNumber(t, completed, scmlog.FieldChangedCount, 1) + saved := findLogRecord(t, records, scmlog.EventSnapshotSaved) + assertLogNumber(t, saved, scmlog.FieldChangeRequestNumber, 45) +} + +func TestObserverFailureLogsAndPersistsConciseDiagnostic(t *testing.T) { + ctx := scmlog.WithCorrelationID(context.Background(), "corr-fail") + now := time.Date(2026, 5, 28, 12, 0, 0, 0, time.UTC) + subj := domain.SCMSubject{SessionID: "s1", ProjectID: "p1", Provider: domain.SCMProviderGitHub, Host: "github.com", Repo: "o/r", Branch: "feat/45", PRNumber: 45} + err := &domain.SCMError{Kind: domain.SCMErrorRateLimited, Operation: "github.graphql_pr_batch", StatusCode: 403, Message: "rate limit exceeded " + string(bytes.Repeat([]byte("x"), 500))} + var logs bytes.Buffer + st := store.NewMemoryStore() + o := New(st, nil, fakeProvider{res: ports.SCMObserveResult{ProviderName: domain.SCMProviderGitHub}, err: err}) + o.Clock = func() time.Time { return now } + o.Logger = jsonLogger(&logs) + if gotErr := o.Refresh(ctx, []domain.SCMSubject{subj}); gotErr == nil { + t.Fatal("expected observe error") + } + records := decodeLogRecords(t, logs.String()) + failed := findLogRecord(t, records, scmlog.EventObserveFailed) + assertLogField(t, failed, scmlog.FieldCorrelationID, "corr-fail") + assertLogField(t, failed, scmlog.FieldErrorKind, string(domain.SCMErrorRateLimited)) + assertLogNumber(t, failed, scmlog.FieldStatusCode, 403) + findLogRecord(t, records, scmlog.EventSnapshotUnavailable) + latest, ok, gotErr := st.GetLatestSnapshot(ctx, "s1") + if gotErr != nil || !ok { + t.Fatalf("latest ok=%v err=%v", ok, gotErr) + } + if len(latest.Diagnostics) != 1 { + t.Fatalf("diagnostics=%+v", latest.Diagnostics) + } + diag := latest.Diagnostics[0] + if diag.Operation != "github.graphql_pr_batch" || diag.ErrorKind != domain.SCMErrorRateLimited || diag.StatusCode != 403 { + t.Fatalf("bad diagnostic: %+v", diag) + } + if len(diag.Message) > 300 { + t.Fatalf("diagnostic message too large: %d", len(diag.Message)) + } +} + func TestFactsFromUnavailableSnapshotIsNotFetched(t *testing.T) { facts := FactsFromSnapshot(domain.SCMSnapshot{Freshness: domain.SCMFreshnessUnavailable}) if facts.Fetched { @@ -196,3 +268,50 @@ func TestSubjectsFromSessionsUsesProviderMetadataWithoutGitHubDefaults(t *testin t.Fatalf("bad subject: %+v", got) } } + +func jsonLogger(buf *bytes.Buffer) *slog.Logger { + return slog.New(slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +func decodeLogRecords(t *testing.T, raw string) []map[string]any { + t.Helper() + lines := bytes.Split([]byte(raw), []byte("\n")) + records := make([]map[string]any, 0, len(lines)) + for _, line := range lines { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + var rec map[string]any + if err := json.Unmarshal(line, &rec); err != nil { + t.Fatalf("decode log %s: %v", line, err) + } + records = append(records, rec) + } + return records +} + +func findLogRecord(t *testing.T, records []map[string]any, msg string) map[string]any { + t.Helper() + for _, rec := range records { + if rec["msg"] == msg { + return rec + } + } + t.Fatalf("missing log %q in %+v", msg, records) + return nil +} + +func assertLogField(t *testing.T, rec map[string]any, key, want string) { + t.Helper() + if got, _ := rec[key].(string); got != want { + t.Fatalf("%s=%q want %q in %+v", key, got, want, rec) + } +} + +func assertLogNumber(t *testing.T, rec map[string]any, key string, want float64) { + t.Helper() + if got, _ := rec[key].(float64); got != want { + t.Fatalf("%s=%v want %v in %+v", key, rec[key], want, rec) + } +}