From d13ae9c7557461b4486b26131a45811b7606803f Mon Sep 17 00:00:00 2001 From: Michael Fox Date: Tue, 2 Sep 2025 13:56:35 -0600 Subject: [PATCH 1/3] made json logging default + added colorized logging config --- README.md | 135 ++++++++++++++++++++++++++++-------------------------- main.go | 22 +++++---- 2 files changed, 83 insertions(+), 74 deletions(-) diff --git a/README.md b/README.md index 5b7e3f3..d689212 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ [![CI](https://github.com/amplify-security/carrier/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/amplify-security/carrier/actions/workflows/ci.yml) [![Docker Pulls](https://img.shields.io/docker/pulls/amplifysecurity/carrier)](https://hub.docker.com/r/amplifysecurity/carrier) -A lightweight messaging adapter for webhooks written in Go. Currently, the -[`amplifysecurity/carrier`](https://hub.docker.com/r/amplifysecurity/carrier) +A lightweight messaging adapter for webhooks written in Go. Currently, the +[`amplifysecurity/carrier`](https://hub.docker.com/r/amplifysecurity/carrier) image size is under 8MB and at idle consumes under 5MB of RAM. Carrier can act as a messaging daemon in the SQS event worker pattern and is designed to be deployed as a sidecar container in Kubernetes pods alongside event workers. @@ -21,22 +21,22 @@ at `/webhook` and runs on port `9000`: version: "3.3" services: - sqs: - image: roribio/alpine-sqs - carrier: - image: amplifysecurity/carrier - restart: unless-stopped - volumes: - - ${HOME}/.aws/credentials:/.aws/credentials - links: - - sqs:sqs - - worker:worker - environment: - CARRIER_WEBHOOK_ENDPOINT: http://worker:9000/webhook - CARRIER_SQS_ENDPOINT: http://sqs:9324 - CARRIER_SQS_QUEUE_NAME: default - worker: - build: . + sqs: + image: roribio/alpine-sqs + carrier: + image: amplifysecurity/carrier + restart: unless-stopped + volumes: + - ${HOME}/.aws/credentials:/.aws/credentials + links: + - sqs:sqs + - worker:worker + environment: + CARRIER_WEBHOOK_ENDPOINT: http://worker:9000/webhook + CARRIER_SQS_ENDPOINT: http://sqs:9324 + CARRIER_SQS_QUEUE_NAME: default + worker: + build: . ``` > **Note**: This example still requires AWS credentials to be mounted even though they are not used or the AWS SDK will panic. @@ -53,35 +53,35 @@ queue `carrier-demo` in `us-west-2` for a worker that expects webhooks at `/webh apiVersion: apps/v1 kind: Deployment metadata: - name: carrier-demo - namespace: demo + name: carrier-demo + namespace: demo spec: - replicas: 1 - selector: - matchLabels: - app: carrier-demo - template: - metadata: - labels: - app: carrier-demo - spec: - serviceAccountName: carrier-demo - containers: - - name: carrier - image: amplifysecurity/carrier - securityContext: - runAsUser: 1000 - allowPrivilegeEscalation: false - runAsNonRoot: true - env: - - name: CARRIER_WEBHOOK_ENDPOINT - value: http://localhost:9000/webhook - - name: CARRIER_SQS_ENDPOINT - value: https://sqs.us-west-2.amazonaws.com - - name: CARRIER_SQS_QUEUE_NAME - value: carrier-demo - - name: worker - image: ${registry}/${container}:${tag} + replicas: 1 + selector: + matchLabels: + app: carrier-demo + template: + metadata: + labels: + app: carrier-demo + spec: + serviceAccountName: carrier-demo + containers: + - name: carrier + image: amplifysecurity/carrier + securityContext: + runAsUser: 1000 + allowPrivilegeEscalation: false + runAsNonRoot: true + env: + - name: CARRIER_WEBHOOK_ENDPOINT + value: http://localhost:9000/webhook + - name: CARRIER_SQS_ENDPOINT + value: https://sqs.us-west-2.amazonaws.com + - name: CARRIER_SQS_QUEUE_NAME + value: carrier-demo + - name: worker + image: ${registry}/${container}:${tag} ``` > **Note**: This example assumes that the Kubernetes service account `carrier-demo` is mapped to an IAM role that has the appropriate permissions to access the `carrier-demo` SQS queue. @@ -101,9 +101,9 @@ Carrier supports the concept of setting SQS visibility timouts dynamically from This can be useful for calculating distributed backoffs on specific messages. Carrier sends the following headers with each webhook: -| Header | Description | -| ------ | ----------- | -| `X-Carrier-Receive-Count` | The SQS message receive count. This field indicates how many times the particular message has been received. | +| Header | Description | +| ------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------- | +| `X-Carrier-Receive-Count` | The SQS message receive count. This field indicates how many times the particular message has been received. | | `X-Carrier-First-Receive-Time` | The SQS message first receive time (in seconds since epoch). This field is the timestamp of the first time the particular message was received. | Using these headers, any usual backoff scheme (like exponential) can be implemented on a distributed basis. @@ -123,29 +123,32 @@ attribute will be sent to the webhook in the `Content-Type` header. For a config ## Configuration -Carrier currently has limited configuration options. More configuration options will be added as +Carrier currently has limited configuration options. More configuration options will be added as the project continues to mature. Currently, all configuration is done via environment variables. -| Environment Variable | Required | Default | Description | -| -------------------- | -------- | ------- | ----------- | -| `CARRIER_WEBHOOK_ENDPOINT` | | `http://localhost:9000` | The full path, including protocol, that webhooks will be sent to. For example, if your worker expects webhooks at `/v1/events`, `http://worker:8080/v1/events`. | -| `CARRIER_WEBHOOK_TLS_INSECURE_SKIP_VERIFY` | | `false` | When set to true, the webhook transmitter will not attempt to validate TLS for an `https` webhook endpoint. | -| `CARRIER_WEBHOOK_DEFAULT_CONTENT_TYPE` | | `application/json` | The default value that will be sent in the `Content-Type` header in all HTTP POSTS to the webhook endpoint. | -| `CARRIER_WEBHOOK_REQUEST_TIMEOUT` | | `60s` | The webhook transmitter request timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | -| `CARRIER_WEBHOOK_HEALTH_CHECK_ENDPOINT` | | | When set, enables health check functionality using the provided endpoint. | -| `CARRIER_WEBHOOK_OFFLINE_THRESHOLD_COUNT` | | `5` | The number of failed health checks before the webhook is determined to be offline. | -| `CARRIER_WEBHOOK_HEALTH_CHECK_INTERVAL` | | `60s` | The time interval between webhook health checks. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | -| `CARRIER_WEBHOOK_HEALTH_CHECK_TIMEOUT` | | `10s` | The webhook health check timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | -| `CARRIER_SQS_ENDPOINT` | :white_check_mark: | | The endpoint for the SQS service. Official AWS service endpoints can be found [here](https://docs.aws.amazon.com/general/latest/gr/sqs-service.html). | -| `CARRIER_SQS_QUEUE_NAME` | :white_check_mark: | | The SQS queue name. | -| `CARRIER_SQS_BATCH_SIZE` | | `1` | The batch size each SQS receiver will request from SQS. All webhooks are transmitted one message per HTTP request. | -| `CARRIER_SQS_RECEIVERS` | | `1` | The number of concurrent SQS receivers requesting messages from SQS. | -| `CARRIER_SQS_RECEIVER_WORKERS` | | `1` | The number of concurrent workers transmitting messages as webhooks for each receiver. A common pattern is to set the batch size and receiver workers to the same value, which will cause all messages in a batch to be transmitted in parallel HTTP requests. | +| Environment Variable | Required | Default | Description | +| ------------------------------------------ | ------------------ | ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `CARRIER_ENABLE_COLORIZED_LOGGING` | | `false` | When set to `true`, enables colorized log messages. This is useful when running carrier in a terminal or local Docker environment. | +| `CARRIER_ENABLE_STAT_LOG` | | `false` | When set to `true`, enables periodic statistics log messages. | +| `CARRIER_SQS_ENDPOINT` | :white_check_mark: | | The endpoint for the SQS service. Official AWS service endpoints can be found [here](https://docs.aws.amazon.com/general/latest/gr/sqs-service.html). | +| `CARRIER_SQS_BATCH_SIZE` | | `1` | The batch size each SQS receiver will request from SQS. All webhooks are transmitted one message per HTTP request. | +| `CARRIER_SQS_QUEUE_NAME` | :white_check_mark: | | The SQS queue name. | +| `CARRIER_SQS_RECEIVERS` | | `1` | The number of concurrent SQS receivers requesting messages from SQS. | +| `CARRIER_SQS_RECEIVER_WORKERS` | | `1` | The number of concurrent workers transmitting messages as webhooks for each receiver. A common pattern is to set the batch size and receiver workers to the same value, which will cause all messages in a batch to be transmitted in parallel HTTP requests. | +| `CARRIER_STAT_LOG_TIMER` | | `120s` | The interval between statistics log messages. | +| `CARRIER_WEBHOOK_ENDPOINT` | | `http://localhost:9000` | The full path, including protocol, that webhooks will be sent to. For example, if your worker expects webhooks at `/v1/events`, `http://worker:8080/v1/events`. | +| `CARRIER_WEBHOOK_TLS_INSECURE_SKIP_VERIFY` | | `false` | When set to true, the webhook transmitter will not attempt to validate TLS for an `https` webhook endpoint. | +| `CARRIER_WEBHOOK_DEFAULT_CONTENT_TYPE` | | `application/json` | The default value that will be sent in the `Content-Type` header in all HTTP POSTS to the webhook endpoint. | +| `CARRIER_WEBHOOK_REQUEST_TIMEOUT` | | `60s` | The webhook transmitter request timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | +| `CARRIER_WEBHOOK_HEALTH_CHECK_ENDPOINT` | | | When set, enables health check functionality using the provided endpoint. | +| `CARRIER_WEBHOOK_OFFLINE_THRESHOLD_COUNT` | | `5` | The number of failed health checks before the webhook is determined to be offline. | +| `CARRIER_WEBHOOK_HEALTH_CHECK_INTERVAL` | | `60s` | The time interval between webhook health checks. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | +| `CARRIER_WEBHOOK_HEALTH_CHECK_TIMEOUT` | | `10s` | The webhook health check timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | ## Architecture Carrier was built with the idea of separating receivers and transmitters. Receivers receive (or read) -messages and transmitters transmit (or send) messages. Currently, the only implemented receiver is +messages and transmitters transmit (or send) messages. Currently, the only implemented receiver is for SQS and the only implemented transmitter is for an HTTP POST (or webhook). In the future this architecture may be used to support multiple messaging queues and transmission methods. diff --git a/main.go b/main.go index 5853924..eaeef22 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,14 @@ import ( type ( Config struct { + EnableColorizedLogging bool `default:"false" split_words:"true"` + EnableStatLog bool `default:"false" split_words:"true"` + SQSBatchSize int `envconfig:"SQS_BATCH_SIZE" default:"1"` + SQSEndpoint string `envconfig:"SQS_ENDPOINT" required:"true"` + SQSQueueName string `envconfig:"SQS_QUEUE_NAME" required:"true"` + SQSReceivers int `envconfig:"SQS_RECEIVERS" default:"1"` + SQSReceiverWorkers int `envconfig:"SQS_RECEIVER_WORKERS" default:"1"` + StatLogTimer time.Duration `default:"120s" split_words:"true"` WebhookEndpoint string `default:"http://localhost:9000" split_words:"true"` WebhookTLSInsecureSkipVerify bool `envconfig:"WEBHOOK_TLS_INSECURE_SKIP_VERIFY" default:"false"` WebhookDefaultContentType string `default:"application/json" split_words:"true"` @@ -29,13 +37,6 @@ type ( WebhookOfflineThresholdCount int `default:"5" split_words:"true"` WebhookHealthCheckInterval time.Duration `default:"60s" split_words:"true"` WebhookHealthCheckTimeout time.Duration `default:"10s" split_words:"true"` - SQSEndpoint string `envconfig:"SQS_ENDPOINT" required:"true"` - SQSQueueName string `envconfig:"SQS_QUEUE_NAME" required:"true"` - SQSBatchSize int `envconfig:"SQS_BATCH_SIZE" default:"1"` - SQSReceivers int `envconfig:"SQS_RECEIVERS" default:"1"` - SQSReceiverWorkers int `envconfig:"SQS_RECEIVER_WORKERS" default:"1"` - EnableStatLog bool `default:"false" split_words:"true"` - StatLogTimer time.Duration `default:"120s" split_words:"true"` } // StatLogger is a utility for logging runtime statistics. @@ -79,10 +80,15 @@ func (l *StatLogger) Run() { // main entry point func main() { var envCfg Config + var logHandler slog.Handler if err := envconfig.Process("carrier", &envCfg); err != nil { panic(err) } - logHandler := tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelInfo}) + if envCfg.EnableColorizedLogging { + logHandler = tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelInfo}) + } else { + logHandler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}) + } log := slog.New(logHandler).With("source", "main") ctx, cancel := context.WithCancel(context.Background()) awsCfg, err := config.LoadDefaultConfig(ctx) From 1dff428adc5e35161eafb92bbbfd300986c9aec4 Mon Sep 17 00:00:00 2001 From: Michael Fox Date: Tue, 2 Sep 2025 14:03:23 -0600 Subject: [PATCH 2/3] fixed YAML tab spacing in code examples --- README.md | 88 +++++++++++++++++++++++++++---------------------------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index d689212..0092f47 100644 --- a/README.md +++ b/README.md @@ -18,25 +18,23 @@ Carrier can be used locally in a Docker Compose stack. For example, with a worke at `/webhook` and runs on port `9000`: ```yml -version: "3.3" - services: - sqs: - image: roribio/alpine-sqs - carrier: - image: amplifysecurity/carrier - restart: unless-stopped - volumes: - - ${HOME}/.aws/credentials:/.aws/credentials - links: - - sqs:sqs - - worker:worker - environment: - CARRIER_WEBHOOK_ENDPOINT: http://worker:9000/webhook - CARRIER_SQS_ENDPOINT: http://sqs:9324 - CARRIER_SQS_QUEUE_NAME: default + sqs: + image: roribio/alpine-sqs + carrier: + image: amplifysecurity/carrier + restart: unless-stopped + volumes: + - ${HOME}/.aws/credentials:/.aws/credentials + links: + - sqs:sqs + - worker:worker + environment: + CARRIER_WEBHOOK_ENDPOINT: http://worker:9000/webhook + CARRIER_SQS_ENDPOINT: http://sqs:9324 + CARRIER_SQS_QUEUE_NAME: default worker: - build: . + build: . ``` > **Note**: This example still requires AWS credentials to be mounted even though they are not used or the AWS SDK will panic. @@ -53,35 +51,35 @@ queue `carrier-demo` in `us-west-2` for a worker that expects webhooks at `/webh apiVersion: apps/v1 kind: Deployment metadata: - name: carrier-demo - namespace: demo + name: carrier-demo + namespace: demo spec: - replicas: 1 - selector: - matchLabels: - app: carrier-demo - template: - metadata: - labels: - app: carrier-demo - spec: - serviceAccountName: carrier-demo - containers: - - name: carrier - image: amplifysecurity/carrier - securityContext: - runAsUser: 1000 - allowPrivilegeEscalation: false - runAsNonRoot: true - env: - - name: CARRIER_WEBHOOK_ENDPOINT - value: http://localhost:9000/webhook - - name: CARRIER_SQS_ENDPOINT - value: https://sqs.us-west-2.amazonaws.com - - name: CARRIER_SQS_QUEUE_NAME - value: carrier-demo - - name: worker - image: ${registry}/${container}:${tag} + replicas: 1 + selector: + matchLabels: + app: carrier-demo + template: + metadata: + labels: + app: carrier-demo + spec: + serviceAccountName: carrier-demo + containers: + - name: carrier + image: amplifysecurity/carrier + securityContext: + runAsUser: 1000 + allowPrivilegeEscalation: false + runAsNonRoot: true + env: + - name: CARRIER_WEBHOOK_ENDPOINT + value: http://localhost:9000/webhook + - name: CARRIER_SQS_ENDPOINT + value: https://sqs.us-west-2.amazonaws.com + - name: CARRIER_SQS_QUEUE_NAME + value: carrier-demo + - name: worker + image: ${registry}/${container}:${tag} ``` > **Note**: This example assumes that the Kubernetes service account `carrier-demo` is mapped to an IAM role that has the appropriate permissions to access the `carrier-demo` SQS queue. From b09cb9951e32358a62c1c6c59c164f45174593e1 Mon Sep 17 00:00:00 2001 From: Michael Fox Date: Tue, 2 Sep 2025 14:30:56 -0600 Subject: [PATCH 3/3] fixing YAML snippet indents in README --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0092f47..62a7eb1 100644 --- a/README.md +++ b/README.md @@ -33,8 +33,8 @@ services: CARRIER_WEBHOOK_ENDPOINT: http://worker:9000/webhook CARRIER_SQS_ENDPOINT: http://sqs:9324 CARRIER_SQS_QUEUE_NAME: default - worker: - build: . + worker: + build: . ``` > **Note**: This example still requires AWS credentials to be mounted even though they are not used or the AWS SDK will panic. @@ -78,8 +78,8 @@ spec: value: https://sqs.us-west-2.amazonaws.com - name: CARRIER_SQS_QUEUE_NAME value: carrier-demo - - name: worker - image: ${registry}/${container}:${tag} + - name: worker + image: ${registry}/${container}:${tag} ``` > **Note**: This example assumes that the Kubernetes service account `carrier-demo` is mapped to an IAM role that has the appropriate permissions to access the `carrier-demo` SQS queue.