Monitor your Apache Airflow Workflows with Twilio

July 27, 2020
Written by Mwangi Kabiru


Airflow is an open source platform used to orchestrate workflows. It is perfect for Extract, Transform, Load (ETL) tasks, data migration, and data integration, among other jobs.

In this tutorial we will see how we can leverage Twilio’s Programmable Messaging to set up an alerting system for Airflow jobs.

Airflow DAG (source: Apache Airflow)

Tutorial requirements

To follow this tutorial you need to have:

  • Docker and Docker Compose installed on your machine.
  • A free Twilio account and a Twilio phone number (a trial account works fine).
  • A personal phone that can receive SMS messages.
  • Some familiarity with Python.

Set up Airflow to run locally

We are going to start off by cloning a repository that contains the Docker image we will use for this tutorial. The image is based on Python 3.7-slim-buster. The accompanying Docker Compose files create a Postgres container for the Airflow metadata database; the Celery variant also adds Redis for the task queue, although the LocalExecutor setup we use here only needs Postgres.

Installation & Setup

In this tutorial, I am going to use an existing docker-airflow image. If you prefer to install Airflow locally, you can find detailed instructions in the official Airflow documentation.

Enter the following commands in your terminal window to clone the docker-airflow repository:

$ git clone https://github.com/puckel/docker-airflow.git
$ cd docker-airflow

Run the following command to start an Airflow instance:

$ docker-compose -f docker-compose-LocalExecutor.yml up

If everything is set up correctly, you should see the Airflow web server UI when you visit http://localhost:8080/ in your browser. Please note that the code base comes with a default tutorial DAG that we are not going to use.

Airflow UI

Creating a DAG

Airflow jobs are authored as Directed Acyclic Graphs (DAGs). Let’s break down this term just in case you have never come across it. In this context, a graph is a finite set of vertices (nodes) connected by edges, and each node represents a task within the DAG. ‘Directed’ means that each edge in the graph points in a specific direction, while ‘acyclic’ means that if you start at a node n and follow the directed edges, you can never end back at n. Airflow allows you to schedule DAGs based on the frequency you want them to run; for example, daily, weekly, etc.
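
To make this concrete, here is a minimal sketch (an illustration, not part of the tutorial code) that wires three placeholder tasks into a directed chain. The >> operator defines the edges of the graph:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# Three placeholder tasks wired into the chain extract -> transform -> load.
with DAG('toy_dag', start_date=days_ago(1), schedule_interval=None) as dag:
    extract = DummyOperator(task_id='extract')
    transform = DummyOperator(task_id='transform')
    load = DummyOperator(task_id='load')

    # Directed edges; following them can never lead back to 'extract'.
    extract >> transform >> load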

For the first example, we are going to fetch data from the Star Wars API (SWAPI) and then output a ‘Hello World’. Let’s get started.

Open a new terminal window and navigate to the docker-airflow folder, then enter the following command:

$ cd dags

This sub-folder is where our DAGs will live. Let’s set up a new one by creating a file named hello.py in this sub-folder. Copy the following code into the file:

import json
import requests
from os.path import join as pjoin

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'swapi'
BASE_URL = 'https://swapi.dev/api/'

default_args = {
    'start_date': days_ago(2),
    'owner': 'Airflow'
}


def fetch_resource(resource):
    endpoint = BASE_URL + resource + '/'

    response = requests.get(endpoint)

    if not (200 <= response.status_code < 300):
        raise Exception(response.status_code, response.text)

    payload = response.json()

    return payload


with DAG(DAG_NAME, default_args=default_args, schedule_interval='@daily', catchup=False) as dag:
    resource = 'films'

    fetch_resource_task = PythonOperator(
        task_id=f'fetch_{resource}_resource_task',
        python_callable=fetch_resource,
        op_kwargs={'resource': resource},
        dag=dag)

    hello_world_task = BashOperator(
        task_id='second_task',
        bash_command='echo Hello World',
        dag=dag)

    fetch_resource_task >> hello_world_task

DAG_NAME is a variable we create to contain the name of the DAG. The default_args are arguments that are shared between the different tasks. BASE_URL is the root URL for SWAPI. The fetch_resource function uses the requests library to query SWAPI. We expect an HTTP 2xx status code on success; otherwise we raise an exception. Note that the requests library is already installed in the Docker image we are using.
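
If you want to sanity-check fetch_resource outside of Airflow, you can call it directly. This is a hypothetical quick test, run from a Python shell started in the dags directory inside the container (the module also imports airflow at load time):

# Run from a Python shell in the container's dags directory, where hello.py
# is importable. SWAPI list endpoints return a paginated payload with
# 'count' and 'results'.
from hello import fetch_resource

films = fetch_resource('films')
print(films['count'])
print(films['results'][0]['title'])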

The with statement is where we create our DAG. We pass its name, the default arguments, and the schedule interval, and we disable catchup so that Airflow does not backfill runs for past dates. Our first task, fetch_resource_task, executes the fetch_resource function using Airflow’s PythonOperator. The second task, hello_world_task, uses the BashOperator to print ‘Hello World’.
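
You can also exercise a single task from the command line before touching the UI. A hedged example using Airflow 1.10’s airflow test command, which runs one task without recording any state, assuming the LocalExecutor compose file from earlier (any execution date works):

$ docker-compose -f docker-compose-LocalExecutor.yml exec webserver \
    airflow test swapi fetch_films_resource_task 2020-07-27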

If you visit http://localhost:8080/ in your browser and everything is set up successfully, you should see the DAG we just created, swapi, right above the tutorial DAG. You may need to refresh the page if you already had it open from before.

SWAPI DAG in Airflow UI

By default new DAGs will have the toggle button to the left of the name set to “Off”. Set it to “On” by clicking on the button. Now you can run the DAG by clicking the little play button on the right under the “Links” column. The “Recent Tasks” column monitors the status of the current run.
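
If you prefer the command line, the same two steps can be done through the Airflow CLI; a hypothetical equivalent via docker-compose:

$ docker-compose -f docker-compose-LocalExecutor.yml exec webserver airflow unpause swapi
$ docker-compose -f docker-compose-LocalExecutor.yml exec webserver airflow trigger_dag swapi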

Once successful, the first circle under the “Recent Tasks” column will turn green. At that point, click on the green circle, then on the “second_task” entry at the top, and then on “Log”. You should see log output similar to the screenshot below. Confirm that you can see the “Hello World” that this task prints.

Airflow task log

The first task, which has the name fetch_films_resource_task, returns a data structure with Star Wars films. Feel free to review the logs of this task to see it.

Integrating the Project with Twilio

Honestly, the second task does not really fit as a follow-up job to the first one. Yes, we saw the “Hello World”, but this task does nothing with the data that we fetched from SWAPI in the first task. It does not sound like a real-life workflow. How about we write the data to .json files that could be used as fixtures for a database?

Add the following function just below fetch_resource:

def write_resource_in_file(resource, **context):
    # Pull the payload that the matching fetch task pushed to XCom.
    payload = context['task_instance'].xcom_pull(
        task_ids=f'fetch_{resource}_resource_task')

    with open(f'{BASE_FIXTURE_DIR}/{resource}.json', 'w+', encoding='utf-8') as file:
        json.dump(payload, file, ensure_ascii=False, indent=2)

Add the BASE_FIXTURE_DIR variable below the BASE_URL with the location where we are going to save our data files:

BASE_FIXTURE_DIR = pjoin('workflows', 'fixtures')

With the new function, let us update our DAG to look like this:

with DAG(DAG_NAME, default_args=default_args, schedule_interval='@daily') as dag:
    resources = ['films', 'starships', 'planets']

    for resource in resources:
        fetch_resource_task = PythonOperator(
            task_id=f'fetch_{resource}_resource_task',
            python_callable=fetch_resource,
            op_kwargs={'resource': resource},
            dag=dag)

        write_resource_in_file_task = PythonOperator(
            task_id=f'write_{resource}_resource_in_file_task',
            python_callable=write_resource_in_file,
            op_kwargs={'resource': resource},
            provide_context=True,
            dag=dag)

        fetch_resource_task >> write_resource_in_file_task

This is our new workflow: fetch_resource_task >> write_resource_in_file_task, repeated three times, once each for Star Wars films, starships, and planets.
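
Concretely, the loop generates three independent two-task chains:

fetch_films_resource_task >> write_films_resource_in_file_task
fetch_starships_resource_task >> write_starships_resource_in_file_task
fetch_planets_resource_task >> write_planets_resource_in_file_task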

Make sure that the webserver is running, and then trigger the DAG. The second task in this DAG is expected to fail, and this is intentional. Take a look at the BASE_FIXTURE_DIR variable: it points to a workflows/fixtures path relative to the task’s working directory, and neither the workflows folder nor the fixtures subfolder exists. Refresh the page to load the status of the job and see the failure.

failed dag

Click on the red circle under “Recent Tasks” to see details of the failures.

failed tasks

Imagine this task was running at 1 am while you were asleep. Wouldn’t it be nice to have a way to get a notification on your phone?

This is where we get to leverage Twilio’s programmable SMS feature. After you sign up for a trial account, get a free phone number. We can use this Twilio phone number to send SMS alerts to your personal phone.

Adding Twilio to the Airflow Docker image

To be able to send SMS notifications, the first thing that we need to do is install Twilio’s Python SDK in our Docker image.

First, determine which tag of the Airflow image you are running:

$ docker images puckel/docker-airflow
REPOSITORY              TAG                 IMAGE ID            CREATED             SIZE
puckel/docker-airflow   1.10.9              3e408baf20fe        5 months ago        797MB

As you can see, my local installation is using release 1.10.9 of the Docker image. You may have a different release. Now we can create a new version of this image that includes the twilio package:

$ docker run --name airflow-twilio -it puckel/docker-airflow:1.10.9 bash -c "pip install twilio"
$ docker commit airflow-twilio puckel/docker-airflow:twilio
$ docker rm airflow-twilio

The above commands start a temporary container named airflow-twilio, install the Twilio SDK for Python in it, and save the updated image with the twilio tag. Finally, the temporary container is removed. Make sure you replace 1.10.9 in the first command with your version if it is different.
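
If you prefer a reproducible build over docker commit, a small Dockerfile achieves the same result. This is a hypothetical alternative, not part of the original setup:

FROM puckel/docker-airflow:1.10.9
# Install the Twilio SDK on top of the base image. Depending on the image's
# default user you may need `USER root` first or `pip install --user`.
RUN pip install twilio

Build it from the directory containing the Dockerfile:

$ docker build -t puckel/docker-airflow:twilio .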

Once the image is built, we can replace the image in the Docker compose file. Open docker-compose-LocalExecutor.yml in your text editor, look for the webserver container and edit the image as follows:

webserver:
    image: puckel/docker-airflow:twilio
    restart: always
    ...

Please note that we only make this change in the webserver container, because with the LocalExecutor setup that is the container where our DAG code runs.

Creating a Callback Function that is called on Failure

Create an environment file named twilio.env in the docker-airflow folder of the cloned repository. You will store your Twilio credentials and phone numbers here. Your twilio.env file should look something like this:

TWILIO_NUMBER=your_twilio_number
RECIPIENT_NUMBER=your_personal_number
TWILIO_ACCOUNT_SID=your_twilio_account_sid
TWILIO_AUTH_TOKEN=your_twilio_auth_token

For the Twilio and your own phone numbers, use the E.164 format (for example, a US number looks like +15017122661). You can find your Twilio Account SID and Auth Token in the Project Info pane of the Console Dashboard page.

Twilio dashboard

Let's go back to the docker-compose-LocalExecutor.yml file, where we'll add the environment file right below the image line. Here is an excerpt of how this will look:

webserver:
    image: puckel/docker-airflow:twilio
    env_file:
        - twilio.env
    restart: always
    ...

After this is done, proceed to update the code in hello.py to look like this:

import json
import logging
import requests
import os
from os.path import join as pjoin

from twilio.rest import Client

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


log = logging.getLogger(__name__)

DAG_NAME = 'hello'
BASE_URL = 'https://swapi.dev/api/'
BASE_FIXTURE_DIR = pjoin('workflows', 'fixtures')


account_sid = os.getenv("TWILIO_ACCOUNT_SID")
auth_token = os.getenv("TWILIO_AUTH_TOKEN")
twilio_no = os.getenv("TWILIO_NUMBER")
recipient_no = os.getenv("RECIPIENT_NUMBER")

client = Client(account_sid, auth_token)


def on_failure(notify=True):
    def callback(context):
        if notify:
            dag_id = context['dag'].dag_id
            task_id = context['task_instance'].task_id
            body = 'Task {} in dag {} failed'.format(task_id, dag_id)
            client.messages.create(body=body, from_=twilio_no, to=recipient_no)
        else:
            log.warning('Failed but notification disabled')

    return callback


def fetch_resource(resource):
    endpoint = BASE_URL + resource + '/'

    response = requests.get(endpoint)

    if not (200 <= response.status_code < 300):
        raise Exception(response.status_code, response.text)

    payload = response.json()

    return payload


def write_resource_in_file(resource, **context):
    # Pull the payload that the matching fetch task pushed to XCom.
    payload = context['task_instance'].xcom_pull(
        task_ids=f'fetch_{resource}_resource_task')

    with open(f'{BASE_FIXTURE_DIR}/{resource}.json', 'w+', encoding='utf-8') as file:
        json.dump(payload, file, ensure_ascii=False, indent=2)


default_args = {
    'start_date': days_ago(2),
    'owner': 'Airflow',
    'on_failure_callback': on_failure(notify=True)
}


with DAG(DAG_NAME, default_args=default_args, schedule_interval='@daily', catchup=False) as dag:
    resources = ['films', 'starships', 'planets']

    for resource in resources:
        fetch_resource_task = PythonOperator(
            task_id=f'fetch_{resource}_resource_task',
            python_callable=fetch_resource,
            op_kwargs={'resource': resource},
            dag=dag)

        write_resource_in_file_task = PythonOperator(
            task_id=f'write_{resource}_resource_in_file_task',
            python_callable=write_resource_in_file,
            op_kwargs={'resource': resource},
            provide_context=True,
            dag=dag)

        fetch_resource_task >> write_resource_in_file_task

The changes introduced in this version of hello.py are:

  1. We created a Twilio Client instance that we will use to send messages. Read more on sending SMS programmatically using Python and Twilio.
  2. We added an on_failure callback factory and registered it in default_args under on_failure_callback. When the factory is called with notify=True, the resulting callback sends an SMS from our Twilio number to the selected phone number whenever a task fails.
  3. The callback is invoked with the context of the failed task. We can use this argument to send a more explicit SMS that helps us know what went wrong. In this case body = 'Task {} in dag {} failed'.format(task_id, dag_id) does it for us.
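
Because on_failure is a factory, you can also override the DAG-level default for a single task. A hypothetical example, where some_callable stands in for any function you would actually run:

# Silence SMS alerts for one non-critical task; all other tasks keep the
# callback configured in default_args.
quiet_task = PythonOperator(
    task_id='non_critical_task',
    python_callable=some_callable,
    on_failure_callback=on_failure(notify=False),
    dag=dag)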

Trying it Out

To apply the image and environment changes we’ve made to our running instance of Airflow, go back to the terminal that is running docker-compose and stop the process by pressing Ctrl-C. Then restart everything by running docker-compose again:

$ docker-compose -f docker-compose-LocalExecutor.yml up
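
Once the containers are up, you can optionally confirm that the Twilio variables made it into the webserver container; a hypothetical quick check (note that this prints your Auth Token to the terminal):

$ docker-compose -f docker-compose-LocalExecutor.yml exec webserver \
    env | grep -E 'TWILIO|RECIPIENT'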

Once the system is up again, visit http://localhost:8080/ in your web browser and make sure the hello DAG is switched to “On”. Run it one more time to see the three write_*_resource_in_file_task tasks fail as before. Only this time, you will get an SMS reporting each failure:

sms notification

You are welcome to reword the message body in the callback to fit your needs.

To complete this tutorial, and just in case you are wondering how to fix the code to not produce the error, you can change the definition of the BASE_FIXTURE_DIR constant to a valid directory:

BASE_FIXTURE_DIR = 'dags'

This should get all the tasks in the DAG to pass, and you will find the data payloads obtained from SWAPI in the dags directory.

successful dags

More Use Cases

There are other use cases where you would consider SMS notifications for monitoring your Airflow DAGs. Here are some of them:

  • Airflow has an on_success_callback argument, which can be implemented in much the same way as the failure one. If you have workflows that take a long time to run and you would like to be notified when they finish successfully, this comes in handy. It is also useful when a completed task requires a manual follow-up, such as acceptance testing a service before promoting it to production.
  • You can add a critical boolean argument to the on_failure factory (see the sketch after this list). In this context, critical failures are those that may result in service unavailability or an upstream breakdown with multiple effects on downstream services. This lets you get notified only when critical failures occur. Additionally, based on the significance of a workflow, you can notify several people if it fails. If you are feeling adventurous and your team spans time zones, you can alert specific individuals depending on when the failure occurs.
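
Here is one possible sketch of that second idea, reusing the client, log, twilio_no, and recipient_no objects defined earlier; the RECIPIENTS list and the critical flag are illustrative, not part of the tutorial code:

# Hypothetical extension of on_failure: page every recipient for critical
# failures, only the first recipient otherwise.
RECIPIENTS = [recipient_no]  # extend with more E.164 numbers as needed

def on_failure(notify=True, critical=False):
    def callback(context):
        if not notify:
            log.warning('Failed but notification disabled')
            return
        dag_id = context['dag'].dag_id
        task_id = context['task_instance'].task_id
        prefix = 'CRITICAL: ' if critical else ''
        body = '{}Task {} in dag {} failed'.format(prefix, task_id, dag_id)
        recipients = RECIPIENTS if critical else RECIPIENTS[:1]
        for number in recipients:
            client.messages.create(body=body, from_=twilio_no, to=number)

    return callback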

Mwangi Kabiru is a Data Engineer at Wellio and a Software Engineer at Andela. Reach out to him if you have any related questions or need clarification on topics covered in this tutorial.