-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_01_BeamCreate_Objects.java
More file actions
56 lines (45 loc) · 1.67 KB
/
code_01_BeamCreate_Objects.java
File metadata and controls
56 lines (45 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
/**
 * Minimal Apache Beam example: builds a {@code PCollection} from a fixed list of
 * in-memory {@link MyClass} objects via {@code Create.of}, encodes them with
 * {@code SerializableCoder}, and prints each element to standard output.
 */
public class code_01_BeamCreate_Objects {

    /**
     * Simple serializable element type carrying an integer id and a name.
     *
     * <p>Instances are encoded with Java serialization by the pipeline below, so the
     * class pins its {@code serialVersionUID} and defines value-based
     * {@link #equals(Object)} / {@link #hashCode()}: a decoded copy then compares
     * equal to the object it was encoded from instead of falling back to identity.
     */
    public static class MyClass implements Serializable {
        // Explicit version id so the serialized form does not depend on a
        // compiler-derived default.
        private static final long serialVersionUID = 1L;

        public int id;
        public String name;

        /**
         * Creates an instance with {@code id == 0} and {@code name == null}.
         * Kept for callers that populate the public fields afterwards; Java
         * serialization itself does not call this constructor.
         */
        public MyClass() {}

        /**
         * Creates an instance with the given values.
         *
         * @param id   numeric identifier
         * @param name display name; may be {@code null}
         */
        public MyClass(int id, String name) {
            this.id = id;
            this.name = name;
        }

        /**
         * Two instances are equal when they have the same runtime class, the same
         * {@code id} and equal (or both {@code null}) {@code name}.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            MyClass other = (MyClass) o;
            return id == other.id && java.util.Objects.equals(name, other.name);
        }

        /** Hash code consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return java.util.Objects.hash(id, name);
        }

        /** Returns a human-readable form, e.g. {@code "ID: 1 and Name: Alice"}. */
        @Override
        public String toString() {
            return "ID: "+id + " and Name: " + name;
        }
    }

    /**
     * Builds and runs the pipeline, blocking until it finishes.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        List<MyClass> myObjects = Arrays.asList(
                new MyClass(1, "Alice"),
                new MyClass(2, "Bob"),
                new MyClass(3, "Charlie")
        );

        // Apply the Create transform, explicitly selecting SerializableCoder for MyClass.
        PCollection<MyClass> myClassPCollection = p.apply(Create.of(myObjects)
                .withCoder(SerializableCoder.of(MyClass.class)));

        // Print each element of the PCollection; the DoFn emits no output.
        myClassPCollection.apply(ParDo.of(new DoFn<MyClass, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                MyClass obj = c.element();
                System.out.println(obj); // Uses the overridden toString()
            }
        }));

        p.run().waitUntilFinish();
    }
}