-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_06_BeamParDo.java
More file actions
40 lines (33 loc) · 1.42 KB
/
code_06_BeamParDo.java
File metadata and controls
40 lines (33 loc) · 1.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.transforms.Create;
/**
 * Minimal Apache Beam example demonstrating an element-wise {@link ParDo}.
 *
 * <p>The pipeline creates a small in-memory collection of integers, converts each one to a
 * labelled string with a {@link DoFn}, and prints every resulting string to standard output.
 */
public class code_06_BeamParDo {

    /**
     * Builds the example pipeline, runs it, and waits for it to finish.
     *
     * @param args command-line arguments; not read by this example
     */
    public static void main(String[] args) {
        // Pipeline with default options (no options are passed to create()).
        Pipeline p = Pipeline.create();

        // Source: a fixed, in-memory set of integers.
        PCollection<Integer> input = p.apply(Create.of(1, 2, 3, 4, 5));

        // Element-wise transform: each integer becomes the string "Number: <n>".
        DoFn<Integer, String> labelFn = new DoFn<Integer, String>() {
            @ProcessElement
            public void processElement(ProcessContext ctx) {
                ctx.output("Number: " + ctx.element());
            }
        };
        PCollection<String> labelled = input.apply(ParDo.of(labelFn));

        // Sink: print every string. The mapping function is used only for its side effect,
        // so it is declared as producing Void and returns null for each element.
        labelled.apply(
            MapElements.into(TypeDescriptor.of(Void.class))
                .via((String line) -> {
                    System.out.println(line);
                    return null;
                }));

        // Execute the pipeline and wait until it completes.
        p.run().waitUntilFinish();
    }
}