From 78c174b55b27b461ad864c72ea63f3f56e10af85 Mon Sep 17 00:00:00 2001 From: Rob Cook Date: Fri, 26 Jun 2026 09:22:16 -0400 Subject: [PATCH 1/2] Add NATS JetStream as a sink option. --- assets/svelte/consumers/ShowSink.svelte | 10 + assets/svelte/consumers/ShowSinkHeader.svelte | 3 + .../svelte/consumers/SinkConsumerForm.svelte | 9 + assets/svelte/consumers/SinkIndex.svelte | 7 + assets/svelte/consumers/dynamicRoutingDocs.ts | 16 ++ assets/svelte/consumers/types.ts | 20 ++ .../nats_jetstream/NatsJetstreamIcon.svelte | 26 +++ .../NatsJetstreamSinkCard.svelte | 145 ++++++++++++ .../NatsJetstreamSinkForm.svelte | 203 +++++++++++++++++ config/test.exs | 1 + docs/docs.json | 3 + .../stream-postgres-to-nats-jetstream.mdx | 148 +++++++++++++ docs/how-to/stream-postgres-to-nats.mdx | 3 - docs/quickstart/nats-jetstream.mdx | 207 ++++++++++++++++++ docs/reference/sinks/nats-jetstream.mdx | 105 +++++++++ lib/sequin/consumers/nats_jetstream_sink.ex | 77 +++++++ lib/sequin/consumers/sink_consumer.ex | 3 + lib/sequin/runtime/nats_jetstream_pipeline.ex | 71 ++++++ .../routing/consumers/nats_jetstream.ex | 16 ++ lib/sequin/runtime/routing/routing.ex | 1 + lib/sequin/runtime/sink_pipeline.ex | 1 + lib/sequin/sinks/nats/connection_cache.ex | 29 ++- lib/sequin/sinks/nats/jetstream.ex | 17 ++ lib/sequin/sinks/nats/jetstream_client.ex | 187 ++++++++++++++++ lib/sequin/transforms/transforms.ex | 34 +++ .../live/components/consumer_form.ex | 63 ++++++ lib/sequin_web/live/sink_consumers/show.ex | 19 ++ test/sequin/nats_jetstream_pipeline_test.exs | 135 ++++++++++++ test/support/factory/consumers_factory.ex | 13 ++ test/support/mocks.ex | 4 + 30 files changed, 1562 insertions(+), 14 deletions(-) create mode 100644 assets/svelte/sinks/nats_jetstream/NatsJetstreamIcon.svelte create mode 100644 assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkCard.svelte create mode 100644 assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkForm.svelte create mode 100644 docs/how-to/stream-postgres-to-nats-jetstream.mdx create mode 100644 docs/quickstart/nats-jetstream.mdx create mode 100644 docs/reference/sinks/nats-jetstream.mdx create mode 100644 lib/sequin/consumers/nats_jetstream_sink.ex create mode 100644 lib/sequin/runtime/nats_jetstream_pipeline.ex create mode 100644 lib/sequin/runtime/routing/consumers/nats_jetstream.ex create mode 100644 lib/sequin/sinks/nats/jetstream.ex create mode 100644 lib/sequin/sinks/nats/jetstream_client.ex create mode 100644 test/sequin/nats_jetstream_pipeline_test.exs diff --git a/assets/svelte/consumers/ShowSink.svelte b/assets/svelte/consumers/ShowSink.svelte index f3c7a4447..8e924b2ca 100644 --- a/assets/svelte/consumers/ShowSink.svelte +++ b/assets/svelte/consumers/ShowSink.svelte @@ -22,6 +22,7 @@ SequinStreamConsumer, GcpPubsubConsumer, NatsConsumer, + NatsJetstreamConsumer, RabbitMqConsumer, TypesenseConsumer, ElasticsearchConsumer, @@ -34,6 +35,7 @@ import KafkaSinkCard from "../sinks/kafka/KafkaSinkCard.svelte"; import KinesisSinkCard from "../sinks/kinesis/KinesisSinkCard.svelte"; import NatsSinkCard from "../sinks/nats/NatsSinkCard.svelte"; + import NatsJetstreamSinkCard from "../sinks/nats_jetstream/NatsJetstreamSinkCard.svelte"; import RabbitMqSinkCard from "../sinks/rabbitmq/RabbitMqSinkCard.svelte"; import RedisStreamSinkCard from "../sinks/redis-stream/RedisStreamSinkCard.svelte"; import RedisStringSinkCard from "../sinks/redis-string/RedisStringSinkCard.svelte"; @@ -143,6 +145,12 @@ return consumer.sink.type === "nats"; } + function isNatsJetstreamConsumer( + consumer: Consumer, + ): consumer is NatsJetstreamConsumer { + return consumer.sink.type === "nats_jetstream"; + } + function isGcpPubsubConsumer( consumer: Consumer, ): consumer is GcpPubsubConsumer { @@ -1246,6 +1254,8 @@ {:else if isNatsConsumer(consumer)} + {:else if isNatsJetstreamConsumer(consumer)} + {:else if isRabbitMqConsumer(consumer)} {:else if isTypesenseConsumer(consumer)} diff --git a/assets/svelte/consumers/ShowSinkHeader.svelte b/assets/svelte/consumers/ShowSinkHeader.svelte index fb7df76f3..b53638b58 100644 --- a/assets/svelte/consumers/ShowSinkHeader.svelte +++ b/assets/svelte/consumers/ShowSinkHeader.svelte @@ -27,6 +27,7 @@ import GcpPubsubIcon from "../sinks/gcp_pubsub/GcpPubsubIcon.svelte"; import SequinStreamIcon from "../sinks/sequin_stream/SequinStreamIcon.svelte"; import NatsIcon from "../sinks/nats/NatsIcon.svelte"; + import NatsJetstreamIcon from "../sinks/nats_jetstream/NatsJetstreamIcon.svelte"; import MeilisearchIcon from "../sinks/meilisearch/MeilisearchIcon.svelte"; import RabbitMqIcon from "../sinks/rabbitmq/RabbitMqIcon.svelte"; import AzureEventHubIcon from "../sinks/azure_event_hub/AzureEventHubIcon.svelte"; @@ -146,6 +147,8 @@ {:else if consumer.sink.type === "nats"} + {:else if consumer.sink.type === "nats_jetstream"} + {:else if consumer.sink.type === "meilisearch"} {:else if consumer.sink.type === "rabbitmq"} diff --git a/assets/svelte/consumers/SinkConsumerForm.svelte b/assets/svelte/consumers/SinkConsumerForm.svelte index d5f6484ae..fc9791563 100644 --- a/assets/svelte/consumers/SinkConsumerForm.svelte +++ b/assets/svelte/consumers/SinkConsumerForm.svelte @@ -30,6 +30,7 @@ import GcpPubsubSinkForm from "$lib/sinks/gcp_pubsub/GcpPubsubSinkForm.svelte"; import SequinStreamSinkForm from "$lib/sinks/sequin_stream/SequinStreamSinkForm.svelte"; import NatsSinkForm from "$lib/sinks/nats/NatsSinkForm.svelte"; + import NatsJetstreamSinkForm from "$lib/sinks/nats_jetstream/NatsJetstreamSinkForm.svelte"; import RabbitMqSinkForm from "$lib/sinks/rabbitmq/RabbitMqSinkForm.svelte"; import AzureEventHubSinkForm from "$lib/sinks/azure_event_hub/AzureEventHubSinkForm.svelte"; import { CircleAlert, Info, Plus } from "lucide-svelte"; @@ -752,6 +753,14 @@ {refreshFunctions} bind:functionRefreshState /> + {:else if consumer.type === "nats_jetstream"} + {:else if consumer.type === "rabbitmq"} = { }, }, }, + nats_jetstream: { + fields: { + subject: { + description: "The NATS JetStream subject to publish messages to", + staticValue: + "sequin....", + dynamicDefault: + "sequin....", + }, + headers: { + description: "Map of key value pairs", + staticValue: '%{"Nats-Msg-Id" => }', + dynamicDefault: '%{"Nats-Msg-Id" => }', + }, + }, + }, kafka: { fields: { topic: { diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts index ab128c33c..8d9524181 100644 --- a/assets/svelte/consumers/types.ts +++ b/assets/svelte/consumers/types.ts @@ -138,6 +138,23 @@ export type NatsConsumer = BaseConsumer & { }; }; +// NATS JetStream specific sink +export type NatsJetstreamConsumer = BaseConsumer & { + sink: { + type: "nats_jetstream"; + host: string; + port: number; + stream_name: string; + domain: string; + publish_timeout_ms: number; + username: string; + password: string; + jwt: string; + nkey_seed: string; + tls: boolean; + }; +}; + // Azure Event Hub specific sink export type AzureEventHubConsumer = BaseConsumer & { sink: { @@ -260,6 +277,7 @@ export type Consumer = | SequinStreamConsumer | GcpPubsubConsumer | NatsConsumer + | NatsJetstreamConsumer | AzureEventHubConsumer | RabbitMqConsumer | TypesenseConsumer @@ -278,6 +296,7 @@ export const SinkTypeValues = [ "gcp_pubsub", "elasticsearch", "nats", + "nats_jetstream", "rabbitmq", "typesense", "meilisearch", @@ -292,6 +311,7 @@ export const RoutedSinkTypeValues = [ "redis_string", "redis_stream", "nats", + "nats_jetstream", "kafka", "gcp_pubsub", "typesense", diff --git a/assets/svelte/sinks/nats_jetstream/NatsJetstreamIcon.svelte b/assets/svelte/sinks/nats_jetstream/NatsJetstreamIcon.svelte new file mode 100644 index 000000000..995767310 --- /dev/null +++ b/assets/svelte/sinks/nats_jetstream/NatsJetstreamIcon.svelte @@ -0,0 +1,26 @@ + + + + + + + + + + + + + diff --git a/assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkCard.svelte b/assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkCard.svelte new file mode 100644 index 000000000..90ab60031 --- /dev/null +++ b/assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkCard.svelte @@ -0,0 +1,145 @@ + + + + +
+

NATS JetStream configuration

+
+ +
+
+ Host +
+
+ {consumer.sink.host} +
+
+
+ +
+ Port +
+
+ {consumer.sink.port} +
+
+
+ +
+ Stream Name +
+ {consumer.sink.stream_name || "-"} +
+
+ +
+ Domain +
+ {consumer.sink.domain || "-"} +
+
+ +
+ Username +
+ {consumer.sink.username || "-"} +
+
+ +
+ TLS Enabled +
+ {consumer.sink.tls ? "Yes" : "No"} +
+
+ +
+ JWT +
+ {consumer.sink.jwt ? "********" : "-"} +
+
+ +
+ NKey Seed +
+ {consumer.sink.nkey_seed ? "********" : "-"} +
+
+ +
+ Publish Timeout +
+ {consumer.sink.publish_timeout_ms}ms +
+
+ +
+ Subject +
+ + {#if consumer.routing_id} + Determined by router + + {:else} + sequin.{consumer.database + .name}.<table_schema>.<table_name>.<action> + {/if} + +
+
+
+ + {#if consumer.routing} +
+ Router +
+
{consumer.routing.function.code}
+
+
+ {/if} +
+
diff --git a/assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkForm.svelte b/assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkForm.svelte new file mode 100644 index 000000000..d9c5d3fd5 --- /dev/null +++ b/assets/svelte/sinks/nats_jetstream/NatsJetstreamSinkForm.svelte @@ -0,0 +1,203 @@ + + + + + NATS JetStream Configuration + + + + + + {#if errors.sink?.host} + {errors.sink.host} + {/if} + + + + + + {#if errors.sink?.port} + {errors.sink.port} + {/if} + + + + + + {#if errors.sink?.stream_name} + {errors.sink.stream_name} + {/if} + + + + + + {#if errors.sink?.domain} + {errors.sink.domain} + {/if} + + + + + + {#if errors.sink?.publish_timeout_ms} + {errors.sink.publish_timeout_ms} + {/if} + + + +
+ { + form.sink.tls = checked; + }} + /> + +
+ {#if errors.sink?.tls} + {errors.sink.tls} + {/if} +
+ + + + + {#if errors.sink?.username} + {errors.sink.username} + {/if} + + + + +
+ + +
+ {#if errors.sink?.password} + {errors.sink.password} + {/if} +
+ + + +
+ + +
+ {#if errors.sink?.jwt} + {errors.sink.jwt} + {/if} +
+ + + +
+ + +
+ {#if errors.sink?.nkey_seed} + {errors.sink.nkey_seed} + {/if} +
+
+
+ + + + Routing + + + + + diff --git a/config/test.exs b/config/test.exs index f3db2d5f6..84e763312 100644 --- a/config/test.exs +++ b/config/test.exs @@ -113,6 +113,7 @@ config :sequin, redis_module: Sequin.Sinks.RedisMock, kafka_module: Sequin.Sinks.KafkaMock, nats_module: Sequin.Sinks.NatsMock, + nats_jetstream_module: Sequin.Sinks.NatsJetstreamMock, rabbitmq_module: Sequin.Sinks.RabbitMqMock, aws_module: Sequin.AwsMock, # Arbitrarily high memory limit for testing diff --git a/docs/docs.json b/docs/docs.json index 2a3ae212c..a439e164d 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -33,6 +33,7 @@ "quickstart/redis-stream", "quickstart/gcp-pubsub", "quickstart/nats", + "quickstart/nats-jetstream", "quickstart/rabbitmq", "quickstart/s2", "quickstart/sequin-stream", @@ -59,6 +60,7 @@ "how-to/stream-postgres-to-kafka", "how-to/stream-postgres-to-meilisearch", "how-to/stream-postgres-to-nats", + "how-to/stream-postgres-to-nats-jetstream", "how-to/stream-postgres-to-rabbitmq", "how-to/stream-postgres-to-redis-string", "how-to/stream-postgres-to-redis-stream", @@ -106,6 +108,7 @@ "reference/sinks/kafka", "reference/sinks/meilisearch", "reference/sinks/nats", + "reference/sinks/nats-jetstream", "reference/sinks/rabbitmq", "reference/sinks/redis-string", "reference/sinks/redis-stream", diff --git a/docs/how-to/stream-postgres-to-nats-jetstream.mdx b/docs/how-to/stream-postgres-to-nats-jetstream.mdx new file mode 100644 index 000000000..c59b124fa --- /dev/null +++ b/docs/how-to/stream-postgres-to-nats-jetstream.mdx @@ -0,0 +1,148 @@ +--- +title: "How to stream Postgres to NATS JetStream" +sidebarTitle: "NATS JetStream" +description: "Build persistent, event-driven applications with NATS JetStream and Postgres change data capture (CDC). Stream database changes to persistent JetStream streams for at-least-once delivery." +--- + +This guide shows you how to set up Postgres change data capture (CDC) and stream changes to NATS JetStream using Sequin. + +With Postgres data streaming to NATS JetStream, you can build durable pipelines that survive consumer downtime, trigger workflows, keep services in sync, [build audit logs](/how-to/create-audit-logs), [maintain caches](/how-to/maintain-caches), and more. + +By the end of this how-to, you'll have database changes flowing into a persistent NATS JetStream stream. + + + This is the how-to guide for streaming Postgres to NATS JetStream. See the [quickstart](/quickstart/nats-jetstream) for a step-by-step walkthrough or the [reference](/reference/sinks/nats-jetstream) for details on all configuration options. + + +## Prerequisites + +If you're self-hosting Sequin, you'll need: + +1. [Sequin installed](/running-sequin) +2. [A database connected](/connect-postgres) +3. A NATS server with JetStream enabled + +## Basic setup + +### Prepare your NATS server + +You'll need a NATS server with JetStream enabled. You can use a local server for development or a cloud-hosted NATS service in production. + +#### Local development with Docker + +Start a NATS server with JetStream enabled: + +```bash +docker run --name nats -p 4222:4222 -d nats:latest -js +``` + +#### Create a JetStream stream + +Before creating the Sequin sink, create a JetStream stream to capture the messages. Using the NATS CLI: + +```bash +nats stream add SEQUIN \ + --subjects "sequin.>" \ + --storage file \ + --replicas 1 \ + --retention limits \ + --max-msgs=-1 \ + --max-bytes=-1 \ + --max-age=0 \ + --discard old +``` + +This creates a stream named `SEQUIN` that captures all subjects matching `sequin.>`. + +## Create NATS JetStream sink + +Navigate to the "Sinks" tab, click "Create Sink", and select "NATS JetStream Sink". + +### Configure the source + + + + Under "Source", select the schemas and tables you want to stream data from. + + + + Add [filters](/reference/filters) to the sink to control which database changes are sent to your stream. + + + + Add a [transform](/reference/transforms) to the sink to modify the payload before it's sent to JetStream. + + + + You can optionally indicate if you want JetStream to receive a [backfill](/reference/backfills) of all or a portion of the table's existing data. + + You can backfill at any time. If you don't want to backfill, toggle "Backfill" off. + + + + Under "Message grouping", leave the default option selected to ensure events for the same row are sent to JetStream in order. + + + +### Configure NATS JetStream + + + + Fill in your NATS connection details: + + - **Host** (required): The hostname of your NATS server (e.g., `localhost` or `nats.example.com`) + - **Port** (required): The port number (default: `4222`) + - **Stream Name** (optional): The name of your JetStream stream. Sequin ensures the stream exists when the sink is enabled. If not set, the default `SEQUIN` is used. + - **Domain** (optional): The JetStream domain, if using leaf nodes or a specific domain. + - **Publish Timeout** (optional): Milliseconds to wait for a JetStream publish acknowledgment (default: `5000`). + + + + Click "Test Connection". Sequin will verify both connectivity and that JetStream is enabled. + + + + Give your sink a name, then click "Create NATS JetStream Sink". + + + +## Verify & debug + +To verify that your NATS JetStream sink is working: + +1. Subscribe to your stream using the NATS CLI: + ```bash + nats consumer add SEQUIN my-consumer --pull --deliver all --filter "sequin.>" + nats consumer next SEQUIN my-consumer --count 10 + ``` +2. Make some changes in your source table +3. Verify that the message count for your sink increases in the Sequin web console +4. Verify that the messages appear in your JetStream consumer + +If messages don't seem to be flowing: + +1. Click the "Messages" tab to view the state of messages for your sink +2. Click any failed message +3. Check the delivery logs for error details, including JetStream publish errors + +Common issues to check: +- NATS server is running with JetStream enabled (`-js` flag) +- The stream exists and its subject filter matches `sequin.>` (or your configured subjects) +- Host and port are correct +- Network connectivity between Sequin and NATS + +## Next steps + +- **Set up a consumer** + + Now that your Postgres data is flowing into a JetStream stream, set up a pull or push consumer to process the messages. See the [NATS JetStream documentation](https://docs.nats.io/nats-concepts/jetstream) for details. + + Refer to the [NATS JetStream sink reference](/reference/sinks/nats-jetstream) for the shape of messages that Sequin will publish. + +- **Deploy your implementation** + + When you're ready to deploy, see "[How to deploy to production](/how-to/deploy-to-production)". + +- **Advanced configuration** + + For more about how NATS JetStream sinks work, see the [NATS JetStream sink reference](/reference/sinks/nats-jetstream). diff --git a/docs/how-to/stream-postgres-to-nats.mdx b/docs/how-to/stream-postgres-to-nats.mdx index 53a46d64c..9ad0173af 100644 --- a/docs/how-to/stream-postgres-to-nats.mdx +++ b/docs/how-to/stream-postgres-to-nats.mdx @@ -22,9 +22,6 @@ If you're self-hosting Sequin, you'll need: 2. [A database connected](/connect-postgres) 3. A NATS server ready to go - -3. A NATS server ready to go - ## Basic setup ### Prepare your NATS server diff --git a/docs/quickstart/nats-jetstream.mdx b/docs/quickstart/nats-jetstream.mdx new file mode 100644 index 000000000..22ff6028f --- /dev/null +++ b/docs/quickstart/nats-jetstream.mdx @@ -0,0 +1,207 @@ +--- +title: 'Get started with NATS JetStream' +sidebarTitle: 'NATS JetStream' +description: 'Stream Postgres changes to NATS JetStream streams in minutes. Build persistent, at-least-once real-time pipelines on top of your Postgres data with NATS JetStream.' +--- + +import QuickstartInitialSteps from '/snippets/quickstart-initial-steps.mdx'; +import QuickstartSourceStep from '/snippets/quickstart-source-step.mdx'; +import QuickstartBackfillStep from '/snippets/quickstart-backfill-step.mdx'; + +In this quickstart, you'll create a real-time data pipeline that streams changes from a Postgres database to a NATS JetStream stream. You'll: + +- Boot Sequin +- Connect to a sample playground database +- Configure a JetStream stream to receive database changes +- See your changes flow in real-time + +By the end, you'll have hands-on experience setting up Postgres change data capture (CDC) with Sequin and NATS JetStream. + + + This is the quickstart for streaming Postgres to NATS JetStream. See the [how-to guide](/how-to/stream-postgres-to-nats-jetstream) for an explanation of how to use the NATS JetStream sink or the [reference](/reference/sinks/nats-jetstream) for details on all configuration options. + + + + + If you don't already have NATS running with JetStream enabled, start it with Docker: + + ```bash + docker run --name nats -p 4222:4222 -d nats:latest -js + ``` + + Then create a stream to capture Sequin messages: + + ```bash + docker run --rm --network host natsio/nats-box:latest \ + nats stream add SEQUIN \ + --subjects "sequin.>" \ + --storage file \ + --replicas 1 \ + --retention limits \ + --defaults + ``` + + + If you're using an existing NATS server, ensure JetStream is enabled and you have the connection details ready. + + + + + + + Before creating the sink, set up a consumer to watch for messages: + + ```bash + # Run commands one at a time to subscribe successfully + docker run --rm -it --network host natsio/nats-box:latest + + nats consumer add SEQUIN quickstart --pull --deliver all --filter "sequin.>" --defaults + + nats consumer next SEQUIN quickstart --count 100 --wait 60s + ``` + + Keep this terminal open — you'll see messages start flowing once we create the sink. + + + + With the playground database connected, you can create a [sink](/reference/sinks/overview). This sink will send changes to the `products` table to your JetStream stream: + + + + Click "Sinks" in the sidebar navigation, then click "Create Sink". + + + + Select "NATS JetStream" as the sink type and click "Continue". + + + + + + + In the "NATS JetStream Configuration" card, enter your connection details: + + - **Host**: If running NATS and Sequin locally with Docker, use `host.docker.internal` + - **Port**: The port NATS is listening on (default: `4222`) + - **Stream Name**: `SEQUIN` (the stream we created above) + - **Domain**: (Optional) Use for NATS Leaf Node disambiguation + - **Username**: (Optional) Your NATS username if authentication is enabled + - **Password**: (Optional) Your NATS password if authentication is enabled + - **JWT**: (Optional) Your NATS JWT if authentication is enabled + - **NKey Seed**: (Optional) Your NATS NKey seed if NKey authentication is enabled + - **TLS**: Enable TLS/SSL for secure connection (default: disabled) + + + + At the bottom of the form, click the "Test Connection" button. Sequin will verify connectivity and JetStream is enabled. + + + Sequin can connect to your NATS server and JetStream is enabled. + + + + + You can leave the rest of the defaults. As configured, the JetStream stream will first receive a backfill of all rows currently in the `products` table. Then, it will receive all changes to the `products` table in real-time. + + Click "Create Sink" to finish setting up your NATS JetStream sink. + + + If the stream does not exist, it will be created when the pipeline starts for the first time. + + + + + + + On the new sink's overview page, you should see the "Health" status turn green, indicating data is flowing to your JetStream stream. + + Let's confirm messages are flowing: + + + + Click the "Messages" tab. You'll see a list of the recently delivered messages. + + + Sequin indicates it backfilled the `products` table to your JetStream stream. + + + + Return to the terminal where you started the NATS consumer earlier. You should see the messages that were sent from Sequin. + + These are [`read` events](/reference/messages) from the initial backfill of the `products` table. + + + Messages are flowing from Sequin to your NATS JetStream stream. + + + + + Let's make some changes to the `products` table and see them flow to your stream. + + In your terminal, run the following command to insert a new row into the `products` table: + + ```bash + docker exec -i sequin-sequin_postgres-1 \ + psql -U postgres -d sequin_playground -c \ + "insert into products (name, price) values ('Organic Honey (16 oz)', 12.99);" + ``` + + In your NATS consumer terminal, you should see a message corresponding to the inserted row. + + Feel free to try other changes: + + + + ```bash + docker exec -i sequin-sequin_postgres-1 \ + psql -U postgres -d sequin_playground -c \ + "update products set price = 7.99 where name = 'Avocados (3 pack)';" + ``` + + + + ```bash + docker exec -i sequin-sequin_postgres-1 \ + psql -U postgres -d sequin_playground -c \ + "update products set name = 'Organic Avocados (3 pack)' where name = 'Avocados (3 pack)';" + ``` + + + + ```bash + docker exec -i sequin-sequin_postgres-1 \ + psql -U postgres -d sequin_playground -c \ + "delete from products where name = 'Blueberries (6 oz)';" + ``` + + + + Each change will appear in your JetStream stream within a few seconds. + + + + + + + Great work! + + +You've successfully: + +- Set up a complete Postgres change data capture pipeline with at-least-once delivery +- Loaded existing data through a backfill +- Made changes to the `products` table +- Verified changes are flowing to your NATS JetStream stream + +## Ready to stream + +Now you're ready to connect your own database to Sequin and start streaming changes: + + + + Connect your Postgres database to Sequin. + + + Setup a NATS JetStream sink to send changes to your stream. + + diff --git a/docs/reference/sinks/nats-jetstream.mdx b/docs/reference/sinks/nats-jetstream.mdx new file mode 100644 index 000000000..549337f26 --- /dev/null +++ b/docs/reference/sinks/nats-jetstream.mdx @@ -0,0 +1,105 @@ +--- +title: "NATS JetStream sink" +sidebarTitle: "NATS JetStream sink" +description: "Reference for configuring and using the NATS JetStream sink with Sequin." +--- + +The NATS JetStream sink publishes messages to NATS JetStream streams with at-least-once delivery guarantees. + + + This is the reference for the NATS JetStream sink. See the [quickstart](/quickstart/nats-jetstream) for a step-by-step walkthrough or the [how-to guide](/how-to/stream-postgres-to-nats-jetstream) for an explanation of how to use the NATS JetStream sink. + + +## Configuration + +- **Host** + + The hostname of your NATS server (e.g., `localhost`). + +- **Port** + + The port number for your NATS server (default: `4222`). + +- **Stream Name** (optional) + + The name of the JetStream stream to publish to (default: `SEQUIN`). If the stream does not exist, Sequin will create it automatically with a subject filter of `sequin.>` when the sink is first enabled. + +- **Domain** (optional) + + The JetStream domain, used when connecting to JetStream via leaf nodes or a specific domain. + +- **Publish Timeout** (optional) + + The timeout in milliseconds to wait for a JetStream publish acknowledgment (default: `5000`). If the stream does not acknowledge the message within this time, Sequin will retry. + +- **Username** (optional) + + Username for authentication with the NATS server. + +- **Password** (optional) + + Password for authentication with the NATS server. + +- **JWT** (optional) + + JSON Web Token for authentication with the NATS server. + +- **NKey Seed** (optional) + + NKey seed for authentication with the NATS server. + +- **TLS** (optional) + + Enable TLS/SSL for secure connections to the NATS server (default: `false`). + +## JetStream vs. core NATS + +Unlike the [NATS sink](/reference/sinks/nats), which uses fire-and-forget publishing, the NATS JetStream sink uses request-reply publishing. Sequin waits for a publish acknowledgment from the JetStream server before marking the message as delivered. This provides: + +- **At-least-once delivery**: Messages are guaranteed to reach the stream or Sequin will retry. +- **Persistence**: Messages are stored in the stream and can be replayed by consumers. +- **Server-side deduplication**: The `Nats-Msg-Id` header included with each message enables JetStream's duplicate window to deduplicate replayed messages. + +## Message format + +Sequin sends messages to NATS JetStream as JSON. You can find the shape of the messages in the [messages reference](/reference/messages). + +## Subject naming + +By default, messages are published to subjects using the following pattern: + +``` +sequin.... +``` + +For example, if you're streaming changes from a table called `products` in the `public` schema of a database called `shop`: + +- `sequin.shop_prod.public.products.insert` +- `sequin.shop_prod.public.products.update` +- `sequin.shop_prod.public.products.delete` + +Your JetStream stream must be configured to capture these subjects (e.g., with a subject filter of `sequin.>`). + +You can use a [routing function](/reference/routing) to generate your own dynamic subjects. + +## Message headers + +Each message includes the following NATS header: + +- `Nats-Msg-Id`: A unique Sequin identifier for the message, used for JetStream server-side deduplication within the stream's duplicate window. + +## Retry behavior + +If Sequin does not receive a JetStream publish acknowledgment within the configured publish timeout, or if the stream returns an error, Sequin will retry the message indefinitely with exponential backoff up to a maximum of roughly 3 minutes. + +## Message ordering + +JetStream preserves message order within a subject. Messages are published in the same order they are received from your source table. + +## Debugging + +You can view the status of your NATS JetStream sink in the Sequin web console. + +On the "Messages" tab, you can see which messages are in-flight to JetStream, which messages Sequin is unable to deliver, and recently delivered messages. + +Messages that Sequin is unable to deliver will have a "Deliver count" greater than `1`. You can click on a message to see more details, including the last error response from the JetStream server. diff --git a/lib/sequin/consumers/nats_jetstream_sink.ex b/lib/sequin/consumers/nats_jetstream_sink.ex new file mode 100644 index 000000000..ddf0ffabd --- /dev/null +++ b/lib/sequin/consumers/nats_jetstream_sink.ex @@ -0,0 +1,77 @@ +defmodule Sequin.Consumers.NatsJetstreamSink do + @moduledoc false + use Ecto.Schema + use TypedEctoSchema + + import Ecto.Changeset + + alias Sequin.Encrypted.Field, as: EncryptedField + + @derive {Jason.Encoder, only: [:host, :port, :stream_name, :domain]} + @derive {Inspect, except: [:password, :jwt, :nkey_seed]} + @primary_key false + typed_embedded_schema do + field :type, Ecto.Enum, values: [:nats_jetstream], default: :nats_jetstream + field :host, :string + field :port, :integer + field :username, :string + field :password, EncryptedField + field :jwt, EncryptedField + field :nkey_seed, EncryptedField + field :tls, :boolean, default: false + field :stream_name, :string + field :domain, :string + field :publish_timeout_ms, :integer, default: 5_000 + field :connection_id, :string + end + + def changeset(struct, params) do + struct + |> cast(params, [ + :host, + :port, + :username, + :password, + :jwt, + :nkey_seed, + :tls, + :stream_name, + :domain, + :publish_timeout_ms + ]) + |> validate_required([:host, :port]) + |> validate_number(:port, greater_than: 0, less_than: 65_536) + |> validate_number(:publish_timeout_ms, greater_than: 0, less_than_or_equal_to: 60_000) + |> validate_length(:stream_name, max: 255) + |> validate_format(:stream_name, ~r/^[a-zA-Z0-9_-]+$/, + message: "must contain only alphanumeric characters, hyphens, and underscores" + ) + |> put_connection_id() + end + + defp put_connection_id(changeset) do + case get_field(changeset, :connection_id) do + nil -> put_change(changeset, :connection_id, Ecto.UUID.generate()) + _ -> changeset + end + end + + def connection_opts(%__MODULE__{} = sink) do + %{ + host: sink.host, + port: sink.port, + username: sink.username, + password: sink.password, + jwt: sink.jwt, + nkey_seed: sink.nkey_seed, + tls: sink.tls + } + end + + def ipv6?(%__MODULE__{} = sink) do + case :inet.getaddr(to_charlist(sink.host), :inet) do + {:ok, _} -> false + {:error, _} -> true + end + end +end diff --git a/lib/sequin/consumers/sink_consumer.ex b/lib/sequin/consumers/sink_consumer.ex index 13bbb61a6..f3ab9b080 100644 --- a/lib/sequin/consumers/sink_consumer.ex +++ b/lib/sequin/consumers/sink_consumer.ex @@ -19,6 +19,7 @@ defmodule Sequin.Consumers.SinkConsumer do alias Sequin.Consumers.KafkaSink alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink @@ -47,6 +48,7 @@ defmodule Sequin.Consumers.SinkConsumer do :sequin_stream, :gcp_pubsub, :nats, + :nats_jetstream, :rabbitmq, :azure_event_hub, :typesense, @@ -132,6 +134,7 @@ defmodule Sequin.Consumers.SinkConsumer do sequin_stream: SequinStreamSink, gcp_pubsub: GcpPubsubSink, nats: NatsSink, + nats_jetstream: NatsJetstreamSink, rabbitmq: RabbitMqSink, azure_event_hub: AzureEventHubSink, typesense: TypesenseSink, diff --git a/lib/sequin/runtime/nats_jetstream_pipeline.ex b/lib/sequin/runtime/nats_jetstream_pipeline.ex new file mode 100644 index 000000000..91485810e --- /dev/null +++ b/lib/sequin/runtime/nats_jetstream_pipeline.ex @@ -0,0 +1,71 @@ +defmodule Sequin.Runtime.NatsJetstreamPipeline do + @moduledoc false + @behaviour Sequin.Runtime.SinkPipeline + + alias Sequin.Error + alias Sequin.Runtime.Routing + alias Sequin.Runtime.SinkPipeline + alias Sequin.Sinks.NatsJetstream + + require Logger + + @impl SinkPipeline + def init(context, _opts) do + %{consumer: consumer} = context + setup_allowances(Map.get(context, :test_pid)) + + case NatsJetstream.ensure_stream(consumer.sink) do + :ok -> + :ok + + {:error, error} -> + Logger.warning("Failed to ensure JetStream stream exists: #{inspect(error)}") + end + + context + end + + @impl SinkPipeline + def batchers_config(_consumer) do + concurrency = min(System.schedulers_online() * 2, 80) + + [ + default: [ + concurrency: concurrency, + batch_size: 10, + batch_timeout: 5 + ] + ] + end + + @impl SinkPipeline + def handle_batch(:default, messages, _batch_info, context) do + %{consumer: consumer, test_pid: test_pid} = context + setup_allowances(test_pid) + + routed_messages = Routing.route_and_transform_messages(consumer, messages) + + case NatsJetstream.send_messages(consumer, routed_messages) do + :ok -> + {:ok, messages, context} + + {:error, error} -> + reason = + Error.service( + service: :nats, + code: "publish_error", + message: "NATS JetStream publish failed", + details: %{error: error} + ) + + {:error, reason} + end + end + + defp setup_allowances(nil), do: :ok + + defp setup_allowances(test_pid) do + Mox.allow(Sequin.Sinks.NatsJetstreamMock, test_pid, self()) + Mox.allow(Sequin.TestSupport.DateTimeMock, test_pid, self()) + end +end diff --git a/lib/sequin/runtime/routing/consumers/nats_jetstream.ex b/lib/sequin/runtime/routing/consumers/nats_jetstream.ex new file mode 100644 index 000000000..9ba718942 --- /dev/null +++ b/lib/sequin/runtime/routing/consumers/nats_jetstream.ex @@ -0,0 +1,16 @@ +defmodule Sequin.Runtime.Routing.Consumers.NatsJetstream do + @moduledoc false + use Sequin.Runtime.Routing.RoutedConsumer + + alias Sequin.Runtime.Routing.Consumers.Nats + + @primary_key false + @derive {Jason.Encoder, only: [:subject]} + typed_embedded_schema do + field :subject, :string + field :headers, :map + end + + defdelegate changeset(struct, params), to: Nats + defdelegate route(action, record, changes, metadata), to: Nats +end diff --git a/lib/sequin/runtime/routing/routing.ex b/lib/sequin/runtime/routing/routing.ex index 22e5ce5cc..a48733531 100644 --- a/lib/sequin/runtime/routing/routing.ex +++ b/lib/sequin/runtime/routing/routing.ex @@ -85,6 +85,7 @@ defmodule Sequin.Runtime.Routing do :redis_string -> Sequin.Runtime.Routing.Consumers.RedisString :redis_stream -> Sequin.Runtime.Routing.Consumers.RedisStream :nats -> Sequin.Runtime.Routing.Consumers.Nats + :nats_jetstream -> Sequin.Runtime.Routing.Consumers.NatsJetstream :kafka -> Sequin.Runtime.Routing.Consumers.Kafka :gcp_pubsub -> Sequin.Runtime.Routing.Consumers.GcpPubsub :s2 -> Sequin.Runtime.Routing.Consumers.S2 diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex index 4cade4020..5bc020844 100644 --- a/lib/sequin/runtime/sink_pipeline.ex +++ b/lib/sequin/runtime/sink_pipeline.ex @@ -444,6 +444,7 @@ defmodule Sequin.Runtime.SinkPipeline do :kafka -> Sequin.Runtime.KafkaPipeline :kinesis -> Sequin.Runtime.KinesisPipeline :nats -> Sequin.Runtime.NatsPipeline + :nats_jetstream -> Sequin.Runtime.NatsJetstreamPipeline :rabbitmq -> Sequin.Runtime.RabbitMqPipeline :redis_stream -> Sequin.Runtime.RedisStreamPipeline :redis_string -> Sequin.Runtime.RedisStringPipeline diff --git a/lib/sequin/sinks/nats/connection_cache.ex b/lib/sequin/sinks/nats/connection_cache.ex index 93befb0c0..a7071d87f 100644 --- a/lib/sequin/sinks/nats/connection_cache.ex +++ b/lib/sequin/sinks/nats/connection_cache.ex @@ -5,7 +5,8 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do By caching these connections, we can avoid paying a significant startup penalty when performing multiple operations on the same NATS instance. - Each `Sequin.Consumers.NatsSink` gets its own connection in the cache. + Each `Sequin.Consumers.NatsSink` or `Sequin.Consumers.NatsJetstreamSink` + gets its own connection in the cache. The cache takes ownership of the NATS connections and is responsible for closing them when they are invalidated (or when the cache is stopped). Thus, @@ -19,6 +20,7 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do use GenServer + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink require Logger @@ -26,7 +28,7 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do defmodule Cache do @moduledoc false - @type sink :: NatsSink.t() + @type sink :: NatsSink.t() | NatsJetstreamSink.t() @type entry :: %{ conn: pid() | atom(), options_hash: binary() @@ -85,9 +87,10 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do @moduledoc false use TypedStruct + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink - @type sink :: NatsSink.t() + @type sink :: NatsSink.t() | NatsJetstreamSink.t() @type opt :: {:start_fn, State.start_function()} | {:stop_fn, State.stop_function()} @type start_function :: (sink() -> start_result()) @type start_result :: @@ -151,7 +154,7 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do %{state | cache: new_cache} end - defp default_start(%NatsSink{} = sink) do + defp default_start(sink) do %{host: sink.host, port: sink.port} |> put_opt_key(:username, sink.username) |> put_opt_key(:password, sink.password) @@ -166,6 +169,10 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do if NatsSink.ipv6?(sink), do: Map.put(opts, :tcp_opts, [:inet6, :binary]), else: opts end + defp put_ipv6(opts, %NatsJetstreamSink{} = sink) do + if NatsJetstreamSink.ipv6?(sink), do: Map.put(opts, :tcp_opts, [:inet6, :binary]), else: opts + end + defp put_tls(opts, true) do opts |> Map.put(:tls, true) @@ -183,7 +190,7 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do end end - @type sink :: NatsSink.t() + @type sink :: NatsSink.t() | NatsJetstreamSink.t() @type opt :: State.opt() @type start_result :: State.start_result() @@ -195,18 +202,18 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do @spec connection(sink()) :: start_result() @spec connection(GenServer.server(), sink()) :: start_result() - def connection(server \\ __MODULE__, %NatsSink{} = sink) do + def connection(server \\ __MODULE__, sink) do GenServer.call(server, {:connection, sink, true}) end @spec invalidate_connection(GenServer.server(), sink()) :: :ok - def invalidate_connection(server \\ __MODULE__, %NatsSink{} = sink) do + def invalidate_connection(server \\ __MODULE__, sink) do GenServer.cast(server, {:invalidate_connection, sink}) end # This function is intended for test purposes only @spec cache_connection(GenServer.server(), sink(), pid()) :: :ok - def cache_connection(server \\ __MODULE__, %NatsSink{} = sink, conn) do + def cache_connection(server \\ __MODULE__, sink, conn) do GenServer.call(server, {:cache_connection, sink, conn}) end @@ -218,7 +225,7 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do end @impl GenServer - def handle_call({:connection, %NatsSink{} = sink, create_on_miss}, _from, %State{} = state) do + def handle_call({:connection, sink, create_on_miss}, _from, %State{} = state) do case State.find_or_create_connection(state, sink, create_on_miss) do {:ok, conn, new_state} -> {:reply, {:ok, conn}, new_state} @@ -233,14 +240,14 @@ defmodule Sequin.Sinks.Nats.ConnectionCache do # This function is intended for test purposes only @impl GenServer - def handle_call({:cache_connection, %NatsSink{} = sink, conn}, _from, %State{} = state) do + def handle_call({:cache_connection, sink, conn}, _from, %State{} = state) do new_cache = Cache.store(state.cache, sink, conn) new_state = %{state | cache: new_cache} {:reply, :ok, new_state} end @impl GenServer - def handle_cast({:invalidate_connection, %NatsSink{} = sink}, %State{} = state) do + def handle_cast({:invalidate_connection, sink}, %State{} = state) do new_state = State.invalidate_connection(state, sink) {:noreply, new_state} end diff --git a/lib/sequin/sinks/nats/jetstream.ex b/lib/sequin/sinks/nats/jetstream.ex new file mode 100644 index 000000000..5e31d9dbb --- /dev/null +++ b/lib/sequin/sinks/nats/jetstream.ex @@ -0,0 +1,17 @@ +defmodule Sequin.Sinks.NatsJetstream do + @moduledoc false + alias Sequin.Consumers.NatsJetstreamSink + alias Sequin.Consumers.SinkConsumer + alias Sequin.Error + alias Sequin.Runtime.Routing.RoutedMessage + + @callback send_messages(SinkConsumer.t(), [RoutedMessage.t()]) :: + :ok | {:error, Error.t()} + @callback test_connection(NatsJetstreamSink.t()) :: :ok | {:error, Error.t()} + @callback ensure_stream(NatsJetstreamSink.t()) :: :ok | {:error, Error.t()} + + @client Application.compile_env(:sequin, :nats_jetstream_module, Sequin.Sinks.Nats.JetstreamClient) + defdelegate send_messages(consumer, messages), to: @client + defdelegate test_connection(sink), to: @client + defdelegate ensure_stream(sink), to: @client +end diff --git a/lib/sequin/sinks/nats/jetstream_client.ex b/lib/sequin/sinks/nats/jetstream_client.ex new file mode 100644 index 000000000..80e692255 --- /dev/null +++ b/lib/sequin/sinks/nats/jetstream_client.ex @@ -0,0 +1,187 @@ +defmodule Sequin.Sinks.Nats.JetstreamClient do + @moduledoc false + @behaviour Sequin.Sinks.NatsJetstream + + alias Sequin.Consumers.NatsJetstreamSink + alias Sequin.Consumers.SinkConsumer + alias Sequin.Error + alias Sequin.NetworkUtils + alias Sequin.Runtime.Routing.RoutedMessage + alias Sequin.Sinks.Nats.ConnectionCache + alias Sequin.Sinks.NatsJetstream + + require Logger + + @impl NatsJetstream + def send_messages(%SinkConsumer{sink: %NatsJetstreamSink{} = sink} = _consumer, messages) when is_list(messages) do + with {:ok, connection} <- ConnectionCache.connection(sink) do + Enum.reduce_while(messages, :ok, fn message, :ok -> + case publish_message(sink, message, connection) do + :ok -> {:cont, :ok} + {:error, error} -> {:halt, {:error, error}} + end + end) + end + end + + @test_timeout 5_000 + + @impl NatsJetstream + def test_connection(%NatsJetstreamSink{} = sink) do + with :ok <- + NetworkUtils.test_tcp_reachability( + sink.host, + sink.port, + NatsJetstreamSink.ipv6?(sink), + to_timeout(second: 10) + ), + {:ok, connection} <- ConnectionCache.connection(sink) do + verify_jetstream_enabled(connection) + end + catch + :exit, error -> + {:error, to_sequin_error(error)} + end + + @impl NatsJetstream + def ensure_stream(%NatsJetstreamSink{} = sink) do + with {:ok, connection} <- ConnectionCache.connection(sink) do + ensure_stream_with_connection(connection, sink) + end + end + + defp ensure_stream_with_connection(connection, sink) do + stream_name = sink.stream_name || "SEQUIN" + + case Gnat.Jetstream.API.Stream.info(connection, stream_name) do + {:ok, _info} -> + :ok + + {:error, %{"code" => 404}} -> + body = Jason.encode!(%{name: stream_name, subjects: ["sequin.>"]}) + + case Gnat.request(connection, "$JS.API.STREAM.CREATE.#{stream_name}", body, receive_timeout: @test_timeout) do + {:ok, %{body: response_body}} -> + case Jason.decode(response_body) do + {:ok, %{"error" => error}} -> + {:error, + Error.service( + service: :nats, + message: "Failed to create JetStream stream \"#{stream_name}\": #{inspect(error)}" + )} + + {:ok, _} -> + Logger.info("Created JetStream stream \"#{stream_name}\" with subject filter \"sequin.>\"") + :ok + + {:error, _} -> + :ok + end + + {:error, error} -> + {:error, + Error.service( + service: :nats, + message: "Failed to create JetStream stream \"#{stream_name}\": #{inspect(error)}" + )} + end + + {:error, error} -> + {:error, + Error.service( + service: :nats, + message: "Failed to check JetStream stream \"#{stream_name}\": #{inspect(error)}" + )} + end + end + + defp verify_jetstream_enabled(connection) do + case Gnat.request(connection, "$JS.API.INFO", "", receive_timeout: @test_timeout) do + {:ok, %{body: body}} -> + case Jason.decode(body) do + {:ok, %{"error" => %{"description" => desc}}} -> + {:error, Error.service(service: :nats, message: "JetStream is not enabled: #{desc}")} + + {:ok, _info} -> + :ok + + {:error, _} -> + :ok + end + + {:error, :timeout} -> + {:error, + Error.service( + service: :nats, + message: + "Failed to verify JetStream is enabled: no response after #{@test_timeout}ms. Verify JetStream is enabled on your NATS server" + )} + + {:error, _} -> + {:error, + Error.service( + service: :nats, + message: "Failed to verify JetStream is enabled. Verify JetStream is enabled on your NATS server" + )} + end + end + + defp publish_message( + %NatsJetstreamSink{publish_timeout_ms: timeout}, + %RoutedMessage{routing_info: %{subject: subject, headers: headers}, transformed_message: transformed_message}, + connection + ) do + list_headers = + case headers do + %{} -> + Map.to_list(headers) + + headers when is_list(headers) -> + headers + + _ -> + raise "Invalid headers shape. Only maps and lists of tuples are supported. Got: #{inspect(headers)}" + end + + opts = [headers: list_headers, receive_timeout: timeout] + + try do + case Gnat.request(connection, subject, Jason.encode_to_iodata!(transformed_message), opts) do + {:ok, %{body: body}} -> + case Jason.decode(body) do + {:ok, %{"error" => error}} -> + {:error, Error.service(service: :nats, message: "JetStream publish rejected: #{inspect(error)}")} + + {:ok, _ack} -> + :ok + + {:error, _} -> + :ok + end + + {:error, :timeout} -> + {:error, + Error.service( + service: :nats, + message: "JetStream publish timed out after #{timeout}ms" + )} + + {:error, error} -> + {:error, to_sequin_error(error)} + end + catch + error -> + {:error, to_sequin_error(error)} + end + end + + defp to_sequin_error(error) do + case error do + error when is_binary(error) -> + Error.service(service: :nats, message: "NATS JetStream error: #{error}") + + _ -> + Error.service(service: :nats, message: "Unknown NATS JetStream error") + end + end +end diff --git a/lib/sequin/transforms/transforms.ex b/lib/sequin/transforms/transforms.ex index ebd12dffc..f67b5d6de 100644 --- a/lib/sequin/transforms/transforms.ex +++ b/lib/sequin/transforms/transforms.ex @@ -16,6 +16,7 @@ defmodule Sequin.Transforms do alias Sequin.Consumers.KafkaSink alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.PathFunction alias Sequin.Consumers.RabbitMqSink @@ -385,6 +386,22 @@ defmodule Sequin.Transforms do }) end + def to_external(%NatsJetstreamSink{} = sink, show_sensitive) do + reject_nil_values(%{ + type: "nats_jetstream", + host: sink.host, + port: sink.port, + username: sink.username, + password: SensitiveValue.new(sink.password, show_sensitive), + jwt: SensitiveValue.new(sink.jwt, show_sensitive), + nkey_seed: SensitiveValue.new(sink.nkey_seed, show_sensitive), + tls: sink.tls, + stream_name: sink.stream_name, + domain: sink.domain, + publish_timeout_ms: sink.publish_timeout_ms + }) + end + def to_external(%AzureEventHubSink{} = sink, show_sensitive) do reject_nil_values(%{ type: "azure_event_hub", @@ -1270,6 +1287,23 @@ defmodule Sequin.Transforms do }} end + defp parse_sink(%{"type" => "nats_jetstream"} = attrs, _resources) do + {:ok, + %{ + type: :nats_jetstream, + host: attrs["host"], + port: attrs["port"], + username: attrs["username"], + password: attrs["password"], + jwt: attrs["jwt"], + nkey_seed: attrs["nkey_seed"], + tls: attrs["tls"] || false, + stream_name: attrs["stream_name"], + domain: attrs["domain"], + publish_timeout_ms: attrs["publish_timeout_ms"] + }} + end + defp parse_sink(%{"type" => "gcp_pubsub"} = attrs, _resources) do {:ok, %{ diff --git a/lib/sequin_web/live/components/consumer_form.ex b/lib/sequin_web/live/components/consumer_form.ex index f8f34dc8e..084cbad66 100644 --- a/lib/sequin_web/live/components/consumer_form.ex +++ b/lib/sequin_web/live/components/consumer_form.ex @@ -16,6 +16,7 @@ defmodule SequinWeb.Components.ConsumerForm do alias Sequin.Consumers.KafkaSink alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink @@ -44,6 +45,7 @@ defmodule SequinWeb.Components.ConsumerForm do alias Sequin.Sinks.Kafka alias Sequin.Sinks.Meilisearch.Client, as: MeilisearchClient alias Sequin.Sinks.Nats + alias Sequin.Sinks.NatsJetstream alias Sequin.Sinks.RabbitMq alias Sequin.Sinks.Redis alias Sequin.Sinks.Typesense.Client, as: TypesenseClient @@ -239,6 +241,12 @@ defmodule SequinWeb.Components.ConsumerForm do {:error, error} -> {:reply, %{ok: false, error: error}, socket} end + :nats_jetstream -> + case test_nats_jetstream_connection(socket) do + :ok -> {:reply, %{ok: true}, socket} + {:error, error} -> {:reply, %{ok: false, error: error}, socket} + end + :sqs -> case test_sqs_connection(socket) do :ok -> @@ -592,6 +600,27 @@ defmodule SequinWeb.Components.ConsumerForm do end end + defp test_nats_jetstream_connection(socket) do + sink_changeset = + socket.assigns.changeset + |> Ecto.Changeset.get_field(:sink) + |> case do + %Ecto.Changeset{} = changeset -> changeset + %NatsJetstreamSink{} = sink -> NatsJetstreamSink.changeset(sink, %{}) + end + + if sink_changeset.valid? do + sink = Ecto.Changeset.apply_changes(sink_changeset) + + case NatsJetstream.test_connection(sink) do + :ok -> :ok + {:error, error} -> {:error, Exception.message(error)} + end + else + {:error, encode_errors(sink_changeset)} + end + end + defp test_azure_event_hub_connection(socket) do sink_changeset = socket.assigns.changeset @@ -878,6 +907,22 @@ defmodule SequinWeb.Components.ConsumerForm do } end + defp decode_sink(:nats_jetstream, sink) do + %{ + "type" => "nats_jetstream", + "host" => sink["host"], + "port" => sink["port"], + "username" => sink["username"], + "password" => sink["password"], + "jwt" => sink["jwt"], + "nkey_seed" => sink["nkey_seed"], + "tls" => sink["tls"], + "stream_name" => sink["stream_name"], + "domain" => sink["domain"], + "publish_timeout_ms" => sink["publish_timeout_ms"] + } + end + defp decode_sink(:sequin_stream, _sink) do %{ "type" => "sequin_stream" @@ -1149,6 +1194,22 @@ defmodule SequinWeb.Components.ConsumerForm do } end + defp encode_sink(%NatsJetstreamSink{} = sink) do + %{ + "type" => "nats_jetstream", + "host" => sink.host, + "port" => sink.port, + "username" => sink.username, + "password" => sink.password, + "jwt" => sink.jwt, + "nkey_seed" => sink.nkey_seed, + "tls" => sink.tls, + "stream_name" => sink.stream_name, + "domain" => sink.domain, + "publish_timeout_ms" => sink.publish_timeout_ms + } + end + defp encode_sink(%SequinStreamSink{}) do %{ "type" => "sequin_stream" @@ -1455,6 +1516,7 @@ defmodule SequinWeb.Components.ConsumerForm do :sequin_stream -> "Sequin Stream Sink" :gcp_pubsub -> "GCP Pub/Sub Sink" :nats -> "NATS Sink" + :nats_jetstream -> "NATS JetStream Sink" :rabbitmq -> "RabbitMQ Sink" :azure_event_hub -> "Azure Event Hub Sink" :typesense -> "Typesense Sink" @@ -1487,6 +1549,7 @@ defmodule SequinWeb.Components.ConsumerForm do :sequin_stream -> {%SequinStreamSink{}, %{}} :gcp_pubsub -> {%GcpPubsubSink{}, %{message_grouping: false, batch_size: 100}} :nats -> {%NatsSink{}, %{}} + :nats_jetstream -> {%NatsJetstreamSink{}, %{}} :rabbitmq -> {%RabbitMqSink{virtual_host: "/"}, %{}} :azure_event_hub -> {%AzureEventHubSink{}, %{}} :typesense -> {%TypesenseSink{}, %{}} diff --git a/lib/sequin_web/live/sink_consumers/show.ex b/lib/sequin_web/live/sink_consumers/show.ex index 0fad970fa..72e7bd893 100644 --- a/lib/sequin_web/live/sink_consumers/show.ex +++ b/lib/sequin_web/live/sink_consumers/show.ex @@ -21,6 +21,7 @@ defmodule SequinWeb.SinkConsumersLive.Show do alias Sequin.Consumers.KafkaSink alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.PathFunction alias Sequin.Consumers.RabbitMqSink @@ -972,6 +973,23 @@ defmodule SequinWeb.SinkConsumersLive.Show do } end + defp encode_sink(%SinkConsumer{sink: %NatsJetstreamSink{} = sink}) do + %{ + type: :nats_jetstream, + host: sink.host, + port: sink.port, + username: sink.username, + password: sink.password, + tls: sink.tls, + nkey_seed: sink.nkey_seed, + jwt: sink.jwt, + stream_name: sink.stream_name, + domain: sink.domain, + publish_timeout_ms: sink.publish_timeout_ms, + connection_id: sink.connection_id + } + end + defp encode_sink(%SinkConsumer{sink: %RabbitMqSink{} = sink} = consumer) do database_name = consumer.postgres_database.name @@ -1402,6 +1420,7 @@ defmodule SequinWeb.SinkConsumersLive.Show do defp consumer_title(%{sink: %{type: :http_push}}), do: "Webhook Sink" defp consumer_title(%{sink: %{type: :kafka}}), do: "Kafka Sink" defp consumer_title(%{sink: %{type: :nats}}), do: "NATS Sink" + defp consumer_title(%{sink: %{type: :nats_jetstream}}), do: "NATS JetStream Sink" defp consumer_title(%{sink: %{type: :rabbitmq}}), do: "RabbitMQ Sink" defp consumer_title(%{sink: %{type: :redis_stream}}), do: "Redis Stream Sink" defp consumer_title(%{sink: %{type: :redis_string}}), do: "Redis String Sink" diff --git a/test/sequin/nats_jetstream_pipeline_test.exs b/test/sequin/nats_jetstream_pipeline_test.exs new file mode 100644 index 000000000..d1f555e40 --- /dev/null +++ b/test/sequin/nats_jetstream_pipeline_test.exs @@ -0,0 +1,135 @@ +defmodule Sequin.Runtime.NatsJetstreamPipelineTest do + use Sequin.DataCase, async: true + + alias Sequin.Consumers + alias Sequin.Error + alias Sequin.Factory.ConsumersFactory + alias Sequin.Runtime.SinkPipeline + alias Sequin.Sinks.NatsJetstreamMock + + describe "message handling" do + setup do + stub(NatsJetstreamMock, :ensure_stream, fn _sink -> :ok end) + + consumer = + ConsumersFactory.insert_sink_consumer!( + type: :nats_jetstream, + sink: %{ + type: :nats_jetstream, + host: "localhost", + port: 4222, + stream_name: "sequin" + } + ) + + {:ok, %{consumer: consumer}} + end + + test "successfully publishes event messages to NATS JetStream", %{consumer: consumer} do + expect(NatsJetstreamMock, :send_messages, fn _consumer, messages -> + assert length(messages) == 1 + message = hd(messages) + assert String.ends_with?(message.routing_info.subject, "insert") + :ok + end) + + event = + ConsumersFactory.consumer_event( + consumer_id: consumer.id, + action: :insert + ) + + ref = send_test_event(consumer, event) + + assert_receive {:ack, ^ref, [%{data: %{data: %{action: :insert}}}], []}, 1_000 + + refute Consumers.reload(event) + end + + @tag capture_log: true + test "handles NATS JetStream publish failures", %{consumer: consumer} do + expect(NatsJetstreamMock, :send_messages, fn _consumer, _messages -> + {:error, Error.service(service: :nats, code: "publish_error", message: "JetStream publish failed")} + end) + + event = + ConsumersFactory.consumer_event( + consumer_id: consumer.id, + action: :insert + ) + + ref = send_test_event(consumer, event) + + assert_receive {:ack, ^ref, [], [_failed]}, 2_000 + end + + test "batches multiple messages together", %{consumer: consumer} do + consumer = %{consumer | batch_size: 2} + + expect(NatsJetstreamMock, :send_messages, fn _consumer, messages -> + assert length(messages) == 2 + + assert Enum.map(messages, fn message -> + message.routing_info.subject |> String.split(".") |> List.last() + end) == ["insert", "update"] + + :ok + end) + + event1 = + ConsumersFactory.consumer_event( + consumer_id: consumer.id, + action: :insert + ) + + event2 = + ConsumersFactory.consumer_event( + consumer_id: consumer.id, + action: :update + ) + + ref = send_test_batch(consumer, [event1, event2]) + + assert_receive {:ack, ^ref, + [ + %{data: %{data: %{action: :insert}}}, + %{data: %{data: %{action: :update}}} + ], []}, + 1_000 + end + end + + defp send_test_event(consumer, event) do + start_supervised!( + {SinkPipeline, + [ + consumer_id: consumer.id, + producer: Broadway.DummyProducer, + test_pid: self() + ]} + ) + + Broadway.test_message( + SinkPipeline.via_tuple(consumer.id), + event, + metadata: %{topic: "test_topic", headers: []} + ) + end + + defp send_test_batch(consumer, events) do + start_supervised!( + {SinkPipeline, + [ + consumer_id: consumer.id, + producer: Broadway.DummyProducer, + test_pid: self() + ]} + ) + + Broadway.test_batch( + SinkPipeline.via_tuple(consumer.id), + events, + metadata: %{topic: "test_topic", headers: []} + ) + end +end diff --git a/test/support/factory/consumers_factory.ex b/test/support/factory/consumers_factory.ex index e0f6876e9..3a3d1839c 100644 --- a/test/support/factory/consumers_factory.ex +++ b/test/support/factory/consumers_factory.ex @@ -16,6 +16,7 @@ defmodule Sequin.Factory.ConsumersFactory do alias Sequin.Consumers.KafkaSink alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink + alias Sequin.Consumers.NatsJetstreamSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink @@ -272,6 +273,18 @@ defmodule Sequin.Factory.ConsumersFactory do ) end + defp sink(:nats_jetstream, _account_id, attrs) do + merge_attributes( + %NatsJetstreamSink{ + type: :nats_jetstream, + host: "localhost", + port: 4222, + stream_name: "sequin" + }, + attrs + ) + end + defp sink(:rabbitmq, _account_id, attrs) do merge_attributes( %RabbitMqSink{ diff --git a/test/support/mocks.ex b/test/support/mocks.ex index 95034e64c..7033c3e64 100644 --- a/test/support/mocks.ex +++ b/test/support/mocks.ex @@ -10,6 +10,10 @@ Mox.defmock(Sequin.Sinks.NatsMock, for: Sequin.Sinks.Nats ) +Mox.defmock(Sequin.Sinks.NatsJetstreamMock, + for: Sequin.Sinks.NatsJetstream +) + Mox.defmock(Sequin.Sinks.RabbitMqMock, for: Sequin.Sinks.RabbitMq ) From 34495d8885e2604321760e8e1da45690e1637a21 Mon Sep 17 00:00:00 2001 From: Rob Cook Date: Fri, 26 Jun 2026 09:33:36 -0400 Subject: [PATCH 2/2] Update README with NATS JetStream sink reference. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 5a91a25fc..ddf93420e 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ Sequin works great for change data capture use cases like: | Kinesis | [Kinesis](https://sequinstream.com/docs/quickstart/kinesis) | [Reference](https://sequinstream.com/docs/reference/sinks/kinesis) | Send messages to Amazon Kinesis streams queues | | Meilisearch | [Meilisearch](https://sequinstream.com/docs/quickstart/meilisearch) | [Reference](https://sequinstream.com/docs/reference/sinks/meilisearch) | Index database changes with Meilisearch | | NATS | [NATS](https://sequinstream.com/docs/quickstart/nats) | [Reference](https://sequinstream.com/docs/reference/sinks/nats) | Stream changes to NATS subjects | +| NATS JetStream | [NATS JetStream](https://sequinstream.com/docs/quickstart/nats-jetstream) | [Reference](https://sequinstream.com/docs/reference/sinks/nats-jetstream) | Stream changes to NATS JetStream | | RabbitMQ | [RabbitMQ](https://sequinstream.com/docs/quickstart/rabbitmq) | [Reference](https://sequinstream.com/docs/reference/sinks/rabbitmq) | Publish messages to RabbitMQ exchanges | | Redis Stream | [Redis Stream](https://sequinstream.com/docs/quickstart/redis-stream) | [Reference](https://sequinstream.com/docs/reference/sinks/redis-stream) | `XADD` to Redis Streams | | Redis String | [Redis String](https://sequinstream.com/docs/quickstart/redis-string) | [Reference](https://sequinstream.com/docs/reference/sinks/redis-string) | `SET` to Redis keys |