From e30efa62ab576e035312e4c361f7ca3254c922f3 Mon Sep 17 00:00:00 2001 From: Nicola Moretto Date: Thu, 19 Mar 2026 09:30:36 +0100 Subject: [PATCH 1/6] Remove references to indexer CLI --- docs/products/fast_data_v2/concepts.mdx | 2 +- .../farm_data/20_Configuration.mdx | 3 +- .../fast_data_v2/farm_data/30_Usage.md | 42 +------------------ 3 files changed, 3 insertions(+), 44 deletions(-) diff --git a/docs/products/fast_data_v2/concepts.mdx b/docs/products/fast_data_v2/concepts.mdx index def2d7125e..6407fa6cca 100644 --- a/docs/products/fast_data_v2/concepts.mdx +++ b/docs/products/fast_data_v2/concepts.mdx @@ -57,7 +57,7 @@ modification that occurred to the record. In particular, the message contains - the value of the record before the modification occurred (available in `UPDATE` and `DELETE` operations) - the value of the record after the modification occurred - (available in `CREATE` and `UPDATE` operations) + (available in `READ`, `CREATE` and `UPDATE` operations) - an optional metadata describing the data source that contains the record and when the modification occurred on it - an optional metadata describing when the modification was diff --git a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx index 16ee20164e..649df858f3 100644 --- a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx +++ b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx @@ -144,8 +144,7 @@ The `persistence` configuration defines how aggregated data is stored. Currently, `PersistenceConfig` can only be of type `mongo`. The `MongoConfig` requires a `url`, which is a `Secret`, and can optionally specify a `database`. -For proper functioning, sink collections need to be created. The `indexer` CLI can -be run to automatically create the necessary indexes. +For proper functioning, sink collections need to be created. 
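Since the documentation now leaves sink-collection setup entirely to the operator, a small illustrative helper can make the manual step concrete. Everything in this sketch is hypothetical: the collection names, the key fields, and the unique-index choice are placeholders to adapt to your own aggregation graph, and the helper only prints `mongosh` commands rather than connecting to MongoDB.

```python
# Illustrative only: renders mongosh commands that create a unique index
# on each sink collection. Collection and key-field names are invented;
# replace them with the ones from your own Farm Data configuration.

def render_index_commands(sinks):
    """Map {collection: [key fields]} to mongosh createIndex calls."""
    commands = []
    for collection, key_fields in sinks.items():
        keys = ", ".join(f'"{field}": 1' for field in key_fields)
        commands.append(
            f'db.getCollection("{collection}").createIndex('
            f'{{ {keys} }}, {{ unique: true }})'
        )
    return commands

if __name__ == "__main__":
    # Hypothetical sink collections with their lookup keys.
    sinks = {
        "orders__sink": ["orderId"],
        "customers__sink": ["customerId", "region"],
    }
    for command in render_index_commands(sinks):
        print(command)
```

Running the script prints one `createIndex` command per sink collection, which can then be pasted into a `mongosh` session against the configured persistence database.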
Example of connection configuration: diff --git a/docs/products/fast_data_v2/farm_data/30_Usage.md b/docs/products/fast_data_v2/farm_data/30_Usage.md index 117fd119d0..26bc147e10 100644 --- a/docs/products/fast_data_v2/farm_data/30_Usage.md +++ b/docs/products/fast_data_v2/farm_data/30_Usage.md @@ -10,11 +10,7 @@ To use the plugin, the following requirements must be met: - Kafka connection must have permission to read and write the topics declared in the configuration file; - Kafka topics must exist on the Kafka cluster, with the appropriate number of partitions (which constrain service replicas), retention and replication factor; -- MongoDB collections must be defined on the MongoDB cluster with the necessary indexes; - to aid in the generation of them it is possible to exploit provided tools, such as - the `indexer` command of `the_world_outside` CLI. This command connects to the - configured persistence layer and, analyzing the aggregation graph, automatically - generate the recommended indexes for your use case. +- MongoDB collections must be defined on the MongoDB cluster with the necessary indexes. ## Messages Spec @@ -228,42 +224,6 @@ the indexes defined: } ``` -### Indexer CLI - -To aid in the creation of `__sink` collections alongside their indexes, the -`the_world_outside` CLI is provided. It reads a Farm Data configuration file, -connects to the defined persistence storage and generate the needed collections -and their indexes. When generating indexes, it accounts also for complex filters, -so that lookup queries are correctly supported. - -Secrets resolution is handled automatically. The secret just need to be reachable -from the CLI, either from an environmental variable or from a file. 
- -The CLI can be installed using the following command: - -```shell -npm install -g @mia-platform-internal/the-world-outside -``` - -and launched as show below: - -```shell -two indexer -c -``` - -In the `-c` argument it is necessary to give the path to a directory folder which -holds the Farm Data configuration file (either named `farm-data.json` or `config.json`). - -:::warning - -In order to use `two` CLI it is necessary to own a Mia-Platform subscription and -have access to the private repository. - -Furthermore, since the CLI directly connects to your MongoDB cluster, please ensure that -you can connect to it from your machine. - -::: - ## Aggregation Graph The aggregation graph is a [Direct Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) From bfa86c2e17bc7cdc17c28be68d359f8830bbaf17 Mon Sep 17 00:00:00 2001 From: Nicola Moretto Date: Thu, 19 Mar 2026 10:09:38 +0100 Subject: [PATCH 2/6] Add Kafka page --- README.md | 2 +- .../farm_data/20_Configuration.mdx | 132 ++------------ .../fast_data_v2/farm_data/30_Usage.md | 2 +- docs/products/fast_data_v2/kafka.md | 165 ++++++++++++++++++ .../fast_data_v2/kango/20_Configuration.mdx | 33 +--- docs/products/fast_data_v2/kango/30_Usage.md | 2 +- .../fast_data_v2/mongezium_cdc/30_Usage.md | 2 +- .../stream_processor/20_Configuration.mdx | 4 + .../fast_data_v2/stream_processor/30_Usage.md | 2 +- 9 files changed, 196 insertions(+), 148 deletions(-) create mode 100644 docs/products/fast_data_v2/kafka.md diff --git a/README.md b/README.md index 9453ca37c1..f72eb8beaa 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ We believe in the open source community, we think it is important to give back a The documentation site is built using [Docusaurus 2](https://v2.docusaurus.io/); to develop locally you need: -- Node 16+ +- Node 24+ To setup node, please if possible try to use [nvm][nvm], so you can manage multiple versions easily. 
diff --git a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx index 649df858f3..ab09c5e745 100644 --- a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx +++ b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx @@ -64,47 +64,8 @@ is an object where each key can represent a specific Kafka consumer configuratio Additional properties for Kafka consumer configuration can be found in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation. -:::warning -When configuring the service, it is important to configure the following Kafka consumer properties: - -```text -queued.min.messages -queued.max.messages.kbytes -``` -Since they regulate the amount of memory consumed by the service, these values should be -tuned depending on the resources (CPU and memory) that can assigned to the service -and the size of the underlying database. Indeed, those external factors affect the processing -throughput; therefore, retaining more messages in the consumer fetch queue than -the amount the service can process may risk to waste service memory. - -For example, when using a MongoDB M30 instance with a maximum of 2400 IOPS for read+write, -we recommend to start by setting these values to: - -```text -queued.min.messages=1000 -queued.max.messages.kbytes=16384 -``` -::: - -:::info -When configuring consumers, it is important to know that different configurations (e.g. `group.id`) -would trigger the creation of different consumer clients. This may be necessary to enable -the same stream to be aggregated in different manners. - -Please, notice that the instantiation of additional consumers may require to increase service -memory requests and limits, since these consumers would have their own internal fetch queue. 
-::: - :::note -The following Kafka consumer properties are not configurable: - -- `allow.auto.create.topics` → `"false"` -- `enable.auto.commit` → `"false"` - -The first parameter is included to enforce user responsibility over topics creation, -so that the proper configurations, such as number of partitions, replication factor -and retention policy are set. In addition, the latter property disables the driver -auto-commit feature in favour of an ad-hoc internal logic. +`queued.min.messages` and `queued.max.messages.kbytes` must be tuned to balance memory usage and throughput. Different `group.id` values create independent consumer groups, each with its own fetch queue. The following properties are hardcoded and not configurable: `allow.auto.create.topics` → `"false"` and `enable.auto.commit` → `"false"`. See [Consumer Configuration](/products/fast_data_v2/kafka.md#consumer-configuration) and [Consumer Queue Tuning](/products/fast_data_v2/kafka.md#consumer-queue-tuning) in the Kafka Reference for details. ::: ### Producer @@ -118,19 +79,7 @@ The `producer` configuration defines how the system produces data. Currently, Additional properties for Kafka producer configuration can be found in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation. :::note -Kafka producer is configured to compress messages by default using [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) algorithm. -This enables reducing the disk space consumed on the Kafka broker. - -Furthermore, the following properties are not configurable: - -- `allow.auto.create.topics` → `"false"` -- `enable.idempotence` → `"true"` -- `acks` → `"all"` - -The first parameter is included to enforce user responsibility over topics creation, -so that the proper configurations, such as number of partitions, replication factor -and retention policy are set. 
In addition, the latter properties ensure that -no duplicated messages are produced on Kafka brokers. +The producer uses [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) compression by default to reduce disk space on the Kafka broker. `allow.auto.create.topics`, `enable.idempotence`, and `acks` are hardcoded and not configurable. See [Producer Configuration](/products/fast_data_v2/kafka.md#producer-configuration) in the Kafka Reference for details. ::: ### Processor @@ -222,32 +171,9 @@ will be added in the future. ### Recommended Kafka Configuration -When configuring Kafka consumer, it is advised to set appropriate values for -constraining the consumer internal queue. In this manner: - -- the maximum amount of memory employed by the service can be finely tuned to avoid - wasting resources, since only the number of messages that can effectively be - processed in real-time should be pulled in memory; -- it is ensured that consumer continuously poll the broker to avoid it exiting the - consumer group, since a lower number of buffered messages can trigger a new fetch to - replenish it; +For a full explanation of the consumer queue tuning parameters, see [Consumer Queue Tuning](/products/fast_data_v2/kafka.md#consumer-queue-tuning) in the Kafka Reference. -The main values to tune are: - -- `queued.max.messages.kbytes`: maximum number of kilobytes of queued pre-fetched - messages in the local consumer queue; -- `queued.min.messages`: minimum number of messages per topic+partition _librdkafka_ - tries to maintain in the local consumer queue; - -It is recommended to set `queued.min.messages` to a value greater, but close to the -average message consumption rate. It is possible to observer: - -- `kafka_consumer_rx_msgs_total` → messages read -- `farm_data_processed_msg` → total number of processed messages - -to check the average values. 
- -For _Farm Data_ service, an example of configuration can be the following one: +**Starting values for Farm Data input consumers:** ```json { @@ -256,30 +182,20 @@ For _Farm Data_ service, an example of configuration can be the following one: } ``` -Another important property that might need to be tuned is `fetch.message.max.bytes`, -which however should be set only in case `queued.max.messages.kbytes` is set to -a value lower than `1024`. +For a MongoDB M30 cluster (2400 IOPS), a more specific recommendation is: + +```json +{ + "queued.min.messages": "1000", + "queued.max.messages.kbytes": "16384" +} +``` #### Internal Updates Consumer -`internal-updates` consumer requires an **ad-hoc consumer configuration** due -to its messages' uniqueness. In fact, `internal-update` messages are very small -(in the bytes range), but they trigger a larger computation that may require different -milliseconds to complete. - -Due to the combination of these factors, using the default queue parameters or even the ones -adopted for the input streams is not recommended. Indeed, the Kafka consumer tries -to fetch and buffer a large amount of events, since they are small, but it takes a considered -amount of time to clear them from the queue. This prevents the consumer from fetching newer -messages within the constraint set by `max.poll.interval.ms` (a default interval of 5 minutes). -Once that time elapses, the consumer instance is considered dead by the Kafka broker and -forces it to leave the group, triggering a restart of the service since its events stream has -terminated. - -To prevent this unwanted situation that hinders the advancement of events processing, it -has been observed that modifying the consumer parameters can improve the stability of the -service itself. 
Thus, below are provided the recommended configuration to apply to the
-Kafka consumer of the `internal-updates` configuration:
+The `internal-updates` consumer requires a **dedicated configuration** because its messages are very small (bytes range) but trigger heavier computations. Using the same queue parameters as the regular input consumers causes the consumer to fail to poll within `max.poll.interval.ms` (default: 5 minutes), resulting in eviction from the consumer group and a service restart.
+
+Apply the following configuration to the `internal-updates` consumer:

```json
{
  "queued.min.messages": "160",
  "queued.max.messages.kbytes": "96",
  "fetch.message.max.bytes": "40320"
}
```

-As it can be observed, here also `fetch.message.max.bytes` parameter has changed, since
-it governs how many bytes are fetched per topic+partition the first time the consumer
-connects. Consequently, leaving the default value of 1MB would lead to a behavior where
-the service starts, aggregates events for about 5 minutes and then it restarts because
-it has been forced out of the consumer group.
-
 :::danger
-
-When a consumer instance is forced out of its consumer group, such instance may not have
-the chance to commit the work it has already carried out. Thus, setting
-the proper values is fundamental to guaranteed service stability and progress in
-consuming messages from Kafka topic.
-
+When a consumer instance is evicted from its consumer group, it may not have the chance to commit the work it has already carried out. Setting the proper values is fundamental for service stability and message-processing progress.
 :::

 :::note
-
-`queued.max.messages.kbytes` value use **KBytes** unit, whereas `fetch.message.max.bytes`
-use **bytes** unit. Thus, the latter appears larger, though it isn't.
-
+`queued.max.messages.kbytes` uses **KB** units; `fetch.message.max.bytes` uses **bytes**. Despite the apparent difference in magnitude, both values above are intentional. 
::: ## Kubernetes diff --git a/docs/products/fast_data_v2/farm_data/30_Usage.md b/docs/products/fast_data_v2/farm_data/30_Usage.md index 26bc147e10..690ca1b615 100644 --- a/docs/products/fast_data_v2/farm_data/30_Usage.md +++ b/docs/products/fast_data_v2/farm_data/30_Usage.md @@ -9,7 +9,7 @@ sidebar_label: Usage To use the plugin, the following requirements must be met: - Kafka connection must have permission to read and write the topics declared in the configuration file; -- Kafka topics must exist on the Kafka cluster, with the appropriate number of partitions (which constrain service replicas), retention and replication factor; +- Kafka topics must exist on the Kafka cluster with the appropriate configuration (partitions, retention, replication factor); see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference; - MongoDB collections must be defined on the MongoDB cluster with the necessary indexes. ## Messages Spec diff --git a/docs/products/fast_data_v2/kafka.md b/docs/products/fast_data_v2/kafka.md new file mode 100644 index 0000000000..0907c8b476 --- /dev/null +++ b/docs/products/fast_data_v2/kafka.md @@ -0,0 +1,165 @@ +--- +id: kafka +title: Kafka +sidebar_label: Kafka +--- + +Fast Data v2 uses [Apache Kafka](https://kafka.apache.org/) as its streaming backbone. All workloads — Mongezium, Stream Processor, Farm Data, and Kango — communicate through Kafka topics. This page is the central reference for Kafka concepts and cross-service configuration guidance within Fast Data v2. + +:::tip New to Kafka? +If you are not yet familiar with Kafka, the [Apache Kafka introduction](https://kafka.apache.org/intro) is a good starting point. In short: Kafka is a distributed event-streaming platform where services exchange messages through durable, replayable channels called _topics_. 
+::: + +## Overview + +Each Fast Data v2 workload plays a distinct role in the pipeline: + +| Workload | Reads from Kafka | Writes to Kafka | +|---|---|---| +| [Mongezium](/products/fast_data_v2/mongezium_cdc/10_Overview.md) | — | Produces CDC change events onto topics | +| [Stream Processor](/products/fast_data_v2/stream_processor/10_Overview.md) | ✓ Input topic | ✓ Output topic | +| [Farm Data](/products/fast_data_v2/farm_data/10_Overview.md) | ✓ Multiple input topics + internal updates topic | ✓ Output topic + internal updates topic | +| [Kango](/products/fast_data_v2/kango/10_Overview.md) | ✓ Input topic | — | + +Kafka topics act as durable, replayable buffers that decouple producers from consumers, allowing pipeline components to be deployed, scaled, and upgraded independently. + +## Topics + +Before deploying any Fast Data v2 workload, all Kafka topics it reads from or writes to **must already exist**. Auto-creation of topics is disabled across all workloads to enforce intentional configuration. + +:::warning +All Fast Data v2 workloads have `allow.auto.create.topics` hardcoded to `"false"`. Topics must be created with the proper configuration before starting the services. +::: + +Each topic must be configured with: + +- **Partitions**: The number of partitions directly constrains the maximum number of service replicas that can process in parallel. Each partition is consumed by at most one consumer within the same consumer group. Plan your partition count based on the desired degree of parallelism. +- **Retention**: Set a retention policy that covers your expected reprocessing or replay window. For CDC pipelines, ensure retention is long enough to survive planned infrastructure downtime without data loss. +- **Replication factor**: Set a replication factor ≥ 2 for fault tolerance in production environments. + +### Internal Updates Topic (Farm Data) + +Farm Data requires a dedicated internal updates topic in addition to its regular input and output topics. 
This topic is used for internal coordination within the aggregation pipeline and requires a specific consumer configuration distinct from the regular input consumers. See [Farm Data Configuration — Internal Updates](/products/fast_data_v2/farm_data/20_Configuration.mdx#internal-updates) for details, and [Consumer Queue Tuning](#consumer-queue-tuning) below for the recommended configuration values. + +## Connections Configuration + +All Fast Data v2 services expose a `connections` map in their `config.json` for centralizing Kafka (and MongoDB) connection details. A Kafka connection entry looks like this: + +```json +{ + "connections": { + "kafka": { + "type": "kafka", + "config": { + "url": "" + } + } + } +} +``` + +The connection name (here `"kafka"`) is referenced by consumer and producer configurations via the `connectionName` field. + +The `url` field — and other connection properties — support [**secret resolution**](/products/fast_data_v2/secrets_resolution.md), so you can inject credentials at runtime without storing sensitive data in plain text: + +```json +{ + "connections": { + "kafka": { + "type": "kafka", + "config": { + "url": { + "type": "env", + "key": "KAFKA_CONNECTION_STRING" + } + } + } + } +} +``` + +Additional connection-level properties follow the [librdkafka configuration](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) format. + +## Consumer Configuration + +### Required Properties + +| Property | Description | +|---|---| +| `group.id` | **Required.** Consumer group identifier. Consumers sharing the same `group.id` divide the topic partitions among themselves. | +| `client.id` | Unique identifier for this consumer instance. Recommended for observability and diagnostics. | + +:::info Consumer Groups +Different `group.id` values create independent consumer groups, each processing the full topic independently. This allows the same stream to be consumed by multiple services simultaneously (e.g. 
different aggregation pipelines reading the same source topic).

Be aware that each additional consumer group requires its own internal fetch queue, which increases memory usage. Adjust your service's memory requests and limits accordingly.
:::

### Offset Management

| Property | Default | Description |
|---|---|---|
| `auto.offset.reset` | `earliest` | Behavior when no committed offset exists for the consumer group. `earliest` starts from the oldest available message; `latest` starts from newly arriving messages only. |
| `commitIntervalMs` | `500ms` | Interval between manual offset commits (Fast Data v2-specific field). Minimum is `0`. When changing this value, keep it consistent across all consumers within a service. |
| `enable.auto.commit` | `false` (hardcoded) | All Fast Data v2 workloads manage offset commits internally. This property cannot be overridden. |

### Consumer Queue Tuning

The following librdkafka properties control how many messages each consumer prefetches into local memory. Tuning them is important to avoid out-of-memory issues and consumer group evictions:

| Property | Description |
|---|---|
| `queued.max.messages.kbytes` | Maximum kilobytes of pre-fetched messages in the local consumer queue. |
| `queued.min.messages` | Minimum number of messages per topic+partition that librdkafka tries to maintain locally. |

Setting these values too high wastes memory. Setting them too low risks the consumer being evicted from its consumer group if polling does not happen frequently enough within `max.poll.interval.ms` (default: 5 minutes).

To determine appropriate values, observe the following metrics:

- `kafka_consumer_rx_msgs_total` → messages read per unit of time
- `ka_flushed_messages` → messages written to the persistence layer

Then set `queued.min.messages` slightly above the observed average consumption rate. 
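To make the sizing guidance above concrete, the arithmetic can be sketched as a small helper. The formula and its 20% headroom factor are illustrative assumptions, not values used by any Fast Data service:

```python
# Back-of-the-envelope sketch (assumption: buffering roughly one second
# of throughput, plus 20% headroom, is a sensible starting point).
# Derives starting values for the two queue properties from an observed
# consumption rate and an average message size.

def suggest_queue_settings(avg_msgs_per_sec, avg_msg_bytes, headroom=1.2):
    # Keep slightly more messages buffered than one second of throughput.
    queued_min_messages = int(avg_msgs_per_sec * headroom)
    # Cap the queue size, in kilobytes, to roughly hold that many messages.
    queued_max_kbytes = max(1, queued_min_messages * avg_msg_bytes // 1024)
    return {
        "queued.min.messages": str(queued_min_messages),
        "queued.max.messages.kbytes": str(queued_max_kbytes),
    }

if __name__ == "__main__":
    # A hypothetical observed rate of ~834 msgs/s of ~16 KiB messages.
    print(suggest_queue_settings(834, 16 * 1024))
```

For instance, roughly 834 messages per second at 16 KiB each yields starting values in the neighbourhood of the Farm Data M30 figures (`1000` / `16384`); treat the output only as a first guess to refine against the metrics listed above.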
+ +**Recommended starting values by service:** + +| Service | Consumer | `queued.min.messages` | `queued.max.messages.kbytes` | `fetch.message.max.bytes` | +|---|---|---|---|---| +| Farm Data | Input topics (MongoDB M30) | `1000` | `16384` | *(default)* | +| Farm Data | Internal updates | `160` | `96` | `40320` | +| Kango | Input topic (MongoDB M50) | `5000` | `32840` | *(default)* | +| Stream Processor | Input topic | `5000` | `32840` | *(default)* | + +:::note +`queued.max.messages.kbytes` uses **KB** as its unit, while `fetch.message.max.bytes` uses **bytes**. Adjust `fetch.message.max.bytes` only if `queued.max.messages.kbytes` is set below `1024`. +::: + +## Producer Configuration + +### Fixed Properties + +The following producer properties are hardcoded across all Fast Data v2 workloads and cannot be overridden: + +| Property | Value | Reason | +|---|---|---| +| `allow.auto.create.topics` | `"false"` | Topics must be created manually with the correct partition, retention, and replication settings. | +| `enable.idempotence` | `"true"` | Prevents duplicate messages from being produced to the broker. | +| `acks` | `"all"` | Requires acknowledgement from all in-sync replicas before a write is considered successful. | + +### Compression + +Farm Data compresses produced messages using the [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) algorithm by default, reducing disk space consumed on the Kafka broker. Stream Processor also supports setting `compression.type` in its producer configuration. + +## Tombstone Events + +Kafka supports _tombstone events_: messages with a non-null key but a `null` (0-byte) payload. In Fast Data v2: + +- **Farm Data**: ignores tombstone events received on input topics. When processing a `delete` (`d`) operation, it produces a tombstone on the output topic (in the default Read-Delete output mode). +- **Kango**: receives tombstones and handles them according to its configured write mode. 
+- **Stream Processor**: receives tombstones as a `null` payload; the processing function can handle them explicitly. + +See the [Fast Data Message Format](/products/fast_data_v2/concepts.mdx#fast-data-message-format) for the full event schema and operation types (`c`, `r`, `u`, `d`). + +## Schema Registry + +Stream Processor supports a `jsonWithSchema` deserialization strategy for payloads produced with a Kafka schema registry. In this mode, the actual message data is expected under a `payload` subkey. See [Stream Processor — Input Payload Deserialization](/products/fast_data_v2/stream_processor/30_Usage.md#input-payload-deserialization) for configuration details. diff --git a/docs/products/fast_data_v2/kango/20_Configuration.mdx b/docs/products/fast_data_v2/kango/20_Configuration.mdx index cb5543385a..8fdb86d005 100644 --- a/docs/products/fast_data_v2/kango/20_Configuration.mdx +++ b/docs/products/fast_data_v2/kango/20_Configuration.mdx @@ -67,32 +67,9 @@ a central Control Plane instance will be added in the future. ## Recommended Kafka Configuration -When configuring Kafka consumer, it is advised to set appropriate values for -constraining the consumer internal queue. In this manner: +For a full explanation of the consumer queue tuning parameters and how to determine appropriate values, see [Consumer Queue Tuning](/products/fast_data_v2/kafka.md#consumer-queue-tuning) in the Kafka Reference. 
-- the maximum amount of memory employed by the service can be finely tuned to avoid - wasting resources, since only the number of messages that can effectively be - processed in real-time should be pulled in memory; -- it is ensured that consumer continuously poll the broker to avoid it exiting the - consumer group, since a lower number of buffered messages can trigger a new fetch to - replenish it; - -The main values to tune are: - -- `queued.max.messages.kbytes`: maximum number of kilobytes of queued pre-fetched - messages in the local consumer queue; -- `queued.min.messages`: minimum number of messages per topic+partition _librdkafka_ - tries to maintain in the local consumer queue; - -It is recommended to set `queued.min.messages` to a value greater, but close to the -average message consumption rate. It is possible to observer: - -- `kafka_consumer_rx_msgs_total` → messages read -- `ka_flushed_messages` → messages written to the persistence layer - -to check the average values. - -For _Kango_ service, when connected to a MongoDB M50 cluster instance, an example of configuration can be the following one: +For _Kango_ connected to a MongoDB M50 cluster, the following starting values are recommended: ```json { @@ -101,9 +78,9 @@ For _Kango_ service, when connected to a MongoDB M50 cluster instance, an exampl } ``` -Another important property that might need to be tuned is `fetch.message.max.bytes`, -which however should be changed only in case `queued.max.messages.kbytes` is set to -a value lower than `1024`. +:::note +`fetch.message.max.bytes` should only be adjusted if `queued.max.messages.kbytes` is set below `1024`. 
+::: ## Kubernetes diff --git a/docs/products/fast_data_v2/kango/30_Usage.md b/docs/products/fast_data_v2/kango/30_Usage.md index 18ff12f97c..c68a516158 100644 --- a/docs/products/fast_data_v2/kango/30_Usage.md +++ b/docs/products/fast_data_v2/kango/30_Usage.md @@ -9,7 +9,7 @@ sidebar_label: Usage To use the application, the following requirements must be met: - Kafka connection must have permission to read the topic declared in the configuration file; -- Kafka topic must exist on the Kafka cluster, with the appropriate number of partitions (which constrain service replicas), retention and replication factor; +- Kafka topic must exist on the Kafka cluster with the appropriate configuration (partitions, retention, replication factor); see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference; - MongoDB collection must be defined on the MongoDB cluster with the necessary indexes; in particular, all the fields of the message key should belong to a unique index, which would ensure record uniqueness on the database ## Write Mode diff --git a/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md b/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md index f0fde5bdb3..8fca0a9e40 100644 --- a/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md +++ b/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md @@ -37,7 +37,7 @@ To use the application, the following requirements must be met: - MongoDB must be in replica-set. - the connection string must have privileges to access the `oplog` and the `admin` collection. 
More specifically, it needs permission to enable `changeStreamPreAndPostImages` on the collection of the configured database; -- Kafka connection must have permission to read/write the topics declared in the `collectionMappings` registry; +- Kafka connection must have permission to read/write the topics declared in the `collectionMappings` registry; topics must be pre-created with the appropriate configuration — see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference; - both collections and topics must be defined in the MongoDB cluster and the Kafka Cluster, respectively. ## Messages Spec diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx index f162ee5a72..08981b5872 100644 --- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx +++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx @@ -123,6 +123,8 @@ The `consumer` field configures the input stream source. Currently supports: } ``` +For details on consumer group behavior, offset management, and queue tuning recommendations, see [Consumer Configuration](/products/fast_data_v2/kafka.md#consumer-configuration) in the Kafka Reference. + #### Producer Configuration The `producer` field configures the output stream destination. Currently supports: @@ -149,6 +151,8 @@ The `producer` field configures the output stream destination. Currently support } ``` +For details on producer fixed properties and compression settings, see [Producer Configuration](/products/fast_data_v2/kafka.md#producer-configuration) in the Kafka Reference. + #### Processor Configuration The `processor` field configures the processing engine. 
Currently supports: diff --git a/docs/products/fast_data_v2/stream_processor/30_Usage.md b/docs/products/fast_data_v2/stream_processor/30_Usage.md index e59914edab..b6c70fbe7a 100644 --- a/docs/products/fast_data_v2/stream_processor/30_Usage.md +++ b/docs/products/fast_data_v2/stream_processor/30_Usage.md @@ -9,7 +9,7 @@ sidebar_label: Usage To use the plugin, the following requirements must be met: - Kafka connection must have permission to read and write the topics declared in the configuration file; -- Kafka topics must exist on the Kafka cluster, with the appropriate number of partitions (which constrain service replicas), retention and replication factor; +- Kafka topics must exist on the Kafka cluster with the appropriate configuration (partitions, retention, replication factor); see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference; - when enabling the `cache` feature, it is necessary to provide a connection to a MongoDB cluster and the ad-hoc collection should be created with the appropriate indexes; From 88d3a4c440815bf6d59156ef090704f1013dec64 Mon Sep 17 00:00:00 2001 From: Nicola Moretto Date: Thu, 19 Mar 2026 10:11:38 +0100 Subject: [PATCH 3/6] Update sidebar --- sidebars.json | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/sidebars.json b/sidebars.json index a017d49b38..562838df1b 100644 --- a/sidebars.json +++ b/sidebars.json @@ -2166,9 +2166,20 @@ "type": "doc" }, { - "id": "products/fast_data_v2/architecture", + "type": "category", "label": "Architecture", - "type": "doc" + "collapsed": false, + "link": { + "id": "products/fast_data_v2/architecture", + "type": "doc" + }, + "items": [ + { + "id": "products/fast_data_v2/kafka", + "label": "Kafka", + "type": "doc" + } + ] }, { "type": "category", @@ -2322,4 +2333,4 @@ ] } ] -} +} \ No newline at end of file From 9b5d201077fb90cc1f4361f8f3afcecc033712e5 Mon Sep 17 00:00:00 2001 From: Nicola Moretto Date: Fri, 20 Mar 2026 15:15:27 +0100 Subject: 
[PATCH 4/6] Refine Kafka doc --- .../farm_data/20_Configuration.mdx | 16 +-- docs/products/fast_data_v2/kafka.md | 109 ++++++++++++------ .../stream_processor/20_Configuration.mdx | 29 +---- 3 files changed, 74 insertions(+), 80 deletions(-) diff --git a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx index ab09c5e745..7b15b0db48 100644 --- a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx +++ b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx @@ -62,11 +62,7 @@ is an object where each key can represent a specific Kafka consumer configuratio with a default of 500ms and a minimum of 0. In case the need to change this value should arise, we recommend to assign the same value to all the consumers. -Additional properties for Kafka consumer configuration can be found in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation. - -:::note -`queued.min.messages` and `queued.max.messages.kbytes` must be tuned to balance memory usage and throughput. Different `group.id` values create independent consumer groups, each with its own fetch queue. The following properties are hardcoded and not configurable: `allow.auto.create.topics` → `"false"` and `enable.auto.commit` → `"false"`. See [Consumer Configuration](/products/fast_data_v2/kafka.md#consumer-configuration) and [Consumer Queue Tuning](/products/fast_data_v2/kafka.md#consumer-queue-tuning) in the Kafka Reference for details. -::: +Additional information about Kafka consumer configuration can be found in the [Kafka reference page](/products/fast_data_v2/kafka.md#consumer-configuration) or in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation. ### Producer @@ -76,11 +72,7 @@ The `producer` configuration defines how the system produces data. 
Currently, * `connectionName`: The name of the Kafka connection to use from the `connections` map. * `topic`: The name of the Kafka topic to which the producer will send messages. -Additional properties for Kafka producer configuration can be found in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation. - -:::note -The producer uses [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) compression by default to reduce disk space on the Kafka broker. `allow.auto.create.topics`, `enable.idempotence`, and `acks` are hardcoded and not configurable. See [Producer Configuration](/products/fast_data_v2/kafka.md#producer-configuration) in the Kafka Reference for details. -::: +Additional properties for Kafka producer configuration can be found in the [Kafka reference page](/products/fast_data_v2/kafka.md#producer-configuration) or in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation. ### Processor @@ -205,10 +197,6 @@ Apply the following configuration to the `internal-updates` consumer: } ``` -:::danger -When a consumer instance is evicted from its consumer group, it may not have the chance to commit the work it has already carried out. Setting the proper values is fundamental for service stability and message-processing progress. -::: - :::note `queued.max.messages.kbytes` uses **KB** units; `fetch.message.max.bytes` uses **bytes**. Despite the apparent difference in magnitude, both values above are intentional. ::: diff --git a/docs/products/fast_data_v2/kafka.md b/docs/products/fast_data_v2/kafka.md index 0907c8b476..e3d2184227 100644 --- a/docs/products/fast_data_v2/kafka.md +++ b/docs/products/fast_data_v2/kafka.md @@ -4,22 +4,24 @@ title: Kafka sidebar_label: Kafka --- -Fast Data v2 uses [Apache Kafka](https://kafka.apache.org/) as its streaming backbone. 
All workloads — Mongezium, Stream Processor, Farm Data, and Kango — communicate through Kafka topics. This page is the central reference for Kafka concepts and cross-service configuration guidance within Fast Data v2. +Fast Data v2 uses [Apache Kafka](https://kafka.apache.org/) as its streaming backbone. All workloads — [Mongezium][mongezium], [Stream Processor][stream-processor], [Farm Data][farm-data], and [Kango][kango] — communicate through Kafka topics. This page is the central reference for Kafka concepts and cross-service configuration guidance within Fast Data v2. :::tip New to Kafka? -If you are not yet familiar with Kafka, the [Apache Kafka introduction](https://kafka.apache.org/intro) is a good starting point. In short: Kafka is a distributed event-streaming platform where services exchange messages through durable, replayable channels called _topics_. + +If you are not familiar with Kafka, the [Apache Kafka introduction](https://kafka.apache.org/intro) is a good starting point. In short: Kafka is a distributed event-streaming platform where services exchange messages through durable, replayable channels called _topics_. 
+ ::: ## Overview Each Fast Data v2 workload plays a distinct role in the pipeline: -| Workload | Reads from Kafka | Writes to Kafka | -|---|---|---| -| [Mongezium](/products/fast_data_v2/mongezium_cdc/10_Overview.md) | — | Produces CDC change events onto topics | -| [Stream Processor](/products/fast_data_v2/stream_processor/10_Overview.md) | ✓ Input topic | ✓ Output topic | -| [Farm Data](/products/fast_data_v2/farm_data/10_Overview.md) | ✓ Multiple input topics + internal updates topic | ✓ Output topic + internal updates topic | -| [Kango](/products/fast_data_v2/kango/10_Overview.md) | ✓ Input topic | — | +| Workload | Reads from Kafka | Writes to Kafka | +| ------------------------------------ | ------------------------------------------------ | --------------------------------------- | +| [Mongezium][mongezium] | — | Produces CDC change events onto topics | +| [Stream Processor][stream-processor] | ✓ One input topic | ✓ One output topic | +| [Farm Data][farm-data] | ✓ Multiple input topics + internal updates topic | ✓ Output topic + internal updates topic | +| [Kango][kango] | ✓ Input topic | — | Kafka topics act as durable, replayable buffers that decouple producers from consumers, allowing pipeline components to be deployed, scaled, and upgraded independently. @@ -28,7 +30,9 @@ Kafka topics act as durable, replayable buffers that decouple producers from con Before deploying any Fast Data v2 workload, all Kafka topics it reads from or writes to **must already exist**. Auto-creation of topics is disabled across all workloads to enforce intentional configuration. :::warning + All Fast Data v2 workloads have `allow.auto.create.topics` hardcoded to `"false"`. Topics must be created with the proper configuration before starting the services. 
+ ::: Each topic must be configured with: @@ -43,7 +47,7 @@ Farm Data requires a dedicated internal updates topic in addition to its regular ## Connections Configuration -All Fast Data v2 services expose a `connections` map in their `config.json` for centralizing Kafka (and MongoDB) connection details. A Kafka connection entry looks like this: +All Fast Data v2 services expose a `connections` map in their `config.json` for centralizing Kafka connection details. A Kafka connection entry looks like this: ```json { @@ -78,41 +82,56 @@ The `url` field — and other connection properties — support [**secret resolu } ``` -Additional connection-level properties follow the [librdkafka configuration](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) format. +Additional connection-level properties follow the [librdkafka configuration][librdkafka] format. ## Consumer Configuration ### Required Properties -| Property | Description | -|---|---| -| `group.id` | **Required.** Consumer group identifier. Consumers sharing the same `group.id` divide the topic partitions among themselves. | -| `client.id` | Unique identifier for this consumer instance. Recommended for observability and diagnostics. | +| Property | Required | Description | +| ----------- | -------- | -------------------------------------------------------------------------------------------------------------- | +| `group.id` | Yes | Consumer group identifier. Consumers sharing the same `group.id` divide the topic partitions among themselves. | +| `client.id` | No | Unique identifier for this consumer instance. Recommended for observability and diagnostics. | :::info Consumer Groups + Different `group.id` values create independent consumer groups, each processing the full topic independently. This allows the same stream to be consumed by multiple services simultaneously (e.g. different aggregation pipelines reading the same source topic). 
Be aware that each additional consumer group requires its own internal fetch queue, which increases memory usage. Adjust your service's memory requests and limits accordingly. + ::: ### Offset Management -| Property | Default | Description | -|---|---|---| -| `auto.offset.reset` | `earliest` | Behavior when no committed offset exists for the consumer group. `earliest` starts from the oldest available message; `latest` starts from newly arriving messages only. | -| `commitIntervalMs` | `500ms` | Interval between manual offset commits (Fast Data v2-specific field). Minimum is `0`. When changing this value, keep it consistent across all consumers within a service. | -| `enable.auto.commit` | `false` (hardcoded) | All Fast Data v2 workloads manage offset commits internally. This property cannot be overridden. | +| Property | Default | Description | +| -------------------- | ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `auto.offset.reset` | `earliest` | Behavior when no committed offset exists for the consumer group. `earliest` starts from the oldest available message; `latest` starts from newly arriving messages only. | +| `commitIntervalMs` | `500ms` | Interval between manual offset commits (Fast Data v2-specific field). Minimum is `0`. When changing this value, keep it consistent across all consumers within a service. | +| `enable.auto.commit` | `false` (hardcoded) | All Fast Data v2 workloads manage offset commits internally. This property cannot be overridden. | ### Consumer Queue Tuning -The following librdkafka properties control how many messages each consumer prefetches into local memory. Tuning them is important to avoid out-of-memory issues and consumer group evictions: +When configuring Kafka consumers, it is recommended to set appropriate values for +constraining the consumer internal queue. 
In this manner: + +- the maximum amount of memory employed by the service can be finely tuned to avoid + wasting resources, since only the number of messages that can effectively be + processed in real-time should be pulled in memory; +- it is ensured that consumer continuously poll the broker to avoid it exiting the + consumer group, since a lower number of buffered messages can trigger a new fetch to + replenish it. + +The following [librdkafka][librdkafka] properties control how many messages each consumer prefetches into local memory. Tuning them is important to avoid out-of-memory issues and consumer group evictions. -| Property | Description | -|---|---| -| `queued.max.messages.kbytes` | Maximum kilobytes of pre-fetched messages in the local consumer queue. | -| `queued.min.messages` | Minimum number of messages per topic+partition that librdkafka tries to maintain locally. | +| Property | Description | +| ---------------------------- | ------------------------------------------------------------------------------------------------------- | +| `queued.max.messages.kbytes` | Maximum kilobytes of pre-fetched messages in the local consumer queue. | +| `queued.min.messages` | Minimum number of messages per topic+partition that [librdkafka][librdkafka] tries to maintain locally. | -Setting these values too high wastes memory. Setting them too low risks the consumer being evicted from its consumer group if pollingdoes not happen frequently enough within `max.poll.interval.ms` (default: 5 minutes). +Setting these values too high wastes memory. Setting them too low risks the consumer being evicted from its consumer group if polling does not happen frequently enough within `max.poll.interval.ms` (default: 5 minutes). + +When a consumer instance is evicted from its consumer group, it may not have the chance to commit the work it has already carried out. Setting the proper +values is fundamental for service stability and message-processing progress. 
To determine appropriate values, observe the following metrics: @@ -121,17 +140,23 @@ To determine appropriate values, observe the following metrics: Then set `queued.min.messages` slightly above the observed average consumption rate. -**Recommended starting values by service:** +Another important property that might need to be tuned is `fetch.message.max.bytes`, +which however should be changed only in case `queued.max.messages.kbytes` is set to +a value lower than `1024`. + +#### Recommended consumer configurations by service -| Service | Consumer | `queued.min.messages` | `queued.max.messages.kbytes` | `fetch.message.max.bytes` | -|---|---|---|---|---| -| Farm Data | Input topics (MongoDB M30) | `1000` | `16384` | *(default)* | -| Farm Data | Internal updates | `160` | `96` | `40320` | -| Kango | Input topic (MongoDB M50) | `5000` | `32840` | *(default)* | -| Stream Processor | Input topic | `5000` | `32840` | *(default)* | +| Service | Consumer | `queued.min.messages` | `queued.max.messages.kbytes` | `fetch.message.max.bytes` | +| ------------------------------------ | -------------------------- | --------------------- | ---------------------------- | ------------------------- | +| [Farm Data][farm-data] | Input topics (MongoDB M30) | `1000` | `16384` | *(default)* | +| [Farm Data][farm-data] | Internal updates | `160` | `96` | `40320` | +| [Kango][kango] | Input topic (MongoDB M50) | `5000` | `32840` | *(default)* | +| [Stream Processor][stream-processor] | Input topic | `5000` | `32840` | *(default)* | :::note + `queued.max.messages.kbytes` uses **KB** as its unit, while `fetch.message.max.bytes` uses **bytes**. Adjust `fetch.message.max.bytes` only if `queued.max.messages.kbytes` is set below `1024`. 
+ ::: ## Producer Configuration @@ -140,15 +165,15 @@ Then set `queued.min.messages` slightly above the observed average consumption r The following producer properties are hardcoded across all Fast Data v2 workloads and cannot be overridden: -| Property | Value | Reason | -|---|---|---| +| Property | Value | Reason | +| -------------------------- | --------- | ------------------------------------------------------------------------------------------------ | | `allow.auto.create.topics` | `"false"` | Topics must be created manually with the correct partition, retention, and replication settings. | -| `enable.idempotence` | `"true"` | Prevents duplicate messages from being produced to the broker. | -| `acks` | `"all"` | Requires acknowledgement from all in-sync replicas before a write is considered successful. | +| `enable.idempotence` | `"true"` | Prevents duplicate messages from being produced to the broker. | +| `acks` | `"all"` | Requires acknowledgement from all in-sync replicas before a write is considered successful. | ### Compression -Farm Data compresses produced messages using the [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) algorithm by default, reducing disk space consumed on the Kafka broker. Stream Processor also supports setting `compression.type` in its producer configuration. +Farm Data compresses produced messages using the [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) algorithm by default, reducing disk space consumed on the Kafka broker. [Stream Processor][stream-processor] also supports setting `compression.type` in its producer configuration. ## Tombstone Events @@ -162,4 +187,12 @@ See the [Fast Data Message Format](/products/fast_data_v2/concepts.mdx#fast-data ## Schema Registry -Stream Processor supports a `jsonWithSchema` deserialization strategy for payloads produced with a Kafka schema registry. In this mode, the actual message data is expected under a `payload` subkey. 
See [Stream Processor — Input Payload Deserialization](/products/fast_data_v2/stream_processor/30_Usage.md#input-payload-deserialization) for configuration details. +[Stream Processor][stream-processor] supports a `jsonWithSchema` deserialization strategy for payloads produced with a Kafka schema registry. In this mode, the actual message data is expected under a `payload` subkey. See [Stream Processor — Input Payload Deserialization](/products/fast_data_v2/stream_processor/30_Usage.md#input-payload-deserialization) for configuration details. + + +[mongezium]: /products/fast_data_v2/mongezium_cdc/10_Overview.md +[stream-processor]: /products/fast_data_v2/stream_processor/10_Overview.md +[farm-data]: /products/fast_data_v2/farm_data/10_Overview.md +[kango]: /products/fast_data_v2/kango/10_Overview.md + +[librdkafka]: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html \ No newline at end of file diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx index 08981b5872..4f134ee5de 100644 --- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx +++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx @@ -249,31 +249,6 @@ and a central Control Plane instance will be added in the future. ## Recommended Kafka Configuration -When configuring Kafka consumer, it is advised to set appropriate values for -constraining the consumer internal queue. 
In this manner: - -- the maximum amount of memory employed by the service can be finely tuned to avoid - wasting resources, since only the number of messages that can effectively be - processed in real-time should be pulled in memory; -- it is ensured that consumer continuously poll the broker to avoid it exiting the - consumer group, since a lower number of buffered messages can trigger a new fetch to - replenish it; - -The main values to tune are: - -- `queued.max.messages.kbytes`: maximum number of kilobytes of queued pre-fetched - messages in the local consumer queue; -- `queued.min.messages`: minimum number of messages per topic+partition _librdkafka_ - tries to maintain in the local consumer queue; - -It is recommended to set `queued.min.messages` to a value greater, but close to the -average message consumption rate. It is possible to observer: - -- `kafka_consumer_rx_msgs_total` → messages read -- `sp_processed_messages` → total number of processed messages - -to check the average values. - For _Stream Processor_ service, an example of configuration can be the following one: ```json @@ -283,9 +258,7 @@ For _Stream Processor_ service, an example of configuration can be the following } ``` -Another important property that might need to be tuned is `fetch.message.max.bytes`, -which however should be changed only in case `queued.max.messages.kbytes` is set to -a value lower than `1024`. +For additional information about Kafka consumer configuration and performance tuning, please check the [Kafka reference page](/products/fast_data_v2/kafka.md#consumer-configuration). 
## Kubernetes From c515d2bf953c7e074fe8967ea9d6e1b0bd55f146 Mon Sep 17 00:00:00 2001 From: Nicola Moretto Date: Fri, 20 Mar 2026 15:28:38 +0100 Subject: [PATCH 5/6] Restore some deleted Kafka details --- docs/products/fast_data_v2/kafka.md | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/docs/products/fast_data_v2/kafka.md b/docs/products/fast_data_v2/kafka.md index e3d2184227..9ebe854e5b 100644 --- a/docs/products/fast_data_v2/kafka.md +++ b/docs/products/fast_data_v2/kafka.md @@ -43,7 +43,15 @@ Each topic must be configured with: ### Internal Updates Topic (Farm Data) -Farm Data requires a dedicated internal updates topic in addition to its regular input and output topics. This topic is used for internal coordination within the aggregation pipeline and requires a specific consumer configuration distinct from the regular input consumers. See [Farm Data Configuration — Internal Updates](/products/fast_data_v2/farm_data/20_Configuration.mdx#internal-updates) for details, and [Consumer Queue Tuning](#consumer-queue-tuning) below for the recommended configuration values. +Farm Data requires a dedicated internal updates topic in addition to its regular input and output topics. This topic is used for internal coordination within the aggregation pipeline and requires a specific consumer configuration distinct from the regular input consumers, due to the peculiar nature of its messages. + +In fact, `internal-update` messages are very small (in the bytes range), but they trigger a larger computation that may require several milliseconds to complete. + +Due to the combination of these factors, using the default queue parameters or even the ones adopted for the input streams is not recommended. Indeed, the Kafka consumer tries to fetch and buffer a large number of events, since they are small, but it takes a considerable amount of time to clear them from the queue.
This prevents the consumer from fetching newer messages within the constraint set by `max.poll.interval.ms` (a default interval of 5 minutes). Once that time elapses, the consumer instance is considered dead by the Kafka broker, which forces it to leave the group, triggering a restart of the service since its event stream has terminated. + +To prevent this unwanted situation, which hinders the advancement of event processing, tuning the consumer parameters has been observed to improve the stability of the service. + +See [Farm Data Configuration — Internal Updates](/products/fast_data_v2/farm_data/20_Configuration.mdx#internal-updates) for details, and [Consumer Queue Tuning](#consumer-queue-tuning) below for the recommended configuration values. ## Connections Configuration @@ -111,15 +119,10 @@ Be aware that each additional consumer group requires its own internal fetch que ### Consumer Queue Tuning -When configuring Kafka consumers, it is recommended to set appropriate values for -constraining the consumer internal queue. In this manner: +When configuring Kafka consumers, it is recommended to set appropriate values for constraining the consumer internal queue. In this manner: -- the maximum amount of memory employed by the service can be finely tuned to avoid - wasting resources, since only the number of messages that can effectively be - processed in real-time should be pulled in memory; -- it is ensured that consumer continuously poll the broker to avoid it exiting the - consumer group, since a lower number of buffered messages can trigger a new fetch to - replenish it.
+- the maximum amount of memory employed by the service can be finely tuned to avoid wasting resources, since only the number of messages that can effectively be processed in real-time should be pulled into memory; +- it is ensured that the consumer continuously polls the broker to avoid exiting the consumer group, since a lower number of buffered messages can trigger a new fetch to replenish it. The following [librdkafka][librdkafka] properties control how many messages each consumer prefetches into local memory. Tuning them is important to avoid out-of-memory issues and consumer group evictions. @@ -128,6 +131,8 @@ The following [librdkafka][librdkafka] properties control how many messages each | `queued.max.messages.kbytes` | Maximum kilobytes of pre-fetched messages in the local consumer queue. | | `queued.min.messages` | Minimum number of messages per topic+partition that [librdkafka][librdkafka] tries to maintain locally. | +Since they regulate the amount of memory consumed by the service, these values should be tuned depending on the resources (CPU and memory) that can be assigned to the service and the size of the underlying database. Indeed, those external factors affect the processing throughput; therefore, retaining more messages in the consumer fetch queue than the service can process risks wasting service memory. + Setting these values too high wastes memory. Setting them too low risks the consumer being evicted from its consumer group if polling does not happen frequently enough within `max.poll.interval.ms` (default: 5 minutes). When a consumer instance is evicted from its consumer group, it may not have the chance to commit the work it has already carried out. Setting the proper values is fundamental for service stability and message-processing progress. @@ -171,6 +176,8 @@ The following producer properties are hardcoded across all Fast Data v2 workload | `enable.idempotence` | `"true"` | Prevents duplicate messages from being produced to the broker.
| | `acks` | `"all"` | Requires acknowledgement from all in-sync replicas before a write is considered successful. | +The first parameter is included to enforce user responsibility over topic creation, so that the proper configuration (number of partitions, replication factor, and retention policy) is set. In addition, the latter two properties ensure that no duplicate messages are produced on the Kafka brokers. + ### Compression Farm Data compresses produced messages using the [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) algorithm by default, reducing disk space consumed on the Kafka broker. [Stream Processor][stream-processor] also supports setting `compression.type` in its producer configuration. From bad75c334e8a96bd9ea8fec95ca170f4397f97ad Mon Sep 17 00:00:00 2001 From: Nicola Moretto Date: Fri, 20 Mar 2026 16:01:40 +0100 Subject: [PATCH 6/6] Add MongoDB page --- .../farm_data/20_Configuration.mdx | 26 +- .../fast_data_v2/farm_data/30_Usage.md | 52 +--- docs/products/fast_data_v2/kango/30_Usage.md | 2 +- .../fast_data_v2/mongezium_cdc/30_Usage.md | 2 +- docs/products/fast_data_v2/mongodb.md | 251 ++++++++++++++++++ .../stream_processor/20_Configuration.mdx | 4 +- sidebars.json | 5 + 7 files changed, 264 insertions(+), 78 deletions(-) create mode 100644 docs/products/fast_data_v2/mongodb.md diff --git a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx index 7b15b0db48..6a69db7e6a 100644 --- a/docs/products/fast_data_v2/farm_data/20_Configuration.mdx +++ b/docs/products/fast_data_v2/farm_data/20_Configuration.mdx @@ -87,31 +87,7 @@ The `MongoConfig` requires a `url`, which is a `Secret`, and can optionally spec a `database`. For proper functioning, sink collections need to be created.
-Example of connection configuration: - -```json -{ - // other properties - "persistence": { - "type": "mongo", - "config": { - "url": "mongodb://localhost:27017/farm-data", - "database": "/farm-data", - "appName": "eu.miaplatfor.farm-data.lakes" - } - }, - // other properties -} -``` - -:::warning - -Farm Data service heavily relies on the persistence layer to cache the current stream -state. Consequently, it is highly recommended to configure MongoDB instance to -actually sustain a heavy IO load, setting maximum IOPS to at least a value of 2400 IOPS (1200 read + 1200 write), -though higher values would only benefit the service throughput. - -::: +For detailed information on Farm Data's MongoDB persistence model, required indexes, IOPS requirements, and collection naming conventions, see [Farm Data — Aggregation Persistence](/products/fast_data_v2/mongodb.md#farm-data--aggregation-persistence) in the MongoDB Reference. #### Internal Updates diff --git a/docs/products/fast_data_v2/farm_data/30_Usage.md b/docs/products/fast_data_v2/farm_data/30_Usage.md index 690ca1b615..5a2e58490e 100644 --- a/docs/products/fast_data_v2/farm_data/30_Usage.md +++ b/docs/products/fast_data_v2/farm_data/30_Usage.md @@ -172,57 +172,9 @@ Output messages are compliant with [Fast Data message format](/products/fast_dat As explained in the [Configuration page](/products/fast_data_v2/farm_data/20_Configuration.mdx), Farm Data service processes events using a stateful model and therefore it needs to store intermediate results on persistent storage system. -Currently, Farm Data supports only MongoDB as external persistence system. +Currently, Farm Data supports only MongoDB as external persistence system. -### MongoDB Persistence - -When using MongoDB as persistence storage, Farm Data service needs the details -for connecting to a MongoDB cluster, which allows the service to create the -needed collections and store there the intermediate aggregated documents. 
- -In fact, the service creates on the selected database a collection for each -aggregation node (processing unit), which will store the intermediate results. -These collections are named following the pattern below: - -```text -__sink__ -``` - -where: - -- `__sink` is a constant prefix to signal that the collection is used internally; -- `` is the value of configuration field `id` that identifies the - specific aggregation process that is employing such collection. - Please, beware that this identifier MUST be between 8 and 16 characters and it - should satisfy MongoDB [collection name restrictions](https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Restriction-on-Collection-Names); -- `` is the name of a node in the aggregation graph; - -To support read/write operations over `__sink` collections, each of them should have -the indexes defined: - -- a **unique** index supporting the internal primary key of the record - ```json - { - "__pk": 1 - } - ``` -- an index for each `value` property of the current aggregation node, which - is employed for lookup operations by children nodes (in the aggregation graph). - For example: - ```json - { - "value.userId": 1 - } - ``` -- an index for each `dependency.*.__pk` property of the current aggregation node, which - is employed for lookup operations by current towards children nodes - (in the aggregation graph). - For example: - ```json - { - "dependency.posts.__pk": 1 - } - ``` +For complete details on how Farm Data uses MongoDB — including collection naming conventions, required indexes, IOPS requirements, and configuration examples — see [Farm Data — Aggregation Persistence](/products/fast_data_v2/mongodb.md#farm-data--aggregation-persistence) in the MongoDB Reference. 
## Aggregation Graph diff --git a/docs/products/fast_data_v2/kango/30_Usage.md b/docs/products/fast_data_v2/kango/30_Usage.md index c68a516158..38124a21a9 100644 --- a/docs/products/fast_data_v2/kango/30_Usage.md +++ b/docs/products/fast_data_v2/kango/30_Usage.md @@ -10,7 +10,7 @@ To use the application, the following requirements must be met: - Kafka connection must have permission to read the topic declared in the configuration file; - Kafka topic must exist on the Kafka cluster with the appropriate configuration (partitions, retention, replication factor); see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference; -- MongoDB collection must be defined on the MongoDB cluster with the necessary indexes; in particular, all the fields of the message key should belong to a unique index, which would ensure record uniqueness on the database +- MongoDB collection must be defined on the MongoDB cluster with the necessary indexes; see [Kango — Output Sink](/products/fast_data_v2/mongodb.md#kango--output-sink) in the MongoDB Reference for index requirements and write mode details. ## Write Mode diff --git a/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md b/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md index 8fca0a9e40..394d78a544 100644 --- a/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md +++ b/docs/products/fast_data_v2/mongezium_cdc/30_Usage.md @@ -35,7 +35,7 @@ will be performed. To use the application, the following requirements must be met: -- MongoDB must be in replica-set. +- MongoDB must be in replica-set; see [Mongezium — CDC Source](/products/fast_data_v2/mongodb.md#mongezium--cdc-source) in the MongoDB Reference for replica-set requirements, required privileges, and resume token behavior; - the connection string must have privileges to access the `oplog` and the `admin` collection. 
More specifically, it needs permission to enable `changeStreamPreAndPostImages` on the collection of the configured database; - Kafka connection must have permission to read/write the topics declared in the `collectionMappings` registry; topics must be pre-created with the appropriate configuration — see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference; - both collections and topics must be defined in the MongoDB cluster and the Kafka Cluster, respectively. diff --git a/docs/products/fast_data_v2/mongodb.md b/docs/products/fast_data_v2/mongodb.md new file mode 100644 index 0000000000..6b9a4787ab --- /dev/null +++ b/docs/products/fast_data_v2/mongodb.md @@ -0,0 +1,251 @@ +--- +id: mongodb +title: MongoDB +sidebar_label: MongoDB +--- + +Fast Data v2 uses [MongoDB](https://www.mongodb.com/) as its primary storage layer. Depending on the workload, MongoDB is used as a CDC source, an intermediate aggregation state store, a final output sink, or a stateful processing cache. This page is the central reference for MongoDB roles, connection configuration, index requirements, and operational guidance within Fast Data v2. + +:::tip New to MongoDB? +If you are not yet familiar with MongoDB, the [MongoDB introduction](https://www.mongodb.com/docs/manual/introduction/) is a good starting point. In short: MongoDB is a document-oriented database where data is stored in flexible, JSON-like documents grouped into collections, inside databases. 
+::: + +## Overview + +Each Fast Data v2 workload uses MongoDB in a different way: + +| Workload | MongoDB role | Direction | +|---|---|---| +| [Mongezium](/products/fast_data_v2/mongezium_cdc/10_Overview.md) | CDC **source** — captures change events from MongoDB collections and produces them onto Kafka topics | Reads from MongoDB | +| [Farm Data](/products/fast_data_v2/farm_data/10_Overview.md) | Aggregation **persistence** — stores intermediate aggregated documents used for stateful processing | Reads and writes | +| [Kango](/products/fast_data_v2/kango/10_Overview.md) | Output **sink** — persists Kafka records into MongoDB collections as the final destination | Writes to MongoDB | +| [Stream Processor](/products/fast_data_v2/stream_processor/10_Overview.md) | Optional stateful **cache** — stores processing state to enable stateful transformations across events | Reads and writes | + +## Connections Configuration + +All Fast Data v2 services that connect to MongoDB expose a `connections` map in their `config.json` for centralizing connection details. A MongoDB connection entry looks like this: + +```json +{ + "connections": { + "mongodb": { + "type": "mongodb", + "config": { + "url": "" + } + } + } +} +``` + +The connection name (here `"mongodb"`) is referenced from the persistence or cache configuration. + +The `url` field — and other connection properties — support [**secret resolution**](/products/fast_data_v2/secrets_resolution.md), so you can inject credentials at runtime without storing sensitive data in plain text: + +```json +{ + "connections": { + "mongodb": { + "type": "mongodb", + "config": { + "url": { + "type": "env", + "key": "MONGODB_CONNECTION_STRING" + } + } + } + } +} +``` + +:::note +The `appName` field is optional but recommended. It sets the application name reported to MongoDB in connection metadata, which helps identify which Fast Data workload is performing queries when inspecting Atlas metrics, `currentOp`, or slow query logs. 
+::: + +## Farm Data — Aggregation Persistence + +Farm Data uses MongoDB as its stateful persistence backend. The service stores intermediate aggregated documents in MongoDB between processing steps, enabling it to incrementally build the final output even when source events arrive out of order or from different streams. + +### How Farm Data uses MongoDB + +For each aggregation node defined in the aggregation graph, Farm Data automatically creates and manages a dedicated MongoDB collection named: + +```text +__sink__ +``` + +Where: + +- `__sink` is a constant prefix that signals the collection is used internally by Farm Data; +- `` is the value of the `id` configuration field identifying the aggregation process. This identifier **must be between 8 and 16 characters** and must comply with MongoDB [collection name restrictions](https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Restriction-on-Collection-Names); +- `` is the name of a node in the aggregation graph. + +### Required Indexes + +To support the read/write operations Farm Data performs on `__sink` collections, each collection must have the following indexes: + +1. **Unique index on the internal primary key:** + ```json + { "__pk": 1 } + ``` + This is a unique index supporting the internal primary key of the record. + +2. **Index on each `value` property** of the aggregation node (used for lookup by child nodes in the graph): + ```json + { "value.userId": 1 } + ``` + One index is needed per relevant `value.*` property. + +3. **Index on each `dependency.*.__pk` property** of the aggregation node (used for lookups from current towards children nodes): + ```json + { "dependency.posts.__pk": 1 } + ``` + One index is needed per dependency relationship. + +### IOPS Requirements + +Farm Data relies heavily on MongoDB's I/O capacity because it continuously reads and writes intermediate aggregated state. 
It is highly recommended to provision a MongoDB cluster capable of sustaining high I/O load:

- **Minimum**: 2400 IOPS (1200 read + 1200 write) — corresponds roughly to a MongoDB Atlas M30 instance
- **Recommended**: higher IOPS values directly improve Farm Data throughput

:::warning
Under-provisioning MongoDB IOPS is the most common cause of poor Farm Data performance. The service will not error — it will simply slow down proportionally to I/O saturation.
:::

### Configuration Example

```json
{
  "persistence": {
    "type": "mongo",
    "config": {
      "url": "mongodb://localhost:27017/farm-data",
      "database": "farm-data",
      "appName": "eu.miaplatform.farm-data.lakes"
    }
  }
}
```

For full persistence configuration details, see [Farm Data Configuration — Persistence](/products/fast_data_v2/farm_data/20_Configuration.mdx#persistence).

## Kango — Output Sink

Kango reads Kafka records and persists them into MongoDB collections. It acts as the final persistence step of a Fast Data pipeline, writing processed and aggregated data into the operational data store.

### Write Modes

Kango supports two write modes that control how documents are inserted or updated when a record for the same key already exists:

| Mode | Behavior |
|---|---|
| `strict` *(default)* | Only fields from the `after` payload are **retained**. Insert operations act as _replace_ (unknown fields are discarded). Update operations _unset_ fields that existed in `before` but are absent from `after`. |
| `partial` | Fields from the `after` payload are **merged** onto the stored document. Insert operations act as _upserts_; updates apply only the changed fields. |

### Required Indexes

Kango requires a **unique index on all message key fields** for each target collection. This ensures document uniqueness — Kango uses this index to identify and upsert documents by their natural key.
+ +For example, if the message key is `{ "customerId": "123" }`, the target collection must have: + +```json +{ "customerId": 1 } +``` + +as a unique index. + +:::note +The unique index must cover all fields present in the Kafka message key, not just a subset. +::: + +For full configuration details, see [Kango Configuration](/products/fast_data_v2/kango/20_Configuration.mdx). + +## Stream Processor — Stateful Cache + +Stream Processor can optionally connect to MongoDB to enable stateful stream processing. When a MongoDB cache is configured, the processing function can store and retrieve per-key state across different events, enabling use cases like deduplication, aggregation, enrichment with persistent state, and event correlation. + +### Full CRUD with Optimistic Locking + +The MongoDB cache type supports all four operations: + +| Operation | Description | +|---|---| +| `get(key)` | Retrieves the value and version associated with the key. Returns `undefined` if not found. | +| `set(key, value)` | Creates a new cache entry. Throws `AlreadyExists` if the key already exists. | +| `update(key, value, version)` | Updates an existing entry using optimistic locking. Throws `NotFound` if absent; `ConcurrentModification` if the version doesn't match. | +| `delete(key)` | Removes the key and its value. Returns `undefined` if not found. | + +The `version` field returned by `get` must be passed to `update` to prevent concurrent modification issues when multiple instances process events in parallel. + +### Required Collection Setup + +Before using the MongoDB cache, the target collection must exist. No automatic index creation is performed by Stream Processor — ensure the collection is pre-created with appropriate indexes for your access patterns. 
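The optimistic-locking flow above can be illustrated with a small in-memory model. This is only a sketch of the documented contract: the class name, the storage layout, and the exception classes (`AlreadyExists`, `NotFound`, `ConcurrentModification`) are hypothetical stand-ins, not the actual Stream Processor API.

```python
class AlreadyExists(Exception):
    """Raised by set() when the key is already present."""

class NotFound(Exception):
    """Raised by update() when the key is absent."""

class ConcurrentModification(Exception):
    """Raised by update() when the presented version is stale."""


class VersionedCache:
    """Illustrative in-memory model of the cache contract described above."""

    def __init__(self):
        # key -> (value, version); versions start at 1 and grow monotonically
        self._store = {}

    def get(self, key):
        # Returns (value, version), or None when the key is absent.
        return self._store.get(key)

    def set(self, key, value):
        # Creates a new entry; refuses to overwrite an existing key.
        if key in self._store:
            raise AlreadyExists(key)
        self._store[key] = (value, 1)

    def update(self, key, value, version):
        # Optimistic locking: the caller must present the version it read.
        if key not in self._store:
            raise NotFound(key)
        _, current = self._store[key]
        if current != version:
            raise ConcurrentModification(key)
        self._store[key] = (value, current + 1)

    def delete(self, key):
        # Removes the entry, returning it, or None when the key is absent.
        return self._store.pop(key, None)
```

Under this model, two replicas that both `get` the same version cannot both succeed: the second `update` raises `ConcurrentModification` and must re-read the entry before retrying its change.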
+ +### Configuration Example + +```json +{ + "caches": { + "customer-cache": { + "type": "mongodb", + "url": "mongodb://localhost:27017/fast-data", + "appName": "eu.miaplatform.fastdata.stream-processor", + "database": "fast-data", + "collection": "stream-processor-state" + } + } +} +``` + +For full cache configuration details, see [Stream Processor Configuration — Caches](/products/fast_data_v2/stream_processor/20_Configuration.mdx#caches-configuration-optional). + +## Mongezium — CDC Source + +Mongezium uses MongoDB as its data source. It listens to MongoDB [change streams](https://www.mongodb.com/docs/manual/changeStreams/) to capture every insert, update, and delete on configured collections, and publishes those change events as Fast Data messages onto Kafka topics. + +### Replica-Set Requirement + +MongoDB change streams require MongoDB to be running in **replica-set mode**. Standalone MongoDB instances are not supported. + +:::warning +If your MongoDB deployment is not in a replica-set, Mongezium cannot operate. This applies even in development environments — you can use a single-node replica-set. +::: + +### Required Privileges + +The MongoDB connection string used by Mongezium must grant: + +- Read access to the `oplog.rs` collection (to track change stream position via resume tokens) +- Read access to the `admin` database +- Permission to enable `changeStreamPreAndPostImages` on collections of the configured database + +### Resume Tokens + +Mongezium uses [resume tokens](https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens) to track its position in the change stream. The resume token of the last processed message is stored in the Kafka message header. At startup, Mongezium reads the latest Kafka message and resumes the change stream from that token. + +If the `oplog.rs` collection has been truncated and the resume token is no longer valid, Mongezium will open a new change stream from the current position (without performing a snapshot). 
To force a new snapshot in this situation, set the collection's `snapshot` field to `when_needed`. + +### Configuration Example (connections property) + +```json +{ + "connections": { + "mongodb": { + "type": "mongodb", + "config": { + "url": { + "type": "env", + "key": "MONGODB_CONNECTION_STRING" + } + } + } + } +} +``` + +:::warning +Defining MongoDB connection details directly in their respective configuration sections (rather than in the `connections` property) is **deprecated** for both Mongezium and Kango. Future versions will exclusively support the `connections` property. Migrate your configuration when upgrading to Mongezium ≥ v0.4.3 or Kango ≥ v0.5.2. +::: + +For full configuration details, see [Mongezium Configuration](/products/fast_data_v2/mongezium_cdc/20_Configuration.mdx). diff --git a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx index 4f134ee5de..e117dda75c 100644 --- a/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx +++ b/docs/products/fast_data_v2/stream_processor/20_Configuration.mdx @@ -182,12 +182,14 @@ use case cannot be supported. Each cache can be: -- **MongoDB Cache** (`type: "mongodb"`): Persistent cache backed by MongoDB +- **MongoDB Cache** (`type: "mongodb"`): Persistent cache backed by MongoDB, with full CRUD support and optimistic locking. - `url`: MongoDB connection string (supports secrets) - `collection`: collection name for cache storage - `database`: database name (optional) - `appName`: application name for MongoDB connection (optional - useful to track which program is carrying out queries on the database) + + For details on MongoDB cache capabilities, required collection setup, and optimistic locking behavior, see [Stream Processor — Stateful Cache](/products/fast_data_v2/mongodb.md#stream-processor--stateful-cache) in the MongoDB Reference. - **In-Memory Cache** (`type: "in-memory"`): Simple in-memory key-value storage. 
Use only when sharing the stream state across different service instances is not necessary. However, when possible, prefer a shareable stream state.

diff --git a/sidebars.json b/sidebars.json
index 562838df1b..04fe50b828 100644
--- a/sidebars.json
+++ b/sidebars.json
@@ -2178,6 +2178,11 @@
         "id": "products/fast_data_v2/kafka",
         "label": "Kafka",
         "type": "doc"
+      },
+      {
+        "id": "products/fast_data_v2/mongodb",
+        "label": "MongoDB",
+        "type": "doc"
       }
     ]
   },