-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_01_BeamWindowing.java
More file actions
36 lines (28 loc) · 1.39 KB
/
code_01_BeamWindowing.java
File metadata and controls
36 lines (28 loc) · 1.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.complexExamples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.apache.beam.sdk.transforms.Create;
/**
 * Minimal Apache Beam example: creates three timestamped integers, applies
 * fixed 15-second windowing to them, converts each element to its string form
 * and prints it to standard output.
 */
public class code_01_BeamWindowing {

    /** Length of each fixed window applied to the timestamped elements. */
    private static final Duration WINDOW_SIZE = Duration.standardSeconds(15);

    /**
     * Builds the pipeline, runs it and blocks until it has finished.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Read the clock exactly once. Calling Instant.now() separately for every
        // element would give each one a slightly different base, so the 10s/20s
        // offsets below would only be approximate.
        final org.joda.time.Instant base = org.joda.time.Instant.now();

        pipeline
            // Create a PCollection with timestamps spaced 0s, 10s and 20s from the base
            .apply(Create.timestamped(
                TimestampedValue.of(1, base),
                TimestampedValue.of(2, base.plus(Duration.standardSeconds(10))),
                TimestampedValue.of(3, base.plus(Duration.standardSeconds(20)))))
            // Fixed windowing of 15 seconds
            .apply(Window.<Integer>into(FixedWindows.of(WINDOW_SIZE)))
            // Convert every integer to its string representation
            .apply(MapElements.into(TypeDescriptors.strings())
                .via(Object::toString))
            // Print each element; the step has nothing to emit, so it returns null
            // for the Void output type.
            .apply(MapElements.into(TypeDescriptors.voids())
                .via((String result) -> {
                    System.out.println(result);
                    return null;
                }));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}