-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathKsql Project Basic Operations
More file actions
144 lines (116 loc) · 3.95 KB
/
Ksql Project Basic Operations
File metadata and controls
144 lines (116 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
Create Stream -
ksql> CREATE STREAM pageviews \
> (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
> WITH (KAFKA_TOPIC='pageviews',KEY_FORMAT='KAFKA', PARTITIONS=2, VALUE_FORMAT='JSON');
SELECT userid, pageid, viewtime
FROM pageviews
WHERE pageid = 'homepage';
SHOW QUERIES;
CREATE TABLE user_profiles (
userid VARCHAR PRIMARY KEY,
username VARCHAR,
email VARCHAR
)
WITH (KAFKA_TOPIC='user_profiles_topic', VALUE_FORMAT='JSON', KEY='userid');
3. Create a Table
A table in ksqlDB represents a state derived from an event stream, i.e., a key-value store that represents the latest state of an entity. It's useful for capturing aggregates or updates over time.
sql
Copy code
CREATE TABLE table_name (
column1 TYPE1,
column2 TYPE2
)
WITH (KAFKA_TOPIC='kafka_topic_name', VALUE_FORMAT='format', KEY='key_column');
KEY: The column that will be used as the primary key in the table.
Example:
sql
Copy code
CREATE TABLE user_profiles (
userid VARCHAR PRIMARY KEY,
username VARCHAR,
email VARCHAR
)
WITH (KAFKA_TOPIC='user_profiles_topic', VALUE_FORMAT='JSON', KEY='userid');
7. Aggregations
ksqlDB allows you to aggregate data in streams, such as counting or summing events. Aggregations are usually done using windowing to group data over time periods.
Example:
sql
Copy code
SELECT pageid, COUNT(*) AS pageviews_count
FROM pageviews
WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY pageid;
TUMBLING WINDOW: A fixed-size window, e.g., 10 minutes, where the count of pageid is calculated.
GROUP BY: Aggregates by the pageid field.
This query counts the number of pageviews for each pageid in 10-minute intervals.
8. Joining Streams and Tables
You can join streams with other streams or tables to enrich data.
Stream-to-Stream Join:
sql
Copy code
SELECT o.order_id, c.customer_name
FROM orders o
JOIN customers c
WITHIN 1 HOUR
ON o.customer_id = c.customer_id;
WITHIN: Defines the time window for joining streams.
Stream-to-Table Join:
sql
Copy code
SELECT o.order_id, u.username
FROM orders o
LEFT JOIN user_profiles u
ON o.userid = u.userid;
This query joins the orders stream with the user_profiles table based on userid, enriching the order information with the user’s profile details.
9. Windowed Aggregations
Windowing allows you to apply aggregations over time intervals (tumbling, hopping, or session windows).
TUMBLING WINDOW: Fixed, non-overlapping intervals.
HOPPING WINDOW: Fixed intervals, but can overlap.
SESSION WINDOW: Variable-size windows based on activity.
Example of Windowed Count:
sql
Copy code
SELECT pageid, COUNT(*)
FROM pageviews
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY pageid;
This counts pageviews for each page every 5 minutes, but with a 1-minute overlap.
10. Persistent Queries
In ksqlDB, persistent queries run continuously in the background, processing incoming data in real-time.
Example: Create a Persistent Query
sql
Copy code
CREATE STREAM pageviews_homepage AS
SELECT userid, pageid, timestamp
FROM pageviews
WHERE pageid = 'homepage';
This creates a new stream pageviews_homepage, which will continuously capture events from the pageviews stream where pageid = 'homepage'.
11. Materialized Views
Materialized views are created from tables or aggregated streams, which are continuously updated with new data.
Example:
sql
Copy code
CREATE TABLE pageviews_per_user AS
SELECT userid, COUNT(*) AS views_count
FROM pageviews
GROUP BY userid;
This creates a materialized view (a table) that holds the count of pageviews per user. The view is continuously updated as new data arrives in the pageviews stream.
12. Drop a Stream or Table
When you no longer need a stream or table, you can drop it.
Example:
sql
Copy code
DROP STREAM pageviews_homepage;
DROP TABLE user_profiles;
13. Terminating Queries
To stop a persistent query, you need to terminate it.
Example:
List Queries:
sql
Copy code
SHOW QUERIES;
Terminate Query:
sql
Copy code
TERMINATE <query_id>;
Replace <query_id> with the actual query ID you want to terminate.