diff --git a/internal/github/headers.go b/internal/github/headers.go new file mode 100644 index 00000000..d83c7a2f --- /dev/null +++ b/internal/github/headers.go @@ -0,0 +1,17 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +// Package github provides shared constants for GitHub webhook integration. +package github + +const ( + SignatureHeader = "X-Hub-Signature-256" + EventHeader = "X-GitHub-Event" + HookIDHeader = "X-GitHub-Hook-ID" + DeliveryHeader = "X-GitHub-Delivery" + HookInstallationTargetTypeHeader = "X-GitHub-Hook-Installation-Target-Type" + HookInstallationTargetIDHeader = "X-GitHub-Hook-Installation-Target-ID" + SignaturePrefix = "sha256=" +) diff --git a/internal/planner/consumer.go b/internal/planner/consumer.go index 23885078..905f8ae2 100644 --- a/internal/planner/consumer.go +++ b/internal/planner/consumer.go @@ -12,10 +12,12 @@ import ( "time" "github.com/canonical/github-runner-operators/internal/database" + gh "github.com/canonical/github-runner-operators/internal/github" "github.com/canonical/github-runner-operators/internal/queue" "github.com/google/go-github/v82/github" amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" oteltrace "go.opentelemetry.io/otel/trace" ) @@ -27,9 +29,8 @@ type JobDatabase interface { } const ( - platform = "github" - githubEventHeaderKey = "X-GitHub-Event" - maxBackoff = 5 * time.Minute + platform = "github" + maxBackoff = 5 * time.Minute ) var supportedActions = []string{"queued", "in_progress", "completed"} @@ -94,14 +95,14 @@ func getWorkflowJob(ctx context.Context, headers map[string]interface{}, body [] // Returns (nil, nil) for non-workflow_job events (should be ignored). // Returns error only for malformed webhooks. func parseWorkflowJobEvent(ctx context.Context, headers map[string]interface{}, body []byte) (*github.WorkflowJobEvent, error) { - eventTypeHeader, ok := headers[githubEventHeaderKey] + eventTypeHeader, ok := headers[gh.EventHeader] if !ok { - return nil, fmt.Errorf("missing X-GitHub-Event header") + return nil, fmt.Errorf("missing %s header", gh.EventHeader) } eventType, ok := eventTypeHeader.(string) if !ok { - return nil, fmt.Errorf("X-GitHub-Event must be string") + return nil, fmt.Errorf("%s must be string", gh.EventHeader) } event, err := github.ParseWebHook(eventType, body) @@ -112,7 +113,7 @@ func parseWorkflowJobEvent(ctx context.Context, headers map[string]interface{}, jobEvent, ok := event.(*github.WorkflowJobEvent) if !ok { if eventType == "workflow_job" { - logger.WarnContext(ctx, "received workflow_job in \"X-GitHub-Event\" header but payload did not parse to expected type; possible GitHub API change or library issue") + logger.WarnContext(ctx, "received workflow_job event but payload did not parse to expected type; possible GitHub API change or library issue", "header_name", gh.EventHeader) } else { logger.DebugContext(ctx, "ignoring non-workflow_job event", "event_type", eventType) } @@ -270,9 +271,17 @@ func (c *JobConsumer) pullMessage(ctx context.Context) error { ctx, span := trace.Start(ctx, "consume webhook") defer span.End() + deliveryID, _ := msg.Headers[gh.DeliveryHeader].(string) + eventType, _ := msg.Headers[gh.EventHeader].(string) + span.SetAttributes( + attribute.String("github.delivery_id", deliveryID), + attribute.String("github.event", eventType), + ) + job, err := getWorkflowJob(ctx, msg.Headers, msg.Body) if err != nil { logger.ErrorContext(ctx, "cannot parse webhook payload, discarding to DLQ", "error", err) + logger.DebugContext(ctx, "discarded webhook", "delivery_id", deliveryID) span.RecordError(err) c.metrics.ObserveWebhookError(ctx, platform) c.discardMessage(ctx, &msg) @@ -280,11 +289,25 @@ func (c *JobConsumer) pullMessage(ctx context.Context) error { } if job == nil { + logger.DebugContext(ctx, "ignored webhook", "delivery_id", deliveryID) c.metrics.ObserveDiscardedWebhook(ctx, platform) c.ignoreMessage(ctx, &msg) return nil } + span.SetAttributes( + attribute.String("github.job_id", job.id), + attribute.String("github.repo", job.repo), + attribute.String("github.action", job.action), + ) + logger.DebugContext(ctx, "consuming webhook", + "delivery_id", deliveryID, + "job_id", job.id, + "repo", job.repo, + "action", job.action, + "labels", strings.Join(job.labels, ","), + ) + err = c.handleMessage(ctx, job) if err == nil { c.consumedMessage(ctx, &msg) diff --git a/internal/webhook/server.go b/internal/webhook/server.go index e9970505..c0730014 100644 --- a/internal/webhook/server.go +++ b/internal/webhook/server.go @@ -16,22 +16,15 @@ import ( "net/http" "time" + gh "github.com/canonical/github-runner-operators/internal/github" "github.com/canonical/github-runner-operators/internal/queue" "github.com/canonical/github-runner-operators/internal/server" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" ) -const ( - WebhookSignatureHeader = "X-Hub-Signature-256" - WebhookEventHeader = "X-GitHub-Event" - WebhookHookIDHeader = "X-GitHub-Hook-ID" - WebhookDeliveryHeader = "X-GitHub-Delivery" - WebhookHookInstallationTargetTypeHeader = "X-GitHub-Hook-Installation-Target-Type" - WebhookHookInstallationTargetIDHeader = "X-GitHub-Hook-Installation-Target-ID" - bodyLimit = 1048576 - WebhookSignaturePrefix = "sha256=" -) +const bodyLimit = 1048576 type httpError struct { code int @@ -57,7 +50,7 @@ type Handler struct { func (h *Handler) receiveWebhook(ctx context.Context, r *http.Request) ([]byte, error) { reader := io.LimitReader(r.Body, bodyLimit+1) - signature := r.Header.Get(WebhookSignatureHeader) + signature := r.Header.Get(gh.SignatureHeader) if signature == "" { logger.DebugContext(ctx, "missing signature header", "header", r.Header) return nil, &httpError{code: http.StatusForbidden, message: "missing signature header"} @@ -75,6 +68,9 @@ func (h *Handler) receiveWebhook(ctx context.Context, r *http.Request) ([]byte, logger.DebugContext(ctx, "invalid signature", "signature", signature) return nil, &httpError{code: http.StatusForbidden, message: "webhook contains invalid signature"} } + deliveryID := r.Header.Get(gh.DeliveryHeader) + eventType := r.Header.Get(gh.EventHeader) + logger.DebugContext(ctx, "received webhook", "delivery_id", deliveryID, "event", eventType) return body, nil } @@ -91,8 +87,12 @@ func (h *Handler) sendWebhook(ctx context.Context, githubHeaders map[string]stri } err := h.Producer.Push(ctx, rabbitHeaders, body) if err != nil { - return fmt.Errorf("failed to send webhook: %v", err) + return fmt.Errorf("failed to send webhook: %w", err) } + logger.DebugContext(ctx, "sent webhook to queue", + "delivery_id", githubHeaders[gh.DeliveryHeader], + "event", githubHeaders[gh.EventHeader], + ) return nil } @@ -109,21 +109,30 @@ func (h *Handler) serveHTTP(ctx context.Context, r *http.Request) error { span.RecordError(err) span.End() return err - } else { - inboundWebhook.Add(ctx, 1) - span.End() } + deliveryID := r.Header.Get(gh.DeliveryHeader) + eventType := r.Header.Get(gh.EventHeader) + span.SetAttributes( + attribute.String("github.delivery_id", deliveryID), + attribute.String("github.event", eventType), + ) + inboundWebhook.Add(ctx, 1) + span.End() // Extract GitHub headers from the request githubHeaders := map[string]string{ - WebhookEventHeader: r.Header.Get(WebhookEventHeader), - WebhookHookIDHeader: r.Header.Get(WebhookHookIDHeader), - WebhookDeliveryHeader: r.Header.Get(WebhookDeliveryHeader), - WebhookHookInstallationTargetTypeHeader: r.Header.Get(WebhookHookInstallationTargetTypeHeader), - WebhookHookInstallationTargetIDHeader: r.Header.Get(WebhookHookInstallationTargetIDHeader), + gh.EventHeader: r.Header.Get(gh.EventHeader), + gh.HookIDHeader: r.Header.Get(gh.HookIDHeader), + gh.DeliveryHeader: r.Header.Get(gh.DeliveryHeader), + gh.HookInstallationTargetTypeHeader: r.Header.Get(gh.HookInstallationTargetTypeHeader), + gh.HookInstallationTargetIDHeader: r.Header.Get(gh.HookInstallationTargetIDHeader), } ctx, span = trace.Start(ctx, "send webhook") + span.SetAttributes( + attribute.String("github.delivery_id", deliveryID), + attribute.String("github.event", eventType), + ) err = h.sendWebhook(ctx, githubHeaders, webhook) if err != nil { outboundWebhookErrors.Add(ctx, 1) @@ -162,13 +171,13 @@ func (h *Handler) Webhook(w http.ResponseWriter, r *http.Request) { } func validateSignature(message []byte, secret string, signature string) bool { - if len(signature) < len(WebhookSignaturePrefix) { + if len(signature) < len(gh.SignaturePrefix) { return false } - if signature[:len(WebhookSignaturePrefix)] != WebhookSignaturePrefix { + if signature[:len(gh.SignaturePrefix)] != gh.SignaturePrefix { return false } - signatureWithoutPrefix := signature[len(WebhookSignaturePrefix):] + signatureWithoutPrefix := signature[len(gh.SignaturePrefix):] h := hmac.New(sha256.New, []byte(secret)) h.Write(message) sig, err := hex.DecodeString(signatureWithoutPrefix) diff --git a/internal/webhook/server_test.go b/internal/webhook/server_test.go index e7f448e4..a39dd568 100644 --- a/internal/webhook/server_test.go +++ b/internal/webhook/server_test.go @@ -14,6 +14,7 @@ import ( "strings" "testing" + gh "github.com/canonical/github-runner-operators/internal/github" "github.com/canonical/github-runner-operators/internal/telemetry" "github.com/stretchr/testify/assert" ) @@ -64,7 +65,7 @@ func TestWebhookForwarded(t *testing.T) { assert.Equal(t, 1, len(fakeProducer.Messages), "expected 1 message in queue") assert.Equal(t, payload, string(fakeProducer.Messages[0]), "expected message body to match") assert.Equal(t, 1, len(fakeProducer.Headers), "expected 1 header set") - assert.Equal(t, "workflow_job", fakeProducer.Headers[0]["X-GitHub-Event"], "expected X-GitHub-Event header to match") + assert.Equal(t, "workflow_job", fakeProducer.Headers[0][gh.EventHeader], "expected X-GitHub-Event header to match") m := mr.Collect(t) assert.Equal(t, 1.0, m.Counter(t, "github-runner.webhook.gateway.inbound")) assert.Equal(t, 0.0, m.Counter(t, "github-runner.webhook.gateway.inbound.errors")) @@ -75,8 +76,8 @@ func TestWebhookForwarded(t *testing.T) { func setupRequest() *http.Request { req := httptest.NewRequest(http.MethodPost, webhookPath, strings.NewReader(payload)) req.Header.Set("Content-Type", "application/json") - req.Header.Set(WebhookSignatureHeader, valid_signature_header) - req.Header.Set(WebhookEventHeader, "workflow_job") + req.Header.Set(gh.SignatureHeader, valid_signature_header) + req.Header.Set(gh.EventHeader, "workflow_job") return req } @@ -127,7 +128,7 @@ func TestWebhookInvalidSignature(t *testing.T) { }, { name: "Wrong Prefix", - signature: "mac256=" + valid_signature_header[len(WebhookSignaturePrefix):], + signature: "mac256=" + valid_signature_header[len(gh.SignaturePrefix):], expectedResponseMessage: "invalid signature", }, { @@ -157,9 +158,9 @@ func TestWebhookInvalidSignature(t *testing.T) { defer telemetry.ReleaseTestMetricReader(t) req := setupRequest() if tt.name == "Missing Signature Header" { - req.Header.Del(WebhookSignatureHeader) + req.Header.Del(gh.SignatureHeader) } else { - req.Header.Set(WebhookSignatureHeader, tt.signature) + req.Header.Set(gh.SignatureHeader, tt.signature) } w := httptest.NewRecorder()