-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_04_BeamFlatMap.java
More file actions
34 lines (26 loc) · 1.17 KB
/
code_04_BeamFlatMap.java
File metadata and controls
34 lines (26 loc) · 1.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.Create;
import java.util.Arrays;
public class code_04_BeamFlatMap {
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
// Create a PCollection with sentences and split them into words
pipeline.apply(Create.of("Hello World", "Apache Beam", "Java SDK"))
// Transformation - FlatMapElements to convert list of sentences to words
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String sentence) -> Arrays.asList(sentence.split(" "))))
//Print the element
.apply(MapElements.into(TypeDescriptors.voids())
.via((String word) -> {
System.out.println(word);
return null;
}));
// Run the pipeline
pipeline.run().waitUntilFinish();
}
}