Exploring Transforms In Apache Beam

Introduction

Transforms in Beam are represented by the PTransform class. A PTransform takes an input PCollection, applies some transformation to its elements, and produces an output PCollection.

The most common transformations fall into two categories: element-wise transforms and aggregations. Element-wise transforms include Filter, FlatMap, Keys, Map, ParDo, etc. Aggregation transforms include GroupBy, GroupByKey, Min, Sum, Combine, Count, etc. This article explains some of the element-wise and aggregation transforms with Python examples. Let's explore.

Element-Wise
 

Map

The first example uses the Map transform. Map should be familiar: it applies a function to each element of the input and produces one output element per input element. Let's begin with a simple example that converts a list of names to upper case.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam'])  # create the input PCollection
      | 'Upper' >> beam.Map(str.upper)  # convert each element to upper case
      | beam.Map(print))  # print each element
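
Result (each element is printed on its own line; the order is not guaranteed):

JOHN
MIKE
SAM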

ParDo

ParDo is, in my opinion, the most important transform. ParDo applies a user-defined function, a DoFn, to each element of the PCollection and produces the result.

import apache_beam as beam

class LowerCaseConversion(beam.DoFn):
    """A DoFn that converts each element to lower case."""

    def process(self, element):
        # process() returns an iterable of output elements for each input element.
        return [element.lower()]
    

with beam.Pipeline() as pipeline:
    names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam'])
      | 'Lower' >> beam.ParDo(LowerCaseConversion())
      | beam.Map(print))
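
Unlike Map, which always produces exactly one output element per input element, a DoFn's process method may emit zero or more elements for each input. As a minimal sketch (the SplitWords DoFn and its step labels are hypothetical names used only for illustration), here is a DoFn that splits each input line into words and emits one element per word:

import apache_beam as beam

# Hypothetical DoFn for illustration: yields one output element per word,
# so a single input element can produce several outputs.
class SplitWords(beam.DoFn):

    def process(self, element):
        for word in element.split():
            yield word


with beam.Pipeline() as pipeline:
    words = (
      pipeline
      | 'Lines' >> beam.Create(['hello apache beam', 'transforms in beam'])
      | 'Split' >> beam.ParDo(SplitWords())
      | beam.Map(print))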

Aggregation

The article explains two aggregation transforms by example: the Count transform and the GroupByKey transform.

Count

The example is similar: from the name list we will find the total number of names. Count comes in three variations:

1. Globally

Counts all the elements in the PCollection.

import apache_beam as beam
with beam.Pipeline() as pipeline:
    names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam', 'Sam'])
      | 'Count' >> beam.combiners.Count.Globally()
      | beam.Map(print))

Result: 4

2. PerKey

Counts the elements for each unique key in the PCollection.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    names = (
      pipeline
      | 'Names' >> beam.Create([
          ('John' , 30), ('Sam', 40), ('John', 40), ('Mike', 25)])
      | 'Count' >> beam.combiners.Count.PerKey()
      | beam.Map(print))
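
Result (the order of the lines may vary between runs):

('John', 2)
('Sam', 1)
('Mike', 1)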

3. PerElement

Counts the occurrences of each unique element in the PCollection.

import apache_beam as beam


with beam.Pipeline() as pipeline:
    names = (
      pipeline
      | 'Names' >> beam.Create(['John', 'Mike', 'Sam', 'Sam'])
      | 'Count' >> beam.combiners.Count.PerElement()
      | beam.Map(print))
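
Result (the order of the lines may vary between runs):

('John', 1)
('Mike', 1)
('Sam', 2)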

GroupByKey

As the name suggests, GroupByKey works on a keyed collection and produces a collection in which each element consists of a key and all of the values associated with that key.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    name_counts = (
      pipeline
      | 'Name Counts' >> beam.Create([
          ('John' , 30), ('Sam', 40), ('John', 40), ('Mike', 25)
        ])
      | 'Counts' >> beam.GroupByKey()
      | beam.MapTuple(lambda k, vs: (k, sorted(vs)))  # sort the grouped values so the output is deterministic
      | beam.Map(print))
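
Result (the order of the lines may vary between runs):

('John', [30, 40])
('Sam', [40])
('Mike', [25])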

Summary

The article explained what transforms are in Apache Beam and covered some basic transform examples such as Map, ParDo, Count, and GroupByKey. This is the second part of the series; the next articles will cover more practical examples.