Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
[![CI](https://github.com/amplify-security/carrier/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/amplify-security/carrier/actions/workflows/ci.yml)
[![Docker Pulls](https://img.shields.io/docker/pulls/amplifysecurity/carrier)](https://hub.docker.com/r/amplifysecurity/carrier)

A lightweight messaging adapter for webhooks written in Go. Currently, the
[`amplifysecurity/carrier`](https://hub.docker.com/r/amplifysecurity/carrier)
A lightweight messaging adapter for webhooks written in Go. Currently, the
[`amplifysecurity/carrier`](https://hub.docker.com/r/amplifysecurity/carrier)
image size is under 8MB and at idle consumes under 5MB of RAM. Carrier can act as a messaging daemon in the
SQS event worker pattern and is designed to be deployed as a sidecar container in Kubernetes pods alongside
event workers.
Expand All @@ -18,8 +18,6 @@ Carrier can be used locally in a Docker Compose stack. For example, with a worke
at `/webhook` and runs on port `9000`:

```yml
version: "3.3"

services:
sqs:
image: roribio/alpine-sqs
Expand Down Expand Up @@ -101,9 +99,9 @@ Carrier supports the concept of setting SQS visibility timouts dynamically from
This can be useful for calculating distributed backoffs on specific messages. Carrier sends the following
headers with each webhook:

| Header | Description |
| ------ | ----------- |
| `X-Carrier-Receive-Count` | The SQS message receive count. This field indicates how many times the particular message has been received. |
| Header | Description |
| ------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------- |
| `X-Carrier-Receive-Count` | The SQS message receive count. This field indicates how many times the particular message has been received. |
| `X-Carrier-First-Receive-Time` | The SQS message first receive time (in seconds since epoch). This field is the timestamp of the first time the particular message was received. |

Using these headers, any usual backoff scheme (like exponential) can be implemented on a distributed basis.
Expand All @@ -123,29 +121,32 @@ attribute will be sent to the webhook in the `Content-Type` header. For a config

## Configuration

Carrier currently has limited configuration options. More configuration options will be added as
Carrier currently has limited configuration options. More configuration options will be added as
the project continues to mature. Currently, all configuration is done via environment variables.

| Environment Variable | Required | Default | Description |
| -------------------- | -------- | ------- | ----------- |
| `CARRIER_WEBHOOK_ENDPOINT` | | `http://localhost:9000` | The full path, including protocol, that webhooks will be sent to. For example, if your worker expects webhooks at `/v1/events`, `http://worker:8080/v1/events`. |
| `CARRIER_WEBHOOK_TLS_INSECURE_SKIP_VERIFY` | | `false` | When set to true, the webhook transmitter will not attempt to validate TLS for an `https` webhook endpoint. |
| `CARRIER_WEBHOOK_DEFAULT_CONTENT_TYPE` | | `application/json` | The default value that will be sent in the `Content-Type` header in all HTTP POSTS to the webhook endpoint. |
| `CARRIER_WEBHOOK_REQUEST_TIMEOUT` | | `60s` | The webhook transmitter request timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. |
| `CARRIER_WEBHOOK_HEALTH_CHECK_ENDPOINT` | | | When set, enables health check functionality using the provided endpoint. |
| `CARRIER_WEBHOOK_OFFLINE_THRESHOLD_COUNT` | | `5` | The number of failed health checks before the webhook is determined to be offline. |
| `CARRIER_WEBHOOK_HEALTH_CHECK_INTERVAL` | | `60s` | The time interval between webhook health checks. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. |
| `CARRIER_WEBHOOK_HEALTH_CHECK_TIMEOUT` | | `10s` | The webhook health check timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. |
| `CARRIER_SQS_ENDPOINT` | :white_check_mark: | | The endpoint for the SQS service. Official AWS service endpoints can be found [here](https://docs.aws.amazon.com/general/latest/gr/sqs-service.html). |
| `CARRIER_SQS_QUEUE_NAME` | :white_check_mark: | | The SQS queue name. |
| `CARRIER_SQS_BATCH_SIZE` | | `1` | The batch size each SQS receiver will request from SQS. All webhooks are transmitted one message per HTTP request. |
| `CARRIER_SQS_RECEIVERS` | | `1` | The number of concurrent SQS receivers requesting messages from SQS. |
| `CARRIER_SQS_RECEIVER_WORKERS` | | `1` | The number of concurrent workers transmitting messages as webhooks for each receiver. A common pattern is to set the batch size and receiver workers to the same value, which will cause all messages in a batch to be transmitted in parallel HTTP requests. |
| Environment Variable | Required | Default | Description |
| ------------------------------------------ | ------------------ | ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CARRIER_ENABLE_COLORIZED_LOGGING` | | `false` | When set to `true`, enables colorized log messages. This is useful when running carrier in a terminal or local Docker environment. |
| `CARRIER_ENABLE_STAT_LOG` | | `false` | When set to `true`, enables periodic statistics log messages. |
| `CARRIER_SQS_ENDPOINT` | :white_check_mark: | | The endpoint for the SQS service. Official AWS service endpoints can be found [here](https://docs.aws.amazon.com/general/latest/gr/sqs-service.html). |
| `CARRIER_SQS_BATCH_SIZE` | | `1` | The batch size each SQS receiver will request from SQS. All webhooks are transmitted one message per HTTP request. |
| `CARRIER_SQS_QUEUE_NAME` | :white_check_mark: | | The SQS queue name. |
| `CARRIER_SQS_RECEIVERS` | | `1` | The number of concurrent SQS receivers requesting messages from SQS. |
| `CARRIER_SQS_RECEIVER_WORKERS` | | `1` | The number of concurrent workers transmitting messages as webhooks for each receiver. A common pattern is to set the batch size and receiver workers to the same value, which will cause all messages in a batch to be transmitted in parallel HTTP requests. |
| `CARRIER_STAT_LOG_TIMER` | | `120s` | The interval between statistics log messages. |
| `CARRIER_WEBHOOK_ENDPOINT` | | `http://localhost:9000` | The full path, including protocol, that webhooks will be sent to. For example, if your worker expects webhooks at `/v1/events`, `http://worker:8080/v1/events`. |
| `CARRIER_WEBHOOK_TLS_INSECURE_SKIP_VERIFY` | | `false` | When set to true, the webhook transmitter will not attempt to validate TLS for an `https` webhook endpoint. |
| `CARRIER_WEBHOOK_DEFAULT_CONTENT_TYPE` | | `application/json` | The default value that will be sent in the `Content-Type` header in all HTTP POSTS to the webhook endpoint. |
| `CARRIER_WEBHOOK_REQUEST_TIMEOUT` | | `60s` | The webhook transmitter request timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. |
| `CARRIER_WEBHOOK_HEALTH_CHECK_ENDPOINT` | | | When set, enables health check functionality using the provided endpoint. |
| `CARRIER_WEBHOOK_OFFLINE_THRESHOLD_COUNT` | | `5` | The number of failed health checks before the webhook is determined to be offline. |
| `CARRIER_WEBHOOK_HEALTH_CHECK_INTERVAL` | | `60s` | The time interval between webhook health checks. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. |
| `CARRIER_WEBHOOK_HEALTH_CHECK_TIMEOUT` | | `10s` | The webhook health check timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. |

## Architecture

Carrier was built with the idea of separating receivers and transmitters. Receivers receive (or read)
messages and transmitters transmit (or send) messages. Currently, the only implemented receiver is
messages and transmitters transmit (or send) messages. Currently, the only implemented receiver is
for SQS and the only implemented transmitter is for an HTTP POST (or webhook). In the future this
architecture may be used to support multiple messaging queues and transmission methods.

Expand Down
22 changes: 14 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (

type (
Config struct {
EnableColorizedLogging bool `default:"false" split_words:"true"`
EnableStatLog bool `default:"false" split_words:"true"`
SQSBatchSize int `envconfig:"SQS_BATCH_SIZE" default:"1"`
SQSEndpoint string `envconfig:"SQS_ENDPOINT" required:"true"`
SQSQueueName string `envconfig:"SQS_QUEUE_NAME" required:"true"`
SQSReceivers int `envconfig:"SQS_RECEIVERS" default:"1"`
SQSReceiverWorkers int `envconfig:"SQS_RECEIVER_WORKERS" default:"1"`
StatLogTimer time.Duration `default:"120s" split_words:"true"`
WebhookEndpoint string `default:"http://localhost:9000" split_words:"true"`
WebhookTLSInsecureSkipVerify bool `envconfig:"WEBHOOK_TLS_INSECURE_SKIP_VERIFY" default:"false"`
WebhookDefaultContentType string `default:"application/json" split_words:"true"`
Expand All @@ -29,13 +37,6 @@ type (
WebhookOfflineThresholdCount int `default:"5" split_words:"true"`
WebhookHealthCheckInterval time.Duration `default:"60s" split_words:"true"`
WebhookHealthCheckTimeout time.Duration `default:"10s" split_words:"true"`
SQSEndpoint string `envconfig:"SQS_ENDPOINT" required:"true"`
SQSQueueName string `envconfig:"SQS_QUEUE_NAME" required:"true"`
SQSBatchSize int `envconfig:"SQS_BATCH_SIZE" default:"1"`
SQSReceivers int `envconfig:"SQS_RECEIVERS" default:"1"`
SQSReceiverWorkers int `envconfig:"SQS_RECEIVER_WORKERS" default:"1"`
EnableStatLog bool `default:"false" split_words:"true"`
StatLogTimer time.Duration `default:"120s" split_words:"true"`
}

// StatLogger is a utility for logging runtime statistics.
Expand Down Expand Up @@ -79,10 +80,15 @@ func (l *StatLogger) Run() {
// main entry point
func main() {
var envCfg Config
var logHandler slog.Handler
if err := envconfig.Process("carrier", &envCfg); err != nil {
panic(err)
}
logHandler := tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelInfo})
if envCfg.EnableColorizedLogging {
logHandler = tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelInfo})
} else {
logHandler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
}
log := slog.New(logHandler).With("source", "main")
ctx, cancel := context.WithCancel(context.Background())
awsCfg, err := config.LoadDefaultConfig(ctx)
Expand Down