
Commit 32e6662

[docs] added Real-Time User Profile quickstart
1 parent c8ce79d commit 32e6662

1 file changed: website/docs/quickstart/user_profile.md
Lines changed: 33 additions & 26 deletions
@@ -5,20 +5,20 @@ sidebar_position: 4
 
 # Real-Time User Profile
 
-This tutorial demonstrates a real-time user profiling workflow using two core Apache Fluss features: the **Auto-Increment Column** and the **Aggregation Merge Engine**.
+This tutorial demonstrates how to build a real-time user profiling system using two core Apache Fluss features: the **Auto-Increment Column** and the **Aggregation Merge Engine**. You will learn how to automatically map high-cardinality string identifiers (like emails) to compact integer UIDs and accumulate user click metrics directly in the storage layer, keeping the Flink job entirely stateless.
 
 ## How the System Works
 
 ### Core Concepts
 
 - **Identity Mapping**: Incoming email strings are automatically mapped to compact `INT` UIDs using Fluss's auto-increment column, no manual ID management required.
-- **Storage-Level Aggregation**: Click counts are accumulated directly in the Fluss TabletServers via The Aggregation Merge Engine sums the clicks at the storage layer, with no windowing or state in Flink required.
+- **Storage-Level Aggregation**: Click counts are accumulated directly in the Fluss TabletServers via the Aggregation Merge Engine. The Flink job remains stateless and lightweight.
 
 ### Data Flow
 
-1. **Ingestion**: Raw click events arrive with an email address, a click count, and a profile group.
+1. **Ingestion**: Raw click events arrive with an email address and a click count.
 2. **Mapping**: A Flink lookup join against `user_dict` resolves the email to a UID. If the email is new, the `insert-if-not-exists` hint instructs Fluss to generate a new UID automatically.
-3. **Aggregation**: Each event's click count is written to `user_profiles`. The Aggregation Merge Engine sums the clicks at the storage layer, with no windowing or state in Flink required.
+3. **Aggregation**: The resolved UID becomes the primary key in `user_profiles`. Each event's click count is summed at the storage layer via the Aggregation Merge Engine, with no windowing or state in Flink required.
 
 ## Environment Setup
 
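The data flow described in this hunk can be sketched as a minimal Python model. This is an illustration only, not Fluss internals: the two dicts, the sequential UID assignment, and the function names are hypothetical stand-ins for the `user_dict` and `user_profiles` tables.

```python
# Toy model of the pipeline: identity mapping + storage-level sum aggregation.
# Both dicts and all names here are illustrative stand-ins, not Fluss APIs.

user_dict: dict[str, int] = {}      # models the user_dict table (email -> uid)
user_profiles: dict[int, int] = {}  # models user_profiles (uid -> total_clicks)

def lookup_or_create_uid(email: str) -> int:
    """Models the lookup join with 'lookup.insert-if-not-exists' = 'true'."""
    if email not in user_dict:
        user_dict[email] = len(user_dict) + 1  # auto-increment on first encounter
    return user_dict[email]

def ingest(email: str, click_count: int) -> None:
    """The 'Flink job' stays stateless: resolve the UID, then let the
    'storage layer' (the dict update) do the summing."""
    uid = lookup_or_create_uid(email)
    user_profiles[uid] = user_profiles.get(uid, 0) + click_count  # sum merge

for email, clicks in [("a@x.com", 3), ("b@x.com", 5), ("a@x.com", 2)]:
    ingest(email, clicks)

print(user_dict)      # {'a@x.com': 1, 'b@x.com': 2}
print(user_profiles)  # {1: 5, 2: 5}
```

Note that the repeated email reuses UID 1 and its clicks are summed, which is exactly the behavior the lookup join plus the Aggregation Merge Engine provide without any Flink state.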
@@ -177,7 +177,7 @@ USE CATALOG fluss_catalog;
 
 ## Step 2: Create the User Dictionary Table
 
-Create the `user_dict` table to map emails to UIDs. The `auto-increment.fields` property instructs Fluss to automatically generate a unique `INT` UID for every new email it receives.
+Create the `user_dict` table to map email addresses to integer UIDs. The `auto-increment.fields` property instructs Fluss to automatically assign a unique `INT` UID for every new email it receives.
 
 ```sql
 CREATE TABLE user_dict (
@@ -191,13 +191,13 @@ CREATE TABLE user_dict (
 
 ## Step 3: Create the Aggregated Profile Table
 
-Create the `user_profiles` table using the **Aggregation Merge Engine**. The `sum` aggregator on `total_clicks` means every incoming click count is accumulated directly at the storage layer, so the Flink job does not need to maintain any state.
+Create the `user_profiles` table using the **Aggregation Merge Engine**. Each user's UID is the primary key, and `total_clicks` accumulates their click activity directly at the storage layer via the `sum` aggregator.
 
 ```sql
 CREATE TABLE user_profiles (
-  profile_id INT,
+  uid INT,
   total_clicks BIGINT,
-  PRIMARY KEY (profile_id) NOT ENFORCED
+  PRIMARY KEY (uid) NOT ENFORCED
 ) WITH (
   'table.merge-engine' = 'aggregation',
   'fields.total_clicks.agg' = 'sum'
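The merge-engine semantics behind this table can be sketched in a few lines of Python. This is a conceptual model under stated assumptions, not the Fluss implementation: rows arriving with an existing primary key have each non-key field combined by its configured aggregator instead of overwriting, matching `'fields.total_clicks.agg' = 'sum'`. The `upsert` helper and `AGGREGATORS` mapping are hypothetical names.

```python
# Toy model of an aggregation merge engine keyed on uid (illustrative only).
from typing import Callable

AGGREGATORS: dict[str, Callable[[int, int], int]] = {
    "total_clicks": lambda old, new: old + new,  # models 'fields.total_clicks.agg' = 'sum'
}

table: dict[int, dict[str, int]] = {}  # primary key (uid) -> row

def upsert(uid: int, row: dict[str, int]) -> None:
    if uid not in table:
        table[uid] = dict(row)  # first write for this key: plain insert
        return
    for field, value in row.items():
        agg = AGGREGATORS.get(field)
        # aggregated fields merge old and new; others would be overwritten
        table[uid][field] = agg(table[uid][field], value) if agg else value

upsert(7, {"total_clicks": 4})
upsert(7, {"total_clicks": 6})
print(table[7]["total_clicks"])  # 10
```

Because the merge happens per key at write time, readers always see the running total, and no windowing operator is needed upstream.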
@@ -208,29 +208,29 @@ CREATE TABLE user_profiles (
 
 Create a temporary source table to simulate raw click events using the Faker connector.
 
+:::note
+Java Faker's `numberBetween(min, max)` treats `max` as exclusive, so the expression below uses `''1'',''11''` to produce click counts of 1–10.
+:::
+
 ```sql
 CREATE TEMPORARY TABLE raw_events (
-  email STRING,
-  click_count INT,
-  profile_group_id INT,
-  proctime AS PROCTIME()
+  email STRING,
+  click_count INT,
+  proctime AS PROCTIME()
 ) WITH (
-  'connector' = 'faker',
-  'rows-per-second' = '1',
-  'fields.email.expression' = '#{internet.emailAddress}',
-  'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}',
-  'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}'
+  'connector' = 'faker',
+  'rows-per-second' = '1',
+  'fields.email.expression' = '#{internet.emailAddress}',
+  'fields.click_count.expression' = '#{number.numberBetween ''1'',''11''}'
 );
 ```
 
-Now run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss automatically generates a new UID for it on the fly.
-
-Although this minimal quickstart does not use the generated `uid` in the final aggregation, the lookup join is still important because it demonstrates how Fluss automatically assigns and persists stable integer IDs for new email identifiers on first encounter.
+Now run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` for it automatically. The resolved `uid` becomes the primary key of `user_profiles`, making the dictionary mapping the central link between the two tables.
 
 ```sql
 INSERT INTO user_profiles
 SELECT
-  e.profile_group_id,
+  d.uid,
   CAST(e.click_count AS BIGINT)
 FROM raw_events AS e
 JOIN user_dict /*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */
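The exclusive-upper-bound convention called out in the note above has a direct analogue in Python's `random.randrange`, which can be used to sanity-check the bounds. This is just an analogy for illustration; it does not invoke Java Faker itself.

```python
# random.randrange(min, max) excludes max, mirroring Java Faker's
# numberBetween(min, max): randrange(1, 11) yields values in 1..10.
import random

samples = [random.randrange(1, 11) for _ in range(10_000)]
print(max(samples) <= 10, 11 in samples)  # True False
```

The same reasoning is why the faker expression was changed from `''1'',''10''` (which could never emit 10) to `''1'',''11''`.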
@@ -240,7 +240,7 @@ ON e.email = d.email;
 
 ## Step 5: Verify Results
 
-Open a **second terminal**, change into the same working directory, re-run the export commands, and launch another SQL Client session to query the results while the pipeline runs.
+Open a **second terminal**, re-run the export commands, and launch another SQL Client session to query results while the pipeline runs.
 
 ```shell
 cd fluss-user-profile
@@ -257,6 +257,7 @@ CREATE CATALOG fluss_catalog WITH (
   'bootstrap.servers' = 'coordinator-server:9123'
 );
 ```
+
 ```sql
 USE CATALOG fluss_catalog;
 ```
@@ -268,18 +269,18 @@ SET 'sql-client.execution.result-mode' = 'tableau';
 ```
 
 ```sql
-SELECT profile_id, total_clicks FROM user_profiles;
+SELECT uid, total_clicks FROM user_profiles;
 ```
 
-You should see 5 rows (one per profile group) with `total_clicks` increasing in real time as new events arrive.
+You should see rows appearing for each new user, with `total_clicks` accumulating in real time as more events arrive for the same email.
 
-To verify the dictionary mapping is working:
+To verify that the email-to-UID mapping is working correctly:
 
 ```sql
 SELECT * FROM user_dict LIMIT 10;
 ```
 
-Each email should have a compact `INT` uid automatically assigned by Fluss.
+Each email should have a unique, compact `INT` `uid` automatically assigned by Fluss.
 
 ## Clean Up
 
@@ -289,6 +290,12 @@ Exit the SQL Client by typing `exit;`, then stop the Docker containers.
 docker compose down -v
 ```
 
+## Architectural Benefits
+
+- **Stateless Flink Jobs:** Offloading both the identity dictionary and the click aggregation to Fluss makes the Flink job lightweight, with fast checkpoints and minimal recovery time.
+- **Compact Storage:** Using auto-incremented `INT` UIDs instead of raw email strings significantly reduces the memory and storage footprint.
+- **Exactly-Once Accuracy:** The **Undo Recovery** mechanism in the Fluss Flink connector ensures that replayed data during failovers does not result in double-counting.
+
 ## What's Next?
 
 This quickstart demonstrates the core mechanics. For a deeper dive into real-time user profiling with bitmap-based unique visitor counting using the `rbm64` aggregator, see the [Real-Time Profiles blog post](/blog/realtime-profiles-fluss).
