- Pipeline Basics
  - Concept: A `Pipeline` represents a series of data processing steps. You use it to define and execute your data processing workflow.
  - Code:
    Pipeline p = Pipeline.create();
  - Explanation:
    `Pipeline.create()` initializes a new pipeline. This is the entry point for all Beam operations: you apply transforms to this pipeline to process data.
- Creating a PCollection
  - Concept: A `PCollection` is a distributed dataset that can be processed in parallel. You can create one from an in-memory collection or from other data sources.
  - Code:
    PCollection<String> words = p.apply("Creating Strings", Create.of("Hello", "World"));
  - Explanation:
    - `p.apply("Creating Strings", Create.of("Hello", "World"))` creates a `PCollection` from the in-memory strings "Hello" and "World".
    - "Creating Strings" names the transform step, not the data; it is a label used for debugging and visualization purposes.
- Applying Transformations
  - Concept: Transformations take a `PCollection` and produce a new one; the input `PCollection` itself is not changed. Common transformations include `MapElements`, `FlatMapElements`, and `Filter`.
  - Code:
    PCollection<String> uppercasedWords = words.apply("Applying Transformations",
        MapElements
            .into(TypeDescriptor.of(String.class))
            .via((String word) -> word.toUpperCase()));
  - Explanation:
    - `MapElements` applies a function to each element of a `PCollection`.
    - `TypeDescriptor.of(String.class)` specifies the output type of the transformation.
    - `.via((String word) -> word.toUpperCase())` defines the transformation function, converting each word to uppercase.
- Printing Elements
  - Concept: You can print elements of a `PCollection` using `MapElements` with a `Void` return type. On a distributed runner the output appears wherever the workers run, and the order of the printed lines is not guaranteed.
  - Code:
    uppercasedWords.apply("Printing Strings",
        MapElements
            .into(TypeDescriptor.of(Void.class))
            .via((String word) -> {
                System.out.println(word);
                return null;
            }));
  - Explanation:
    - `MapElements.into(TypeDescriptor.of(Void.class))` is used to apply a function where the result type is `Void` (i.e., no output from the transformation).
    - `.via((String word) -> { System.out.println(word); return null; })` prints each element to the console and returns `null` since the result type is `Void`.
- Running the Pipeline
  - Concept: To execute the pipeline, you need to call the `run` method.
  - Code:
    p.run().waitUntilFinish();
  - Explanation:
    - `p.run()` starts the execution of the pipeline.
    - `.waitUntilFinish()` blocks until the pipeline completes.
- apply()
  - Concept: `apply()` is a method used to apply a transformation to a `Pipeline` or a `PCollection`. It accepts a transform and returns the transform's output, typically a new `PCollection` with the results.
  - Usage: In the snippet `p.apply("Creating Strings", Create.of("Hello", "World"))`, the `apply` method is used to apply the `Create` transform to the pipeline, which creates a `PCollection`.
- via()
  - `via()` defines the process (input transformation).
  - Concept: `via()` defines the transformation logic for how elements in the `PCollection` are processed.
  - Usage: In the snippet `.via((String word) -> word.toUpperCase())`, the `via()` method is used to specify the transformation logic, in this case converting each string to uppercase.
- TypeDescriptor.of()
  - Concept: `TypeDescriptor.of()` is used to specify the type of elements in the `PCollection` after a transformation.
  - Usage: In the snippet `TypeDescriptor.of(String.class)`, it defines the type of the output of the transformation, indicating that the elements will be `String` objects.
- into()
  - `into()` defines the output type (result structure).
  - Concept: `into()` is a method used to specify the target type of the transformation's output.
  - Usage: In the snippet `MapElements.into(TypeDescriptor.of(String.class))`, `into()` specifies that the transformation will output a `PCollection` of strings.
- Create.of()
  - Concept: `Create.of()` is a Beam transform used to create a `PCollection` from an in-memory data source like a list or array.
  - Usage: In the snippet `Create.of("Hello", "World")`, it creates a `PCollection` with the elements "Hello" and "World".
- waitUntilFinish()
  - Concept: `waitUntilFinish()` waits for the pipeline to finish executing before proceeding further.
  - Usage: In the snippet `p.run().waitUntilFinish()`, it ensures the pipeline completes its execution before the program continues.
- TypeDescriptor vs. TypeDescriptors
  - `TypeDescriptor`
    - Purpose: Represents a specific type of data.
    - Usage: Used to define the type of elements in a `PCollection` or the output from a transformation.
    - Example: When you need to specify a specific class type, such as `String` or `Integer`.
  - `TypeDescriptors`
    - Purpose: A utility class that provides common `TypeDescriptor` instances for convenience.
    - Usage: Provides predefined `TypeDescriptor` instances for common types like `String`, `Integer`, etc.
    - Example: When you want a shorthand way to specify common types, like `String`.
  - Difference
    - `TypeDescriptor`: Allows you to create a new type descriptor for any class you specify.
    - `TypeDescriptors`: Offers predefined instances of `TypeDescriptor` for commonly used types, making it easier and quicker to use.
  - When to use which
    - Use `TypeDescriptor` when you need to specify a custom or less common type that is not provided by `TypeDescriptors`.
    - Use `TypeDescriptors` for convenience with common types, where predefined instances are readily available.
- `TypeDescriptors.strings()`: For the `String` type. Equivalent to `TypeDescriptor.of(String.class)`.
- `TypeDescriptors.integers()`: For the `Integer` type. Equivalent to `TypeDescriptor.of(Integer.class)`.
- `TypeDescriptors.longs()`: For the `Long` type. Equivalent to `TypeDescriptor.of(Long.class)`.
- `TypeDescriptors.doubles()`: For the `Double` type. Equivalent to `TypeDescriptor.of(Double.class)`.
- `TypeDescriptors.floats()`: For the `Float` type. Equivalent to `TypeDescriptor.of(Float.class)`.
- `TypeDescriptors.booleans()`: For the `Boolean` type. Equivalent to `TypeDescriptor.of(Boolean.class)`.
- `TypeDescriptors.bytes()`: For the `Byte` type. Equivalent to `TypeDescriptor.of(Byte.class)`.
- `TypeDescriptors.kvs(TypeDescriptor<K>, TypeDescriptor<V>)`: For the `KV<K, V>` type.
- `TypeDescriptors.iterables(TypeDescriptor<T>)`: For the `Iterable<T>` type.
- `TypeDescriptors.maps(TypeDescriptor<K>, TypeDescriptor<V>)`: For the `Map<K, V>` type.
- `TypeDescriptors.lists(TypeDescriptor<T>)`: For the `List<T>` type.
- `TypeDescriptors.sets(TypeDescriptor<T>)`: For the `Set<T>` type.
- `TypeDescriptor.of(Class<T>)`: For any custom or complex type.
- `TypeDescriptor.of(Type)`: For parameterized types like `List<String>` or `Map<String, Integer>`, given a `java.lang.reflect.ParameterizedType`.
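The descriptors listed above are needed because Java erases generic type arguments at runtime, so a lambda or a `List<String>` value alone does not tell a library what its element type is. The following plain-Java sketch illustrates that idea without using Beam at all. `TypeTokenSketch` and its nested `TypeToken` class are hypothetical names invented for this illustration; they are not Beam classes, and the sketch only approximates the general "type token" technique rather than Beam's actual implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class TypeTokenSketch {
    // Hypothetical stand-in for the "type token" idea; NOT Beam's TypeDescriptor.
    // An anonymous subclass records its type argument in its generic superclass.
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }

    // Erasure: both lists share one runtime class, so the element type is lost.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> integers = new ArrayList<>();
        return strings.getClass() == integers.getClass();
    }

    // The anonymous subclass keeps the full parameterized type available.
    static String capturedTypeName() {
        return new TypeToken<List<String>>() {}.type.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(capturedTypeName());
    }
}
```

This is why `into(...)` asks for an explicit descriptor when the function is a lambda: the descriptor carries the type information that the compiled lambda no longer has.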
- PCollectionView
  - What It Is:
    - A `PCollectionView` is an immutable view of a `PCollection` that a transform can read as a side input, so the same static data is available while processing every element of another `PCollection`.
    - Think of it as a way to pass small, read-only data to your processing steps. It's like having a shared reference that multiple parts of your pipeline can access.
  - Usage:
    - It's typically used when you want to use a small amount of data (like configuration values or constants) in your transformations.
- View.asSingleton()
  - What It Is:
    - `View.asSingleton()` is a transform that converts a `PCollection` into a `PCollectionView` that contains exactly one element.
    - This is useful when you know your `PCollection` will have only one element, and you need to use that single value across your entire pipeline.
- DoFn
  - What It Is:
    - `DoFn` stands for "Do Function." It is a base class in Apache Beam used to define custom transformations applied to elements of a `PCollection`.
    - You subclass `DoFn` and add a method annotated with `@ProcessElement` (conventionally named `processElement`) to specify how each element should be transformed.
  - Usage:
    - Use `DoFn` when you need custom logic for processing each element in a `PCollection`.
- @ProcessElement
  - What It Is:
    - `@ProcessElement` is an annotation that marks a method in a `DoFn` subclass as the method that processes individual elements of the `PCollection`.
  - Usage:
    - This method contains the logic for processing each element and is called once for each element in the `PCollection`.
- ProcessContext
  - What It Is:
    - `ProcessContext` is an object provided to the `@ProcessElement` method. It gives access to the current element being processed and provides methods to output results.
  - Key Methods:
    - `c.element()`: Retrieves the current element from the input `PCollection`.
    - `c.output(...)`: Emits one element from the `@ProcessElement` method; call it again for each additional result.
- ParDo
  - What It Is:
    - `ParDo` is a transform that applies a `DoFn` to each element of a `PCollection`. It is used for parallel processing of elements.
  - Usage:
    - Use `ParDo` when you want to apply custom processing logic to each element in a `PCollection`. It can output zero or more results for each input element.
- c.output()
  - What It Is:
    - `c.output()` is a method provided by the `ProcessContext` object inside the `@ProcessElement` method of a `DoFn`.
    - It allows you to emit results from the `DoFn` for each element processed.
  - Usage:
    - Emitting Results: Use `c.output()` to send processed data to the next stage in the pipeline.
    - Multiple Outputs: You can call `c.output()` multiple times to emit multiple results for a single input element, or not call it at all to emit nothing.
  - Parameters:
    - Single Element: Each call takes one element of the `DoFn`'s output type and adds it to the output `PCollection`.
    - Iterable: Passing an `Iterable` does not emit its items one by one; the whole `Iterable` counts as a single element and must match the `DoFn`'s output type. To emit several elements, loop over them and call `c.output()` once per item.
package com.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.transforms.Create;

public class code_06_BeamParDo {
    public static void main(String[] args) {
        // Create a pipeline
        Pipeline pipeline = Pipeline.create();

        // Create a PCollection of integers
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5));

        // Apply ParDo to process each element
        PCollection<String> results = numbers.apply(ParDo.of(new DoFn<Integer, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Integer number = c.element();        // Get the current element (integer)
                String result = "Number: " + number; // Create a string representation
                c.output(result);                    // Output the result
            }
        }));

        // Print the results
        results.apply(MapElements.into(TypeDescriptor.of(Void.class))
            .via((String result) -> {
                System.out.println(result); // Print each result
                return null;
            }));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}

General shape of a ParDo with an anonymous DoFn (InputT and OutputT are placeholders for your element types):

ParDo.of(new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // c.element() returns an element of type InputT
        // c.output(outputElement) emits an element of type OutputT
    }
});
- `DoFn<Integer, String>`: Defines a custom transformation where each `Integer` input element is transformed into a `String`.
- `@ProcessElement` method, `processElement(ProcessContext c)`:
  - `c.element()`: Retrieves the current integer.
  - `c.output(result)`: Outputs the transformed string.
- `ParDo.of(...)`: Applies the custom `DoFn` to each element of the `PCollection`.
- `MapElements`: Prints each element of the resulting `PCollection<String>`.
- `pipeline.run().waitUntilFinish()`: Executes the pipeline and waits for it to complete.
- Without `{}`:
  - Use for single-line expressions.
  - No need for the `return` keyword; the value of the expression is returned.
    .via((String word) -> word.toUpperCase());
- With `{}`:
  - Use for multiple statements or complex logic.
  - Explicit `return` keyword.
    .via((String word) -> {
        String upperCaseWord = word.toUpperCase();
        System.out.println(upperCaseWord);
        return upperCaseWord;
    });
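The two lambda forms above are ordinary Java syntax and behave the same way outside Beam. As a minimal sketch, the example below uses `java.util.function.Function` as a stand-in for the function passed to `via()`; the class name `LambdaForms` and its field names are made up for this illustration.

```java
import java.util.function.Function;

public class LambdaForms {
    // Expression form: one expression, no braces, value returned implicitly.
    static final Function<String, String> EXPRESSION_FORM =
        (String word) -> word.toUpperCase();

    // Block form: braces, several statements, explicit return.
    static final Function<String, String> BLOCK_FORM = (String word) -> {
        String upperCaseWord = word.toUpperCase();
        System.out.println(upperCaseWord);
        return upperCaseWord;
    };

    public static void main(String[] args) {
        // Both forms compute the same value; only the block form prints as a side effect.
        String a = EXPRESSION_FORM.apply("hello");
        String b = BLOCK_FORM.apply("hello");
        System.out.println(a.equals(b));
    }
}
```

The block form is the one to reach for when the function needs a side effect such as logging in addition to its return value.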
| Transform | Method | Purpose | Sample Syntax |
|---|---|---|---|
| `MapElements` | `via` | Applies a function to each element and transforms it. | `input.apply(MapElements.into(TypeDescriptor.of(OutputType.class)).via((InputType element) -> transformedElement))` |
| `FlatMapElements` | `via` | Applies a function that produces zero or more output elements for each input element. | `input.apply(FlatMapElements.into(TypeDescriptor.of(OutputType.class)).via((InputType element) -> Arrays.asList(outputElements)))` |
| `ParDo` | `of` | Applies a `DoFn` to each element; the output type comes from the `DoFn`'s type parameters. | `input.apply(ParDo.of(new DoFn<InputType, OutputType>() { ... }))` |
| `GroupByKey` | `create` | Groups `KV` elements by their key. | `input.apply(GroupByKey.create())` |
| `Count` | `perElement` | Counts occurrences of each distinct element. | `input.apply(Count.perElement())` |
| `Combine` | `globally` | Combines all elements into a single value using a `CombineFn`. | `input.apply(Combine.globally(new YourCombineFn()))` |
| `TextIO.read` | `from` | Specifies the file path or pattern for reading input data. | `pipeline.apply(TextIO.read().from("path/to/input/file"))` |
| `TextIO.write` | `to` | Specifies the file path or prefix for writing output data. | `input.apply(TextIO.write().to("path/to/output/file"))` |
| `Create` | `withCoder` | Specifies the coder for the created `PCollection`. | `pipeline.apply(Create.of(elements).withCoder(elementCoder))` |
- `via`:
  - Purpose: Transform each element.
  - Template (`input` is an existing `PCollection`):
    input.apply(MapElements
        .into(TypeDescriptor.of(OutputType.class))
        .via((InputType element) -> transformedElement));
  - Example: Convert strings to uppercase:
    PCollection<String> uppercased = lines.apply(MapElements
        .into(TypeDescriptor.of(String.class))
        .via((String line) -> line.toUpperCase()));
- `FlatMapElements`:
  - Purpose: Produce zero or more elements per input element.
  - Template (`input` is an existing `PCollection`):
    input.apply(FlatMapElements
        .into(TypeDescriptor.of(OutputType.class))
        .via((InputType element) -> Arrays.asList(outputElements)));
  - Example: Split lines into words:
    PCollection<String> words = lines.apply(FlatMapElements
        .into(TypeDescriptor.of(String.class))
        .via((String line) -> Arrays.asList(line.split("\\W+"))));
- `ParDo.of` (output type):
  - Purpose: Apply a `DoFn` to each element. The output element type is taken from the `DoFn<InputType, OutputType>` type parameters, so no extra method call is needed to declare it.
  - Template (`input` is an existing `PCollection`):
    input.apply(ParDo.of(new DoFn<InputType, OutputType>() { ... }));
  - Example: A `ParDo` whose `DoFn` declares `String` output:
    PCollection<String> processed = lines.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().toUpperCase());
        }
    }));
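- `GroupByKey.create` and `Count.perElement`:
  - Purpose: Grouping or counting. `GroupByKey` groups `KV` elements by their key; `Count.perElement` counts how often each distinct element occurs. Neither needs a separate key-extraction call.
  - Template (`input` is an existing `PCollection` of `KV` elements):
    input.apply(GroupByKey.create());
  - Example: Count occurrences of each word:
    PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());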
- `Combine.globally`:
  - Purpose: Combine all elements into a single value. The input type is taken from the `CombineFn`, so it does not need to be declared separately.
  - Template (`input` is an existing `PCollection`):
    input.apply(Combine.globally(new YourCombineFn()));
  - Example: Sum all integers globally, using Beam's built-in `Sum.ofIntegers()` combine function:
    PCollection<Integer> sum = numbers.apply(Combine.globally(Sum.ofIntegers()));
- `from`:
  - Purpose: Specify input file path or pattern.
  - Template:
    pipeline.apply(TextIO.read().from("path/to/input/file"));
  - Example: Read from a text file:
    PCollection<String> lines = pipeline.apply(TextIO.read().from("input.txt"));
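- `to`:
  - Purpose: Specify the output file path or prefix. By default Beam treats the value as a filename prefix and may write several sharded files with a suffix appended to it.
  - Template (`input` is an existing `PCollection<String>`):
    input.apply(TextIO.write().to("path/to/output/file"));
  - Example: Write to a text file:
    words.apply(TextIO.write().to("output.txt"));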
- `withCoder`:
  - Purpose: Define how elements are serialized/deserialized.
  - Template (`elementCoder` is a `Coder` instance for the element type, such as `StringUtf8Coder.of()`):
    pipeline.apply(Create.of(elements).withCoder(elementCoder));
  - Example: Specify a coder for `Create`, where `myElements` is a collection of `KV<String, Integer>` values:
    PCollection<KV<String, Integer>> pcollection = pipeline.apply(Create.of(myElements)
        .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
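The `FlatMapElements` example above tokenizes each line with `line.split("\\W+")`. The plain-Java sketch below shows what that call returns, including one edge case: when a line starts with a non-word character, the first token is an empty string. The class `SplitWords` and the `splitNonEmpty` variant are hypothetical helpers written for this illustration and are not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplitWords {
    // Same tokenization as the FlatMapElements example:
    // split on runs of non-word characters.
    static List<String> splitRaw(String line) {
        return Arrays.asList(line.split("\\W+"));
    }

    // Hypothetical variant that drops empty tokens before they
    // would be emitted as elements.
    static List<String> splitNonEmpty(String line) {
        return Arrays.stream(line.split("\\W+"))
            .filter(token -> !token.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitRaw("Hello, World!"));
        System.out.println(splitRaw("  leading space"));
        System.out.println(splitNonEmpty("  leading space"));
    }
}
```

If empty words should not reach later steps such as `Count.perElement()`, one option is to filter them inside the lambda as `splitNonEmpty` does.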
NOTE:
- To run the Java file from Maven:
  mvn exec:java -D"exec.mainClass"="com.complexExamples.code_03_BeamPipeline"