-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_05_BeamGroupByKey.java
More file actions
41 lines (33 loc) · 1.49 KB
/
code_05_BeamGroupByKey.java
File metadata and controls
41 lines (33 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.Create;
/**
 * Minimal Apache Beam example that groups an in-memory set of key/value pairs by key
 * and prints one line per key.
 *
 * <p>The pipeline is built from four hard-coded {@code KV<String, Integer>} elements,
 * grouped with {@link GroupByKey}, rendered to text, and written to standard output.
 */
public class code_05_BeamGroupByKey {

    /**
     * Builds the pipeline, runs it, and blocks until it has finished.
     *
     * @param args command-line arguments; not read by this example
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Create a PCollection of key-value pairs
        pipeline.apply(Create.of(
                KV.of("Category1", 1),
                KV.of("Category1", 2),
                KV.of("Category2", 3),
                KV.of("Category2", 4)))
            // Collect every value that shares a key into a single KV<key, Iterable<value>>
            .apply(GroupByKey.create())
            // Render each grouped element as "key: [v1, v2, ...]"
            .apply(MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Iterable<Integer>> grouped) ->
                    formatGroup(grouped.getKey(), grouped.getValue())))
            // Print the elements of the result; the stage yields no meaningful value,
            // so it is typed as Void and returns null.
            .apply(MapElements.into(TypeDescriptors.voids())
                .via((String result) -> {
                    System.out.println(result);
                    return null;
                }));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }

    /**
     * Formats one grouped entry as {@code key: [v1, v2, ...]}.
     *
     * <p>The values are walked explicitly instead of being concatenated directly, because
     * {@link Iterable} does not define {@code toString()}: the text produced by string
     * concatenation would depend on whichever iterable class supplies the grouped values.
     * Values appear in the order the iterable yields them.
     *
     * @param key the group key
     * @param values the values collected for {@code key}
     * @return the formatted line
     */
    private static String formatGroup(String key, Iterable<Integer> values) {
        StringBuilder line = new StringBuilder().append(key).append(": [");
        String separator = "";
        for (Integer value : values) {
            line.append(separator).append(value);
            separator = ", ";
        }
        return line.append(']').toString();
    }
}