2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ We believe in the open source community, we think it is important to give back a

The documentation site is built using [Docusaurus 2](https://v2.docusaurus.io/); to develop locally you need:

- Node 16+
- Node 24+

To set up Node, please use [nvm][nvm] if possible, so you can easily manage
multiple versions.
Expand Down
2 changes: 1 addition & 1 deletion docs/products/fast_data_v2/concepts.mdx
Expand Up @@ -57,7 +57,7 @@ modification that occurred to the record. In particular, the message contains
- the value of the record before the modification occurred
(available in `UPDATE` and `DELETE` operations)
- the value of the record after the modification occurred
(available in `CREATE` and `UPDATE` operations)
(available in `READ`, `CREATE` and `UPDATE` operations)
- optional metadata describing the data source that contains
the record and when the modification occurred on it
- optional metadata describing when the modification was
Expand Down
171 changes: 18 additions & 153 deletions docs/products/fast_data_v2/farm_data/20_Configuration.mdx
Expand Up @@ -62,50 +62,7 @@ is an object where each key can represent a specific Kafka consumer configuratio
with a default of 500ms and a minimum of 0. Should this value need to be changed,
we recommend assigning the same value to all consumers.

Additional properties for Kafka consumer configuration can be found in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation.

:::warning
When configuring the service, it is important to configure the following Kafka consumer properties:

```text
queued.min.messages
queued.max.messages.kbytes
```
Since they regulate the amount of memory consumed by the service, these values should be
tuned depending on the resources (CPU and memory) that can be assigned to the service
and the size of the underlying database. Those external factors affect the processing
throughput; therefore, retaining more messages in the consumer fetch queue than
the service can process risks wasting service memory.

For example, when using a MongoDB M30 instance with a maximum of 2400 IOPS for read+write,
we recommend starting with the following values:

```text
queued.min.messages=1000
queued.max.messages.kbytes=16384
```
:::

:::info
When configuring consumers, it is important to know that different configurations (e.g. `group.id`)
trigger the creation of separate consumer clients. This may be necessary to enable
the same stream to be aggregated in different ways.

Please note that instantiating additional consumers may require increasing the service
memory requests and limits, since these consumers have their own internal fetch queue.
:::

:::note
The following Kafka consumer properties are not configurable:

- `allow.auto.create.topics` → `"false"`
- `enable.auto.commit` → `"false"`

The first property enforces user responsibility over topic creation,
so that the proper configurations, such as number of partitions, replication factor,
and retention policy, are set. The latter property disables the driver's
auto-commit feature in favour of ad-hoc internal logic.
:::
Additional information about Kafka consumer configuration can be found in the [Kafka reference page](/products/fast_data_v2/kafka.md#consumer-configuration) or in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation.
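
For illustration, a consumer entry might look like the sketch below. The shape outside the `config` map (the outer `consumers` key and the `connectionName` and `topics` fields) is assumed by analogy with the producer settings, not taken from this page; only the Kafka properties inside `config` are documented here:

```json
{
  "consumers": {
    "input": {
      "connectionName": "main-kafka",
      "topics": ["fd.orders.input"],
      "config": {
        "group.id": "farm-data.orders",
        "queued.min.messages": "1000",
        "queued.max.messages.kbytes": "16384"
      }
    }
  }
}
```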

### Producer

Expand All @@ -115,23 +72,7 @@ The `producer` configuration defines how the system produces data. Currently,
* `connectionName`: The name of the Kafka connection to use from the `connections` map.
* `topic`: The name of the Kafka topic to which the producer will send messages.

Additional properties for Kafka producer configuration can be found in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation.

:::note
The Kafka producer is configured to compress messages by default using the [`snappy`](https://en.wikipedia.org/wiki/Snappy_(compression)) algorithm.
This reduces the disk space consumed on the Kafka broker.

Furthermore, the following properties are not configurable:

- `allow.auto.create.topics` → `"false"`
- `enable.idempotence` → `"true"`
- `acks` → `"all"`

The first property enforces user responsibility over topic creation,
so that the proper configurations, such as number of partitions, replication factor,
and retention policy, are set. The latter properties ensure that
no duplicate messages are produced on Kafka brokers.
:::
Additional properties for Kafka producer configuration can be found in the [Kafka reference page](/products/fast_data_v2/kafka.md#producer-configuration) or in the [librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html) documentation.
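
A minimal producer configuration might look like the sketch below. The `connectionName` and `topic` fields are documented above; the `config` map with an explicit `snappy` codec is illustrative (compression is already applied by default):

```json
{
  "producer": {
    "connectionName": "main-kafka",
    "topic": "fd.orders.output",
    "config": {
      "compression.codec": "snappy"
    }
  }
}
```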

### Processor

Expand All @@ -144,34 +85,9 @@ The `persistence` configuration defines how aggregated data is stored.
Currently, `PersistenceConfig` can only be of type `mongo`.
The `MongoConfig` requires a `url`, which is a `Secret`, and can optionally specify
a `database`.
For proper functioning, sink collections need to be created. The `indexer` CLI can
be run to automatically create the necessary indexes.

Example of connection configuration:

```json
{
// other properties
"persistence": {
"type": "mongo",
"config": {
"url": "mongodb://localhost:27017/farm-data",
"database": "/farm-data",
"appName": "eu.miaplatfor.farm-data.lakes"
}
},
// other properties
}
```

:::warning
For proper functioning, sink collections need to be created.

The Farm Data service heavily relies on the persistence layer to cache the current stream
state. Consequently, it is highly recommended to configure the MongoDB instance to
sustain a heavy I/O load, setting maximum IOPS to at least 2400 (1200 read + 1200 write);
higher values further benefit service throughput.

:::
For detailed information on Farm Data's MongoDB persistence model, required indexes, IOPS requirements, and collection naming conventions, see [Farm Data — Aggregation Persistence](/products/fast_data_v2/mongodb.md#farm-data--aggregation-persistence) in the MongoDB Reference.

#### Internal Updates

Expand Down Expand Up @@ -221,32 +137,9 @@ For more details on how to configure it, please refer to the [Workloads Configur

### Recommended Kafka Configuration

When configuring the Kafka consumer, it is advised to set appropriate values for
constraining the consumer's internal queue. In this manner:

- the maximum amount of memory employed by the service can be finely tuned to avoid
wasting resources, since only the number of messages that can effectively be
processed in real time should be pulled into memory;
- the consumer is ensured to continuously poll the broker, preventing it from exiting the
consumer group, since a lower number of buffered messages can trigger a new fetch to
replenish the queue;
For a full explanation of the consumer queue tuning parameters, see [Consumer Queue Tuning](/products/fast_data_v2/kafka.md#consumer-queue-tuning) in the Kafka Reference.

The main values to tune are:

- `queued.max.messages.kbytes`: maximum number of kilobytes of queued pre-fetched
messages in the local consumer queue;
- `queued.min.messages`: minimum number of messages per topic+partition _librdkafka_
tries to maintain in the local consumer queue;

It is recommended to set `queued.min.messages` to a value greater than, but close to, the
average message consumption rate. The following metrics can be observed:

- `kafka_consumer_rx_msgs_total` → messages read
- `farm_data_processed_msg` → total number of processed messages

to check the average values.

For _Farm Data_ service, an example of configuration can be the following one:
**Starting values for Farm Data input consumers:**

```json
{
Expand All @@ -255,30 +148,20 @@ For _Farm Data_ service, an example of configuration can be the following one:
}
```

Another important property that might need to be tuned is `fetch.message.max.bytes`,
which, however, should be set only when `queued.max.messages.kbytes` is set to
a value lower than `1024`.
For a MongoDB M30 cluster (2400 IOPS), a more specific recommendation is:

```json
{
"queued.min.messages": "1000",
"queued.max.messages.kbytes": "16384"
}
```
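
As a rough sanity check on memory sizing, assuming (as a simplification) that the `queued.max.messages.kbytes` cap can be reached on every assigned topic+partition simultaneously, a worst-case fetch-buffer estimate can be sketched as:

```python
# Hypothetical worst-case estimate of the consumer fetch-buffer memory.
# Assumes the queued.max.messages.kbytes cap is hit on every assigned
# topic+partition at once; real usage is typically lower.

def max_fetch_buffer_mib(partitions: int, queued_max_kbytes: int) -> float:
    """Upper bound on fetch-queue memory, in MiB."""
    return partitions * queued_max_kbytes / 1024

# With the M30 value above (16384 KB) and, say, 6 assigned partitions:
print(max_fetch_buffer_mib(6, 16384))  # 96.0
```

Such an estimate can help when setting the service memory requests and limits.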

#### Internal Updates Consumer

The `internal-updates` consumer requires an **ad-hoc consumer configuration** due
to the uniqueness of its messages. In fact, `internal-update` messages are very small
(in the bytes range), but they trigger a larger computation that may take several
milliseconds to complete.

Due to the combination of these factors, using the default queue parameters, or even the ones
adopted for the input streams, is not recommended. The Kafka consumer tries
to fetch and buffer a large number of events, since they are small, but it takes a considerable
amount of time to clear them from the queue. This prevents the consumer from fetching newer
messages within the constraint set by `max.poll.interval.ms` (a default interval of 5 minutes).
Once that time elapses, the Kafka broker considers the consumer instance dead and
forces it to leave the group, triggering a restart of the service since its events stream has
terminated.

To prevent this unwanted situation, which hinders the advancement of events processing, it
has been observed that modifying the consumer parameters can improve the stability of the
service. Thus, below is the recommended configuration to apply to the
Kafka consumer of the `internal-updates` configuration:
The `internal-updates` consumer requires a **dedicated configuration** because its messages are very small (bytes range) but trigger heavier computations. Using the same queue parameters as the regular input consumers causes the consumer to fail to poll within `max.poll.interval.ms` (default: 5 minutes), resulting in eviction from the consumer group and a service restart.
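
A back-of-the-envelope calculation illustrates the failure mode, using librdkafka's default `queued.min.messages` of 100000 and an assumed (hypothetical) 5 ms of processing per internal update:

```python
# Rough estimate of how long a full fetch queue takes to drain, compared
# against max.poll.interval.ms (Kafka default: 5 minutes). The per-message
# processing cost is an illustrative assumption, not a measured value.

buffered_messages = 100_000      # librdkafka default queued.min.messages
processing_ms_per_msg = 5        # assumed cost of one internal update
max_poll_interval_ms = 300_000   # default max.poll.interval.ms (5 minutes)

drain_time_ms = buffered_messages * processing_ms_per_msg

# Draining the queue takes longer than the poll interval, so the broker
# evicts the consumer before it polls again.
print(drain_time_ms, drain_time_ms > max_poll_interval_ms)  # 500000 True
```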

Apply the following configuration to the `internal-updates` consumer:

```json
{
Expand All @@ -288,26 +171,8 @@ Kafka consumer of the `internal-updates` configuration:
}
```

Note that the `fetch.message.max.bytes` parameter has also been changed, since
it governs how many bytes are fetched per topic+partition the first time the consumer
connects. Leaving the default value of 1MB would lead to a behavior where
the service starts, aggregates events for about 5 minutes, and then restarts because
it has been forced out of the consumer group.

:::danger

When a consumer instance is forced out of its consumer group, it may not have
the chance to commit the work it has already carried out. Thus, setting
the proper values is fundamental to guarantee service stability and progress in
consuming messages from the Kafka topic.

:::

:::note

`queued.max.messages.kbytes` value use **KBytes** unit, whereas `fetch.message.max.bytes`
use **bytes** unit. Thus, the latter appears larger, though it isn't.

`queued.max.messages.kbytes` uses **KB** units; `fetch.message.max.bytes` uses **bytes**. Despite the apparent difference in magnitude, both values above are intentional.
:::

## Kubernetes
Expand Down
96 changes: 4 additions & 92 deletions docs/products/fast_data_v2/farm_data/30_Usage.md
Expand Up @@ -9,12 +9,8 @@ sidebar_label: Usage
To use the plugin, the following requirements must be met:

- Kafka connection must have permission to read and write the topics declared in the configuration file;
- Kafka topics must exist on the Kafka cluster, with the appropriate number of partitions (which constrain service replicas), retention and replication factor;
- MongoDB collections must be defined on the MongoDB cluster with the necessary indexes;
to aid in their generation, it is possible to exploit the provided tools, such as
the `indexer` command of the `the_world_outside` CLI. This command connects to the
configured persistence layer and, by analyzing the aggregation graph, automatically
generates the recommended indexes for your use case.
- Kafka topics must exist on the Kafka cluster with the appropriate configuration (partitions, retention, replication factor); see [Topics](/products/fast_data_v2/kafka.md#topics) in the Kafka Reference;
- MongoDB collections must be defined on the MongoDB cluster with the necessary indexes.

> **Contributor:** I think this is no longer true: Farm Data can automatically create sink collections, and the proper indexes for them, based on the configured aggregation graph.


## Messages Spec

Expand Down Expand Up @@ -176,93 +172,9 @@ Output messages are compliant with [Fast Data message format](/products/fast_dat
As explained in the [Configuration page](/products/fast_data_v2/farm_data/20_Configuration.mdx), Farm Data service
processes events using a stateful model and therefore needs to store intermediate
results on a persistent storage system.
Currently, Farm Data supports only MongoDB as the external persistence system.

### MongoDB Persistence

When using MongoDB as the persistence storage, the Farm Data service needs the details
for connecting to a MongoDB cluster, which allow the service to create the
needed collections and store the intermediate aggregated documents there.

In fact, the service creates on the selected database a collection for each
aggregation node (processing unit), which will store the intermediate results.
These collections are named following the pattern below:

```text
__sink_<aggregation_id>_<aggregation_node_name>
```

where:

- `__sink` is a constant prefix to signal that the collection is used internally;
- `<aggregation_id>` is the value of configuration field `id` that identifies the
specific aggregation process that is employing such collection.
Please beware that this identifier MUST be between 8 and 16 characters long and
should satisfy MongoDB [collection name restrictions](https://www.mongodb.com/docs/manual/reference/limits/#mongodb-limit-Restriction-on-Collection-Names);
- `<aggregation_node_name>` is the name of a node in the aggregation graph;
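
For instance, with a hypothetical aggregation `id` of `ordersfarm01` (12 characters) and a node named `customers`, the resulting collection name would be:

```text
__sink_ordersfarm01_customers
```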

To support read/write operations over `__sink` collections, each of them should have
the following indexes defined:

- a **unique** index supporting the internal primary key of the record
```json
{
"__pk": 1
}
```
- an index for each `value` property of the current aggregation node, which
is employed for lookup operations by children nodes (in the aggregation graph).
For example:
```json
{
"value.userId": 1
}
```
- an index for each `dependency.*.__pk` property of the current aggregation node, which
is employed for lookup operations by the current node towards its children
(in the aggregation graph).
For example:
```json
{
"dependency.posts.__pk": 1
}
```
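
Putting the three index types together, they could be created manually with a `mongosh` sketch like the one below (the collection name and field names are hypothetical, and a connection to the target database is required):

```javascript
// Illustrative mongosh commands; names are hypothetical examples.
const coll = db.getCollection("__sink_ordersfarm01_customers");

// Unique index supporting the internal primary key
coll.createIndex({ "__pk": 1 }, { unique: true });

// Lookup index on a value property used by children nodes
coll.createIndex({ "value.userId": 1 });

// Lookup index towards a dependency's primary key
coll.createIndex({ "dependency.posts.__pk": 1 });
```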

### Indexer CLI

To aid in the creation of `__sink` collections alongside their indexes, the
`the_world_outside` CLI is provided. It reads a Farm Data configuration file,
connects to the defined persistence storage and generates the needed collections
and their indexes. When generating indexes, it also accounts for complex filters,
so that lookup queries are correctly supported.

Secrets resolution is handled automatically. The secret just needs to be reachable
by the CLI, either through an environment variable or a file.

The CLI can be installed using the following command:

```shell
npm install -g @mia-platform-internal/the-world-outside
```

and launched as shown below:

```shell
two indexer -c <path-to-config-folder>
```

The `-c` argument must be given the path to a directory that holds
the Farm Data configuration file (named either `farm-data.json` or `config.json`).

:::warning

In order to use the `two` CLI, it is necessary to own a Mia-Platform subscription and
have access to the private repository.

Furthermore, since the CLI directly connects to your MongoDB cluster, please ensure that
you can connect to it from your machine.

:::
For complete details on how Farm Data uses MongoDB — including collection naming conventions, required indexes, IOPS requirements, and configuration examples — see [Farm Data — Aggregation Persistence](/products/fast_data_v2/mongodb.md#farm-data--aggregation-persistence) in the MongoDB Reference.

## Aggregation Graph

Expand Down