diff --git a/README.md b/README.md index 5b7e3f3..62a7eb1 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ [![CI](https://github.com/amplify-security/carrier/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/amplify-security/carrier/actions/workflows/ci.yml) [![Docker Pulls](https://img.shields.io/docker/pulls/amplifysecurity/carrier)](https://hub.docker.com/r/amplifysecurity/carrier) -A lightweight messaging adapter for webhooks written in Go. Currently, the -[`amplifysecurity/carrier`](https://hub.docker.com/r/amplifysecurity/carrier) +A lightweight messaging adapter for webhooks written in Go. Currently, the +[`amplifysecurity/carrier`](https://hub.docker.com/r/amplifysecurity/carrier) image size is under 8MB and at idle consumes under 5MB of RAM. Carrier can act as a messaging daemon in the SQS event worker pattern and is designed to be deployed as a sidecar container in Kubernetes pods alongside event workers. @@ -18,8 +18,6 @@ Carrier can be used locally in a Docker Compose stack. For example, with a worke at `/webhook` and runs on port `9000`: ```yml -version: "3.3" - services: sqs: image: roribio/alpine-sqs @@ -101,9 +99,9 @@ Carrier supports the concept of setting SQS visibility timouts dynamically from This can be useful for calculating distributed backoffs on specific messages. Carrier sends the following headers with each webhook: -| Header | Description | -| ------ | ----------- | -| `X-Carrier-Receive-Count` | The SQS message receive count. This field indicates how many times the particular message has been received. | +| Header | Description | +| ------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------- | +| `X-Carrier-Receive-Count` | The SQS message receive count. This field indicates how many times the particular message has been received. | | `X-Carrier-First-Receive-Time` | The SQS message first receive time (in seconds since epoch). This field is the timestamp of the first time the particular message was received. | Using these headers, any usual backoff scheme (like exponential) can be implemented on a distributed basis. @@ -123,29 +121,32 @@ attribute will be sent to the webhook in the `Content-Type` header. For a config ## Configuration -Carrier currently has limited configuration options. More configuration options will be added as +Carrier currently has limited configuration options. More configuration options will be added as the project continues to mature. Currently, all configuration is done via environment variables. -| Environment Variable | Required | Default | Description | -| -------------------- | -------- | ------- | ----------- | -| `CARRIER_WEBHOOK_ENDPOINT` | | `http://localhost:9000` | The full path, including protocol, that webhooks will be sent to. For example, if your worker expects webhooks at `/v1/events`, `http://worker:8080/v1/events`. | -| `CARRIER_WEBHOOK_TLS_INSECURE_SKIP_VERIFY` | | `false` | When set to true, the webhook transmitter will not attempt to validate TLS for an `https` webhook endpoint. | -| `CARRIER_WEBHOOK_DEFAULT_CONTENT_TYPE` | | `application/json` | The default value that will be sent in the `Content-Type` header in all HTTP POSTS to the webhook endpoint. | -| `CARRIER_WEBHOOK_REQUEST_TIMEOUT` | | `60s` | The webhook transmitter request timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | -| `CARRIER_WEBHOOK_HEALTH_CHECK_ENDPOINT` | | | When set, enables health check functionality using the provided endpoint. | -| `CARRIER_WEBHOOK_OFFLINE_THRESHOLD_COUNT` | | `5` | The number of failed health checks before the webhook is determined to be offline. | -| `CARRIER_WEBHOOK_HEALTH_CHECK_INTERVAL` | | `60s` | The time interval between webhook health checks. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | -| `CARRIER_WEBHOOK_HEALTH_CHECK_TIMEOUT` | | `10s` | The webhook health check timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | -| `CARRIER_SQS_ENDPOINT` | :white_check_mark: | | The endpoint for the SQS service. Official AWS service endpoints can be found [here](https://docs.aws.amazon.com/general/latest/gr/sqs-service.html). | -| `CARRIER_SQS_QUEUE_NAME` | :white_check_mark: | | The SQS queue name. | -| `CARRIER_SQS_BATCH_SIZE` | | `1` | The batch size each SQS receiver will request from SQS. All webhooks are transmitted one message per HTTP request. | -| `CARRIER_SQS_RECEIVERS` | | `1` | The number of concurrent SQS receivers requesting messages from SQS. | -| `CARRIER_SQS_RECEIVER_WORKERS` | | `1` | The number of concurrent workers transmitting messages as webhooks for each receiver. A common pattern is to set the batch size and receiver workers to the same value, which will cause all messages in a batch to be transmitted in parallel HTTP requests. | +| Environment Variable | Required | Default | Description | +| ------------------------------------------ | ------------------ | ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `CARRIER_ENABLE_COLORIZED_LOGGING` | | `false` | When set to `true`, enables colorized log messages. This is useful when running carrier in a terminal or local Docker environment. | +| `CARRIER_ENABLE_STAT_LOG` | | `false` | When set to `true`, enables periodic statistics log messages. | +| `CARRIER_SQS_ENDPOINT` | :white_check_mark: | | The endpoint for the SQS service. Official AWS service endpoints can be found [here](https://docs.aws.amazon.com/general/latest/gr/sqs-service.html). | +| `CARRIER_SQS_BATCH_SIZE` | | `1` | The batch size each SQS receiver will request from SQS. All webhooks are transmitted one message per HTTP request. | +| `CARRIER_SQS_QUEUE_NAME` | :white_check_mark: | | The SQS queue name. | +| `CARRIER_SQS_RECEIVERS` | | `1` | The number of concurrent SQS receivers requesting messages from SQS. | +| `CARRIER_SQS_RECEIVER_WORKERS` | | `1` | The number of concurrent workers transmitting messages as webhooks for each receiver. A common pattern is to set the batch size and receiver workers to the same value, which will cause all messages in a batch to be transmitted in parallel HTTP requests. | +| `CARRIER_STAT_LOG_TIMER` | | `120s` | The interval between statistics log messages. | +| `CARRIER_WEBHOOK_ENDPOINT` | | `http://localhost:9000` | The full path, including protocol, that webhooks will be sent to. For example, if your worker expects webhooks at `/v1/events`, `http://worker:8080/v1/events`. | +| `CARRIER_WEBHOOK_TLS_INSECURE_SKIP_VERIFY` | | `false` | When set to true, the webhook transmitter will not attempt to validate TLS for an `https` webhook endpoint. | +| `CARRIER_WEBHOOK_DEFAULT_CONTENT_TYPE` | | `application/json` | The default value that will be sent in the `Content-Type` header in all HTTP POSTS to the webhook endpoint. | +| `CARRIER_WEBHOOK_REQUEST_TIMEOUT` | | `60s` | The webhook transmitter request timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | +| `CARRIER_WEBHOOK_HEALTH_CHECK_ENDPOINT` | | | When set, enables health check functionality using the provided endpoint. | +| `CARRIER_WEBHOOK_OFFLINE_THRESHOLD_COUNT` | | `5` | The number of failed health checks before the webhook is determined to be offline. | +| `CARRIER_WEBHOOK_HEALTH_CHECK_INTERVAL` | | `60s` | The time interval between webhook health checks. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | +| `CARRIER_WEBHOOK_HEALTH_CHECK_TIMEOUT` | | `10s` | The webhook health check timeout. See Go's [`time.ParseDuration()`](https://pkg.go.dev/time#ParseDuration) for acceptable formats. | ## Architecture Carrier was built with the idea of separating receivers and transmitters. Receivers receive (or read) -messages and transmitters transmit (or send) messages. Currently, the only implemented receiver is +messages and transmitters transmit (or send) messages. Currently, the only implemented receiver is for SQS and the only implemented transmitter is for an HTTP POST (or webhook). In the future this architecture may be used to support multiple messaging queues and transmission methods. diff --git a/main.go b/main.go index 5853924..eaeef22 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,14 @@ import ( type ( Config struct { + EnableColorizedLogging bool `default:"false" split_words:"true"` + EnableStatLog bool `default:"false" split_words:"true"` + SQSBatchSize int `envconfig:"SQS_BATCH_SIZE" default:"1"` + SQSEndpoint string `envconfig:"SQS_ENDPOINT" required:"true"` + SQSQueueName string `envconfig:"SQS_QUEUE_NAME" required:"true"` + SQSReceivers int `envconfig:"SQS_RECEIVERS" default:"1"` + SQSReceiverWorkers int `envconfig:"SQS_RECEIVER_WORKERS" default:"1"` + StatLogTimer time.Duration `default:"120s" split_words:"true"` WebhookEndpoint string `default:"http://localhost:9000" split_words:"true"` WebhookTLSInsecureSkipVerify bool `envconfig:"WEBHOOK_TLS_INSECURE_SKIP_VERIFY" default:"false"` WebhookDefaultContentType string `default:"application/json" split_words:"true"` @@ -29,13 +37,6 @@ type ( WebhookOfflineThresholdCount int `default:"5" split_words:"true"` WebhookHealthCheckInterval time.Duration `default:"60s" split_words:"true"` WebhookHealthCheckTimeout time.Duration `default:"10s" split_words:"true"` - SQSEndpoint string `envconfig:"SQS_ENDPOINT" required:"true"` - SQSQueueName string `envconfig:"SQS_QUEUE_NAME" required:"true"` - SQSBatchSize int `envconfig:"SQS_BATCH_SIZE" default:"1"` - SQSReceivers int `envconfig:"SQS_RECEIVERS" default:"1"` - SQSReceiverWorkers int `envconfig:"SQS_RECEIVER_WORKERS" default:"1"` - EnableStatLog bool `default:"false" split_words:"true"` - StatLogTimer time.Duration `default:"120s" split_words:"true"` } // StatLogger is a utility for logging runtime statistics. @@ -79,10 +80,15 @@ func (l *StatLogger) Run() { // main entry point func main() { var envCfg Config + var logHandler slog.Handler if err := envconfig.Process("carrier", &envCfg); err != nil { panic(err) } - logHandler := tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelInfo}) + if envCfg.EnableColorizedLogging { + logHandler = tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelInfo}) + } else { + logHandler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}) + } log := slog.New(logHandler).With("source", "main") ctx, cancel := context.WithCancel(context.Background()) awsCfg, err := config.LoadDefaultConfig(ctx)