-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_01_wordcount.java
More file actions
70 lines (55 loc) · 2.46 KB
/
code_01_wordcount.java
File metadata and controls
70 lines (55 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.usecases;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;
/**
 * Apache Beam word-count example.
 *
 * <p>Reads a text file line by line, splits every line into words, counts how often each
 * distinct word occurs, and writes one {@code word: count} line per distinct word to a text
 * file. Every stage additionally echoes its elements to standard output for demonstration.
 */
public class code_01_wordcount {

    /** Text file whose lines are read and counted. */
    private static final String INPUT_FILE = "src/main/java/datasets/word_count_file.txt";

    /** Location passed to {@code TextIO.write().to(...)}. */
    private static final String OUTPUT_FILE =
            "src/main/java/datasets/output/word_count_output/output.txt";

    /** Regex for a run of one or more non-word characters; used as the word delimiter. */
    private static final String WORD_DELIMITER = "\\W+";

    /**
     * Builds the word-count pipeline and runs it to completion.
     *
     * <p>The progress messages printed directly in this method are emitted while the pipeline
     * graph is being assembled. The per-element messages inside the lambdas are only emitted
     * once {@code pipeline.run()} executes the graph.
     *
     * @param args command-line arguments; currently not used
     */
    public static void main(String[] args) {
        System.out.println("Create an object of the Beam Pipeline");
        Pipeline pipeline = Pipeline.create();

        System.out.println("Reading the Input File");
        PCollection<String> lines =
                pipeline.apply("Text File read", TextIO.read().from(INPUT_FILE));

        // Debug tap: the resulting collection is intentionally discarded.
        // Explicit step names keep transform names unique and stable across the pipeline.
        System.out.println("Printing Each Line");
        lines.apply("Print lines", MapElements
                .into(TypeDescriptors.strings())
                .via((String line) -> {
                    System.out.println("Read line: " + line);
                    return line;
                }));

        System.out.println("Splitting Each line into words");
        PCollection<String> words = lines.apply("Split into words", FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String line) -> splitIntoWords(line)));

        // Debug tap: the resulting collection is intentionally discarded.
        System.out.println("Printing Each word");
        words.apply("Print words", MapElements
                .into(TypeDescriptors.strings())
                .via((String word) -> {
                    System.out.println("Word: " + word);
                    return word;
                }));

        PCollection<KV<String, Long>> wordCounts =
                words.apply("Count words", Count.perElement());

        System.out.println("Printing Each word and Count");
        PCollection<String> keyValueString = wordCounts.apply("Format results", MapElements
                .into(TypeDescriptors.strings())
                .via((KV<String, Long> wordCount) -> {
                    String result = wordCount.getKey() + ": " + wordCount.getValue();
                    System.out.println(result);
                    return result;
                }));

        keyValueString.apply("Write results",
                TextIO.write()
                        .to(OUTPUT_FILE)
                        .withNumShards(1) // Control the number of output files
        );

        pipeline.run().waitUntilFinish();
    }

    /**
     * Splits a line into its words, using runs of non-word characters as delimiters.
     *
     * <p>{@code String.split} returns an empty first token when the line is empty or starts
     * with a delimiter. Those empty tokens are dropped here so they are not counted as a word.
     *
     * @param line one line of input text
     * @return the non-empty words of {@code line}, in order of appearance
     */
    private static Iterable<String> splitIntoWords(String line) {
        String[] tokens = Arrays.stream(line.split(WORD_DELIMITER))
                .filter(token -> !token.isEmpty())
                .toArray(String[]::new);
        return Arrays.asList(tokens);
    }
}