Operators In Apache Airflow

Introduction

This article briefly explains what Operators in Airflow are and the different types of Operators, with examples. In a DAG, tasks are executed through Operators; multiple Operators together form a workflow, and we can also define the execution order of the Operators.

Airflow ships with many built-in Operators for specific tasks: PythonOperator runs a Python function, SimpleHttpOperator invokes a REST API and handles the response, and EmailOperator sends an email. For interacting with databases there are several Operators, such as MySqlOperator for MySQL, SqliteOperator for SQLite, PostgresOperator for PostgreSQL, and OracleOperator for Oracle.

Let’s understand this with a use-case. Say an organization receives a CSV file on S3 every day; the CSV then needs to be aggregated and normalized (say, using Pandas), the result saved to SQLite, and, after processing, the engineering team emailed about the completion. Looking closely, each step is a task in a DAG, and each can be handled by a specific Operator in Airflow.

Types of Operators

Operators in Airflow fall under three categories: Action, Transfer, and Sensor. Let’s explore each of them and see which Operators fall under each category.

Action Operator

Action Operators in Airflow perform some action, such as triggering an HTTP request with SimpleHttpOperator, executing a Python function with PythonOperator, or sending an email with EmailOperator. The naming convention in Airflow is very clean: simply by looking at the name of an Operator we can identify which category it belongs to. The action operators reside under the module “airflow.operators”.

An example of an Action Operator is SimpleHttpOperator; let’s invoke a REST service and handle the response:

from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG('getitems', start_date=datetime(2022, 1, 1),
         schedule_interval="*/5 * * * *") as dag:
    task = SimpleHttpOperator(
        task_id='items',
        method='GET',
        http_conn_id='get_items',   # connection pointing at the REST service
        endpoint='/items',
        headers={"Content-Type": "application/json"},
        # succeed only when the service returns HTTP 200
        response_check=lambda response: response.status_code == 200,
    )

Transfer Operator

Transfer Operators transfer data from a source to a destination: S3ToRedshiftOperator transfers data from S3 to Redshift, S3ToGCSOperator from S3 to Google Cloud Storage, OracleToGCSOperator from Oracle to GCS, and LocalFilesystemToGCSOperator from the local file system to GCS.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

S3_BUCKET_CONFIG = "my-bucket"   # placeholder: bucket holding the source file
S3_KEY_CONFIG = "data/test.csv"  # placeholder: key of the source file

with DAG('s3redshift', start_date=datetime(2022, 1, 1),
         schedule_interval="0 15 * 12 *") as dag:
    task = S3ToRedshiftOperator(
        task_id='s3_to_redshift_task',
        s3_bucket=S3_BUCKET_CONFIG,
        s3_key=S3_KEY_CONFIG,
        schema="PUBLIC",
        table="test",              # target Redshift table
        copy_options=['csv'],
    )

Sensor

A Sensor in Airflow is designed to wait for a condition: it either receives a successful response or keeps retrying until it times out. It behaves like a blocking call; we cannot move to the next task until the sensor’s execution is completed. For example, SqlSensor executes a SQL statement repeatedly (say, every 30 seconds) until the condition is satisfied, and HttpSensor waits until a response from the REST API is received.

from datetime import datetime

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor

with DAG('getitems', start_date=datetime(2022, 1, 1),
         schedule_interval="*/5 * * * *") as dag:
    task = HttpSensor(
        task_id='items',
        method='GET',
        http_conn_id='get_items',
        endpoint='/items',
        poke_interval=10,   # re-check the endpoint every 10 seconds
        # succeed only when the service returns HTTP 200
        response_check=lambda response: response.status_code == 200,
    )

Complete Example

The complete example is very simple: a PythonOperator invokes a Python function that adds two numbers every 15 minutes, and after execution we will verify the output in the logs in the Airflow UI.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator


def add():
    val = 1 + 2
    print("Adding Numbers:", val)


with DAG('addition_dag', start_date=datetime(2022, 1, 1),
         schedule_interval="*/15 * * * *") as dag:
    task = PythonOperator(
        task_id='add',
        python_callable=add,
    )

Airflow UI

The DAG execution is successful; let’s check the logs.

Summary

In this article, we covered the basics of Operators and the types of Operators with examples, plus one complete example using PythonOperator. All this is groundwork; from the next article onwards, more complex examples will be covered, such as Transfers, Hooks, and Sensors.