From 251b29cd97926eff9fdb2e591a113c314887f944 Mon Sep 17 00:00:00 2001 From: syntaxsdev Date: Mon, 11 May 2026 22:48:44 -0400 Subject: [PATCH] feat: add Cloudflare Stream webhook registration endpoint and environment tagging --- docs/stream.md | 2 +- internal/config/config.go | 9 +++++++ internal/stream/client.go | 54 +++++++++++++++++++++++++++++++++++++ internal/upload/handlers.go | 46 +++++++++++++++++++++++++++++++ internal/upload/service.go | 3 +++ main.go | 4 +++ 6 files changed, 117 insertions(+), 1 deletion(-) diff --git a/docs/stream.md b/docs/stream.md index 63e5bf3..5dcebda 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -17,7 +17,7 @@ For short-form creator content (trailers, clips), Stream is the lowest-engineeri ## Flow ``` -client → orchestrator (e.g. barta-api) +client → orchestrator └─ POST /v1/uploads/presign { profile: "trailer", ... } mediaflow → Stream: POST /accounts/{id}/stream/direct_upload { maxDurationSeconds: } diff --git a/internal/config/config.go b/internal/config/config.go index 0d605ea..067d9ef 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -24,6 +24,10 @@ type Config struct { // Cloudflare Stream (only required for profiles with delivery: stream) StreamAccountID string StreamAPIToken string + // Deployment env tag (development|staging|production). Stamped into + // Stream upload meta so the shared CF Stream account's single + // webhook can be routed back to the right destination service. + Environment string } func Load() *Config { @@ -49,6 +53,10 @@ func Load() *Config { // Cloudflare Stream StreamAccountID: getEnv("STREAM_ACCOUNT_ID", ""), StreamAPIToken: getEnv("STREAM_API_TOKEN", ""), + // Deployment env tag — stamped into Stream `meta.env` so the + // shared CF Stream account's single webhook can be routed to the + // right destination by the stream-webhook-router worker. + Environment: getEnv("ENVIRONMENT", "development"), } } @@ -190,3 +198,4 @@ func getEnv(key, defaultValue string) string { } return defaultValue } + diff --git a/internal/stream/client.go b/internal/stream/client.go index 28608ad..b9ee366 100644 --- a/internal/stream/client.go +++ b/internal/stream/client.go @@ -163,6 +163,60 @@ func (c *Client) GetVideo(ctx context.Context, uid string) (*VideoDetails, error }, nil } +// WebhookConfig is the slice of Cloudflare's webhook config response we +// expose. `Secret` is what Cloudflare signs webhook bodies with — the +// destination service needs it to verify deliveries. +type WebhookConfig struct { + NotificationURL string `json:"notification_url"` + Secret string `json:"secret"` + Modified string `json:"modified,omitempty"` +} + +// RegisterWebhook sets (or replaces) the Stream account-level webhook +// destination. PUT /accounts/{id}/stream/webhook is idempotent — running +// it again with the same URL just rotates the secret. +func (c *Client) RegisterWebhook(ctx context.Context, notificationURL string) (*WebhookConfig, error) { + if !c.Configured() { + return nil, fmt.Errorf("stream client not configured (missing STREAM_ACCOUNT_ID or STREAM_API_TOKEN)") + } + + body, err := json.Marshal(map[string]string{"notificationUrl": notificationURL}) + if err != nil { + return nil, fmt.Errorf("marshal webhook payload: %w", err) + } + + url := fmt.Sprintf("%s/accounts/%s/stream/webhook", apiBase, c.accountID) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Authorization", "Bearer "+c.apiToken) + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.http.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("stream register webhook: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + raw, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("stream register webhook status %d: %s", resp.StatusCode, string(raw)) + } + + var parsed struct { + Result WebhookConfig `json:"result"` + Success bool `json:"success"` + } + if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil { + return nil, fmt.Errorf("decode register webhook response: %w", err) + } + if !parsed.Success || parsed.Result.Secret == "" { + return nil, fmt.Errorf("stream register webhook returned empty result") + } + return &parsed.Result, nil +} + func (c *Client) DeleteVideo(ctx context.Context, uid string) error { if !c.Configured() { return fmt.Errorf("stream client not configured") diff --git a/internal/upload/handlers.go b/internal/upload/handlers.go index a6aa5b2..cd6fbe3 100644 --- a/internal/upload/handlers.go +++ b/internal/upload/handlers.go @@ -392,6 +392,52 @@ func (h *Handler) handleProbeStream(w http.ResponseWriter, r *http.Request, prof _ = json.NewEncoder(w).Encode(resp) } +// HandleStreamWebhookRegister handles POST /v1/stream/webhook/register +// +// Body: {"notification_url": "https://api.../v1/webhooks/stream"} +// Response (200): {"notification_url": "...", "secret": "...", "modified": "..."} +// +// One-time setup endpoint. Run once per environment after deploy to point +// Cloudflare Stream at the destination service. The returned `secret` is +// what Cloudflare signs webhook bodies with — the destination service +// needs to store it to verify deliveries. PUT-to-Cloudflare is +// idempotent; calling this again rotates the secret. +func (h *Handler) HandleStreamWebhookRegister(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "Use POST") + return + } + + var req struct { + NotificationURL string `json:"notification_url"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid JSON body", err.Error()) + return + } + if !strings.HasPrefix(req.NotificationURL, "https://") { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "notification_url must be https://", req.NotificationURL) + return + } + + sc := h.uploadService.StreamClient() + if !sc.Configured() { + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, "Stream not configured", "") + return + } + + cfg, err := sc.RegisterWebhook(r.Context(), req.NotificationURL) + if err != nil { + fmt.Printf("Stream register webhook error: %v\n", err) + h.writeError(w, http.StatusBadGateway, ErrUpstream, "Stream API error", err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(cfg) +} + // parseAssetPath extracts {profile} and {key_base} from /v1/assets/{profile}/{key_base}{suffix}. func parseAssetPath(urlPath, suffix string) (profile, keyBase string, ok bool) { path := strings.TrimPrefix(urlPath, "/v1/assets/") diff --git a/internal/upload/service.go b/internal/upload/service.go index f55795b..19537e5 100644 --- a/internal/upload/service.go +++ b/internal/upload/service.go @@ -266,6 +266,9 @@ func (s *Service) presignStream(ctx context.Context, req *PresignRequest, profil Meta: map[string]string{ "key_base": req.KeyBase, "profile": req.Profile, + // Drives the stream-webhook-router worker — CF echoes this + // back on every webhook delivery for this video. + "env": s.config.Environment, }, }) if err != nil { diff --git a/main.go b/main.go index 0db4de2..430f716 100644 --- a/main.go +++ b/main.go @@ -72,6 +72,10 @@ func main() { // Asset operations (auth required) mux.Handle("/v1/assets/", authMiddleware(http.HandlerFunc(uploadHandler.RouteAssets))) + // One-time admin endpoint — register the Cloudflare Stream webhook + // destination. Auth-protected; run once per env at deploy time. + mux.Handle("/v1/stream/webhook/register", authMiddleware(http.HandlerFunc(uploadHandler.HandleStreamWebhookRegister))) + // Health check mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { response.JSON("OK").Write(w)