Critical PySpark Functions

Introduction

PySpark is the Python API for Apache Spark, an analytics engine used for processing huge amounts of data. This article covers the functions that are most useful for working with data in PySpark once it has been imported. There are many such functions in PySpark. This article focuses on five of them:

  • select
  • filter
  • groupBy
  • orderBy
  • when

Data Setup

We will work on a Kaggle dataset that provides YouTube video trending statistics (URL: https://www.kaggle.com/datasnaek/youtube-new). The file we are using is ‘USvideos.csv’. The dataset has 16 columns, including title, channel_title, likes, and dislikes, which are used in the examples in this article.

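A Spark session and a Spark DataFrame are created as follows.

from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses all available cores
sc = SparkSession.builder.master("local[*]")\
        .appName('basic_function')\
        .getOrCreate()

# Read the CSV, treating the first row as column names
df = sc.read.option('header', 'true').csv('USvideos.csv')
# Without a schema every column is read as a string, so cast the numeric columns used below
df = df.withColumn('likes', df.likes.cast('int'))\
        .withColumn('dislikes', df.dislikes.cast('int'))
df.show(2)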


PySpark Functions

1. select()

The select function returns only the required columns. The dataset has 16 columns, of which we want 4, so select is the right tool. Remember that PySpark DataFrames are immutable: each transformation returns a new DataFrame, so assign the result to a variable if you want to keep it.

selected_df = df.select('title', 'channel_title' ,'likes', 'dislikes')
selected_df.show(10)

2. filter()

The filter function is used for filtering the rows based on a given condition.

selected_df.filter(selected_df.channel_title == 'Vox').show()

The filter function can also combine multiple conditions. In the above DataFrame we can filter for rows where ‘channel_title’ is ‘Vox’ and the likes are more than 20K. Before that, let’s count the rows for ‘Vox’ alone using the count() function, then add the second condition.

selected_df.filter((selected_df.channel_title == 'Vox')).count() # returns 193
selected_df.filter((selected_df.channel_title == 'Vox') & (selected_df.likes > 20000)).show(5)

selected_df.filter((selected_df.channel_title == 'Vox') & (selected_df.likes > 20000)).count()

This returns 44: adding the second condition reduces the number of rows from 193 to 44.

The isNull and isNotNull functions identify null values in a DataFrame column. They can be combined with the filter function to keep or drop rows that contain nulls.

df.filter((df.title.isNotNull()) & (df.likes.isNotNull()))

3. groupBy()

The groupBy function is used to collect similar data into groups and helps in executing aggregate functions on the grouped data.

grouped_df = selected_df.groupBy('title')
grouped_df.count().show()

To find the maximum number of likes for each title, chain the max function with groupBy. Note that max only works on numeric columns.

selected_df.groupBy('title').max('likes').show() 

4. orderBy()

The orderBy function is used for sorting the DataFrame based on the column.

selected_df.orderBy('likes', ascending=False).show()

orderBy can also sort on multiple columns. For example, to sort by ‘likes’ and then ‘dislikes’, both in descending order, pass a list of column names:

columns = ['likes', 'dislikes']
selected_df.orderBy(columns, ascending=False).show(10)

5. when()

The “when” function evaluates a condition on each row and returns a value where the condition holds; rows that do not match get null unless otherwise() supplies a default. In the example, we select the “title” together with the likes of videos that have at least a million likes.

from pyspark.sql.functions import when

selected_df = selected_df.orderBy(columns, ascending=False)
# Show the likes only for videos with at least one million likes; other rows show null
selected_df.select("title", when(selected_df.likes >= 1000000, selected_df.likes).alias("Million Likes")).show()

Summary

This article covered five of the most important PySpark functions to use after data import. There are many more functions in PySpark, which will be covered in upcoming articles.