Introduction
Transforms in Beam are represented by PTransform objects: a PTransform takes an input PCollection, applies some processing to it, and produces another PCollection as output.
The most common transformations fall into two categories: element-wise and aggregation. Element-wise transforms include Filter, FlatMap, Keys, Map, and ParDo; aggregation transforms include GroupBy, GroupByKey, Min, Sum, Combine, and Count. This article walks through some transforms of each kind, with Python examples. Let's explore.
Element-Wise
Map
The first example uses the Map transform. Map should be familiar: it applies a function to each element of the input and emits the result. Let's begin with a simple example that converts a list of names to upper case.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam'])
      | 'Upper' >> beam.Map(str.upper)
      | beam.Map(print))
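Map is strictly one-to-one: each input element produces exactly one output element. Another element-wise transform from the list above is Filter, which keeps only the elements for which a predicate returns True. Here is a minimal sketch in the same style (the length-based predicate is just an illustrative choice):

import apache_beam as beam

with beam.Pipeline() as pipeline:
  short_names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam'])
      # Keep only the names with three letters or fewer.
      | 'Short' >> beam.Filter(lambda name: len(name) <= 3)
      | beam.Map(print))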
ParDo
ParDo is, in my opinion, the most important transform. ParDo takes each element of the PCollection, runs a user-defined function (a DoFn) on it, and emits the results.
import apache_beam as beam

class LowerCaseConversion(beam.DoFn):
  def process(self, element):
    # Emit the lower-cased element as a single-element output.
    return [element.lower()]

with beam.Pipeline() as pipeline:
  names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam'])
      | 'Lower' >> beam.ParDo(LowerCaseConversion())
      | beam.Map(print))
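Unlike Map, a DoFn is not limited to one output per input: its process method can return or yield zero, one, or many elements. Here is a minimal sketch of a one-to-many ParDo (the SplitWords name and the word-splitting logic are just illustrative):

import apache_beam as beam

class SplitWords(beam.DoFn):
  def process(self, element):
    # Emit one output element per word in the input string.
    for word in element.split():
      yield word

with beam.Pipeline() as pipeline:
  words = (
      pipeline
      | 'Sentences' >> beam.Create(['John and Mike', 'Sam'])
      | 'Split' >> beam.ParDo(SplitWords())
      | beam.Map(print))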
Aggregation
The article explains two aggregation transforms by example: the Count combiner and the GroupByKey transform.
Count
The example is similar: from the name list we will find the total number of names it contains. Count comes in three variations:
1. Globally
Count all the elements in the PCollection
import apache_beam as beam

with beam.Pipeline() as pipeline:
  names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam', 'Sam'])
      | 'Count' >> beam.combiners.Count.Globally()
      | beam.Map(print))
Result: 4
2. PerKey
Count the elements for each unique key in PCollection
import apache_beam as beam

with beam.Pipeline() as pipeline:
  names = (
      pipeline
      | 'Names' >> beam.Create([
          ('John', 30), ('Sam', 40), ('John', 40), ('Mike', 25)])
      | 'Count' >> beam.combiners.Count.PerKey()
      | beam.Map(print))
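Result (output order may vary): ('John', 2), ('Sam', 1), ('Mike', 1)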
3. PerElement
Counts the occurrences of each unique element in the PCollection.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam', 'Sam'])
      | 'Count' >> beam.combiners.Count.PerElement()
      | beam.Map(print))
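Result (output order may vary): ('John', 1), ('Mike', 1), ('Sam', 2)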
GroupByKey
As the name suggests, GroupByKey works on a keyed collection and produces a collection in which each element is a key paired with all the values associated with that key.
import apache_beam as beam

with beam.Pipeline() as pipeline:
  name_counts = (
      pipeline
      | 'Name Counts' >> beam.Create([
          ('John', 30), ('Sam', 40), ('John', 40), ('Mike', 25)])
      | 'Counts' >> beam.GroupByKey()
      | beam.MapTuple(lambda k, vs: (k, sorted(vs)))
      | beam.Map(print))
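Result (output order may vary): ('John', [30, 40]), ('Sam', [40]), ('Mike', [25])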
Summary
The article explained what transforms are in Apache Beam and covered some basic transformation examples: Map, ParDo, Count, and GroupByKey. This is the 2nd part of the series; the next articles will cover more practical examples.