Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ For short-form creator content (trailers, clips), Stream is the lowest-engineeri
## Flow

```
client → orchestrator (e.g. barta-api)
client → orchestrator
└─ POST /v1/uploads/presign { profile: "trailer", ... }
mediaflow → Stream: POST /accounts/{id}/stream/direct_upload
{ maxDurationSeconds: <profile.max_duration_seconds> }
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Config struct {
// Cloudflare Stream (only required for profiles with delivery: stream)
StreamAccountID string
StreamAPIToken string
// Deployment env tag (development|staging|production). Stamped into
// Stream upload meta so the shared CF Stream account's single
// webhook can be routed back to the right destination service.
Environment string
}

func Load() *Config {
Expand All @@ -49,6 +53,10 @@ func Load() *Config {
// Cloudflare Stream
StreamAccountID: getEnv("STREAM_ACCOUNT_ID", ""),
StreamAPIToken: getEnv("STREAM_API_TOKEN", ""),
// Deployment env tag — stamped into Stream `meta.env` so the
// shared CF Stream account's single webhook can be routed to the
// right destination by the stream-webhook-router worker.
Environment: getEnv("ENVIRONMENT", "development"),
}
}

Expand Down Expand Up @@ -190,3 +198,4 @@ func getEnv(key, defaultValue string) string {
}
return defaultValue
}

54 changes: 54 additions & 0 deletions internal/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,60 @@ func (c *Client) GetVideo(ctx context.Context, uid string) (*VideoDetails, error
}, nil
}

// WebhookConfig is the slice of Cloudflare's webhook config response we
// expose. `Secret` is what Cloudflare signs webhook bodies with — the
// destination service needs it to verify deliveries.
type WebhookConfig struct {
NotificationURL string `json:"notification_url"`
Secret string `json:"secret"`
Modified string `json:"modified,omitempty"`
}

// RegisterWebhook sets (or replaces) the Stream account-level webhook
// destination. PUT /accounts/{id}/stream/webhook is idempotent — running
// it again with the same URL just rotates the secret.
func (c *Client) RegisterWebhook(ctx context.Context, notificationURL string) (*WebhookConfig, error) {
if !c.Configured() {
return nil, fmt.Errorf("stream client not configured (missing STREAM_ACCOUNT_ID or STREAM_API_TOKEN)")
}

body, err := json.Marshal(map[string]string{"notificationUrl": notificationURL})
if err != nil {
return nil, fmt.Errorf("marshal webhook payload: %w", err)
}

url := fmt.Sprintf("%s/accounts/%s/stream/webhook", apiBase, c.accountID)
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return nil, err
}
httpReq.Header.Set("Authorization", "Bearer "+c.apiToken)
httpReq.Header.Set("Content-Type", "application/json")

resp, err := c.http.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("stream register webhook: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
raw, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("stream register webhook status %d: %s", resp.StatusCode, string(raw))
}

var parsed struct {
Result WebhookConfig `json:"result"`
Success bool `json:"success"`
}
if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
return nil, fmt.Errorf("decode register webhook response: %w", err)
}
if !parsed.Success || parsed.Result.Secret == "" {
return nil, fmt.Errorf("stream register webhook returned empty result")
}
return &parsed.Result, nil
}

func (c *Client) DeleteVideo(ctx context.Context, uid string) error {
if !c.Configured() {
return fmt.Errorf("stream client not configured")
Expand Down
46 changes: 46 additions & 0 deletions internal/upload/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,52 @@ func (h *Handler) handleProbeStream(w http.ResponseWriter, r *http.Request, prof
_ = json.NewEncoder(w).Encode(resp)
}

// HandleStreamWebhookRegister handles POST /v1/stream/webhook/register
//
// Body: {"notification_url": "https://api.../v1/webhooks/stream"}
// Response (200): {"notification_url": "...", "secret": "...", "modified": "..."}
//
// One-time setup endpoint. Run once per environment after deploy to point
// Cloudflare Stream at the destination service. The returned `secret` is
// what Cloudflare signs webhook bodies with — the destination service
// needs to store it to verify deliveries. PUT-to-Cloudflare is
// idempotent; calling this again rotates the secret.
func (h *Handler) HandleStreamWebhookRegister(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "Use POST")
return
}

var req struct {
NotificationURL string `json:"notification_url"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid JSON body", err.Error())
return
}
if !strings.HasPrefix(req.NotificationURL, "https://") {
h.writeError(w, http.StatusBadRequest, ErrBadRequest, "notification_url must be https://", req.NotificationURL)
return
}

sc := h.uploadService.StreamClient()
if !sc.Configured() {
h.writeError(w, http.StatusInternalServerError, ErrBadRequest, "Stream not configured", "")
return
}

cfg, err := sc.RegisterWebhook(r.Context(), req.NotificationURL)
if err != nil {
fmt.Printf("Stream register webhook error: %v\n", err)
h.writeError(w, http.StatusBadGateway, ErrUpstream, "Stream API error", err.Error())
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(cfg)
}

// parseAssetPath extracts {profile} and {key_base} from /v1/assets/{profile}/{key_base}{suffix}.
func parseAssetPath(urlPath, suffix string) (profile, keyBase string, ok bool) {
path := strings.TrimPrefix(urlPath, "/v1/assets/")
Expand Down
3 changes: 3 additions & 0 deletions internal/upload/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ func (s *Service) presignStream(ctx context.Context, req *PresignRequest, profil
Meta: map[string]string{
"key_base": req.KeyBase,
"profile": req.Profile,
// Drives the stream-webhook-router worker — CF echoes this
// back on every webhook delivery for this video.
"env": s.config.Environment,
},
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func main() {
// Asset operations (auth required)
mux.Handle("/v1/assets/", authMiddleware(http.HandlerFunc(uploadHandler.RouteAssets)))

// One-time admin endpoint — register the Cloudflare Stream webhook
// destination. Auth-protected; run once per env at deploy time.
mux.Handle("/v1/stream/webhook/register", authMiddleware(http.HandlerFunc(uploadHandler.HandleStreamWebhookRegister)))

// Health check
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
response.JSON("OK").Write(w)
Expand Down
Loading