Trigger Airflow DAGs via the REST API

Data Management
Author

Brock Tibert

Published

August 25, 2021

This post discusses how to use the REST API in Airflow 2 to trigger a DAG run and pass parameters that the run can use. I will not go over how to install and set up Airflow, but I will say that the documentation is pretty straightforward as long as you follow it step by step.

Step 1: Enable the REST API

By default, Airflow does not accept requests made to the API. However, it’s easy enough to turn on by changing the auth_backend setting in the [api] section of airflow.cfg:

# auth_backend = airflow.api.auth.backend.deny_all
auth_backend = airflow.api.auth.backend.basic_auth

Above, I am commenting out the original deny_all line and enabling the basic auth backend instead.
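If you want to double-check which backend a config file actually enables, here is a minimal sketch using Python's standard configparser. The helper name api_auth_backend and the sample config text are my own, for illustration only. As far as I know, newer Airflow releases rename this option to auth_backends, so adjust the key if yours differs.

```python
import configparser


def api_auth_backend(cfg_text: str) -> str:
    """Return the auth_backend value from the [api] section of a config."""
    # interpolation=None keeps any '%' characters in the file literal
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("api", "auth_backend")


# Hypothetical excerpt of airflow.cfg; commented-out lines are ignored
SAMPLE_CFG = """
[api]
# auth_backend = airflow.api.auth.backend.deny_all
auth_backend = airflow.api.auth.backend.basic_auth
"""

print(api_auth_backend(SAMPLE_CFG))
```

To check a real installation, read the contents of your own airflow.cfg and pass the text to the helper.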

To authenticate with the API, we simply need to pass an Authorization header whose value is the Base64-encoded form of username:password, where the username and password belong to a user created in Airflow.

For example:

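[Screenshot: a GET request to /api/v1/dags with an Authorization header; the encoded credentials are blurred out]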

Above, I have blurred out a string of text; that is username:password, Base64 encoded. There are plenty of tools on the web that can encode this for you.

NOTE: The encoded value is prefixed with the word Basic.
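If you would rather not paste credentials into a web tool, the header can be built locally. Here is a minimal sketch using only the standard library; the function name basic_auth_header and the credentials are hypothetical.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Return the Authorization header for HTTP Basic auth."""
    # Join as "username:password", encode to bytes, then Base64-encode
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    # The encoded value is prefixed with the word Basic and a space
    return {"Authorization": f"Basic {token}"}


# Hypothetical credentials, for illustration only
print(basic_auth_header("airflow_user", "airflow_pass"))
```

Keep in mind that Base64 is an encoding, not encryption, so anyone who sees the header can recover the password.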

Step 2: Test the API by Listing DAGs

With the above in place, we can easily list the DAGs in Airflow via /dags.

Because I am running locally, it’s as simple as a GET request to http://localhost:8080/api/v1/dags. Just remember to include the Authorization bits. The call to list the DAGs is also shown in the screenshot above.
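The same call can be made from Python. This is a minimal sketch using only the standard library, assuming a local server at http://localhost:8080 and basic auth as configured above. The function names and credentials are hypothetical, and the response is assumed to hold the DAGs under a dags key, each with a dag_id.

```python
import base64
import json
import urllib.request


def build_list_dags_request(base_url: str, username: str, password: str):
    """Build (but do not send) a GET request for the /dags endpoint."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags",
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )


def dag_ids_from_response(body: bytes) -> list:
    """Pull the dag_id of every DAG out of the JSON response body."""
    payload = json.loads(body)
    return [dag["dag_id"] for dag in payload["dags"]]


def list_dag_ids(request) -> list:
    """Send the request and return the DAG ids (needs a running Airflow)."""
    with urllib.request.urlopen(request) as response:
        return dag_ids_from_response(response.read())
```

With Airflow running, list_dag_ids(build_list_dags_request("http://localhost:8080", "airflow_user", "airflow_pass")) would return the ids. Splitting the build, send, and parse steps keeps the pieces easy to test without a server.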

Step 3: The DAG setup and configuration

Of course, if we are going to pass information to the DAG, we would expect the tasks to be able to consume and use that information. Below provides snippets of my DAG to help refer to the core pieces.

# standard library (timedelta is used for retry_delay below)
from datetime import timedelta

# airflow bits
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# a function to read the parameters passed
def main_task(ti, **context):
    # the sql
    SQL = """
    select count(a.dim_user_id) as total
    from table1 a,
        table2 b
    where a.dim_account_id = b.id
    and   b.global_id = {}
    and   a.dim_site_id = {}
    and   a.message NOT IN ('JOINED', 'LEFT')
    and   a.activity_date between '{}' and '{}'
    """
    
    # connect to redshift
    rs = connect_redshift()  
    
    # extract the parameters passed from the REST API Trigger 
    gid = context['dag_run'].conf['gid']
    sid = context['dag_run'].conf['sid']
    date_start = context['dag_run'].conf['date_start']
    date_end = context['dag_run'].conf['date_end']
    
    # build the SQL and get the data
    SQL = SQL.format(gid, sid, date_start, date_end)
    df = rs.redshift_to_pandas(SQL)
    
    # do more things and 


args = {
    'owner': 'brock',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['brocktibert@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=7),
    'provide_context': True
}

# schedule_interval is a DAG argument, not a task default, so it is set here;
# None means the DAG only runs when it is triggered
dag = DAG(
    dag_id='rest-trigger',
    default_args=args,
    schedule_interval=None,
    tags=["brock"]
)

t1 = PythonOperator(task_id="main_etl",
                    python_callable=main_task,
                    dag=dag)

Let’s review:

  1. We are importing the Airflow bits that we need.
  2. I am not showing all of my code, or even the real code I created for the client. The function simply reads from **context and extracts the parameters from the DAG run’s conf object, where those params are stored. This will make more sense in a moment.
  3. I am adding provide_context=True to the args. Airflow 2 passes the context to the callable automatically, so this setting is no longer required.
  4. I use the function main_task in a PythonOperator and associate it with the DAG.

Note: The dag_id is really important. This is what Airflow uses as the ID for a DAG, and it is the name used in the REST API URLs. Please be mindful of the values you use here.
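The conf lookups in main_task assume every key is present, and the values are formatted straight into the SQL string. Here is a minimal sketch of validating the payload first. It is not part of my original DAG, and it assumes gid and sid are integers and the dates are ISO-formatted strings; the helper name extract_params is hypothetical.

```python
from datetime import date

REQUIRED_KEYS = ("gid", "sid", "date_start", "date_end")


def extract_params(conf: dict) -> dict:
    """Validate the trigger payload and return typed values."""
    conf = conf or {}
    missing = [key for key in REQUIRED_KEYS if key not in conf]
    if missing:
        raise ValueError(f"missing conf keys: {missing}")
    params = {
        # int() raises ValueError for strings that are not integers
        "gid": int(conf["gid"]),
        "sid": int(conf["sid"]),
        # date.fromisoformat() raises ValueError for non-ISO date strings
        "date_start": date.fromisoformat(conf["date_start"]),
        "date_end": date.fromisoformat(conf["date_end"]),
    }
    if params["date_start"] > params["date_end"]:
        raise ValueError("date_start must not be after date_end")
    return params
```

Inside the task this could be called as extract_params(context['dag_run'].conf), so a bad payload fails early with a readable message in the logs rather than producing a broken query.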

Step 4: Trigger the DAG

Earlier, in the screenshot, we saw that we can use a GET request to /dags to get a simple list. To trigger a DAG, we use a POST request that refers to the DAG by name. The DAG I want to trigger, defined above, is called rest-trigger.

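[Screenshot: the POST request to /api/v1/dags/rest-trigger/dagRuns, with a JSON body containing conf and dag_run_id]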

Let’s review the request above:

  1. We are POSTing data to http://localhost:8080/api/v1/dags/rest-trigger/dagRuns. Note that the path is the DAG name followed by /dagRuns.
  2. The Authorization bits still need to be included. They are not shown above, but are the same as in the earlier screenshot.
  3. In the body, we are passing JSON and putting the information that we want to send inside the conf key.
  4. I am also specifying my own ID for the dag_run_id. Airflow will generate one for us if we leave it out, but supplying our own is helpful if we have systems/logic that sit above the Airflow API.
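The request reviewed above can be sketched in Python as follows, using only the standard library. The function name, the credentials, and the example conf values are hypothetical; the URL pattern and the conf and dag_run_id keys are the ones described in the list.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf, username, password,
                          dag_run_id=None):
    """Build (but do not send) the POST request that triggers a DAG run."""
    body = {"conf": conf}
    # Only send dag_run_id when we want to choose it ourselves
    if dag_run_id is not None:
        body["dag_run_id"] = dag_run_id
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical parameter values matching the keys main_task reads
example_conf = {"gid": 1, "sid": 2,
                "date_start": "2021-08-01", "date_end": "2021-08-25"}
request = build_trigger_request("http://localhost:8080", "rest-trigger",
                                example_conf, "airflow_user", "airflow_pass",
                                dag_run_id="my-run-001")
```

Passing the request to urllib.request.urlopen sends it. Nothing is sent in the sketch itself, so it can be run without an Airflow server.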

Review the Triggered DAG

With the DAG triggered, we can use the UI to review the process, but we can also use the REST API to poll the job for its status via a GET call to http://localhost:8080/api/v1/dags/rest-trigger/dagRuns, where again we are passing in the DAG name. In this case, we are passing in rest-trigger, but you would use the name of your own DAG.

[Screenshot: the GET request to /api/v1/dags/rest-trigger/dagRuns]
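If a system sitting above Airflow needs to wait for the run to finish, the polling can be sketched like this. It assumes the API also exposes a single run at /dagRuns/{dag_run_id} with a state field, which goes one step beyond the list call described in this post. The function names and the set of terminal states are my own choices for illustration.

```python
import json
import time
import urllib.parse
import urllib.request

TERMINAL_STATES = {"success", "failed"}


def fetch_run_state(base_url, dag_id, dag_run_id, auth_header):
    """GET one DAG run and return its state (needs a running Airflow)."""
    # Run ids can contain characters such as ':' or '+', so escape them
    run = urllib.parse.quote(dag_run_id, safe="")
    url = f"{base_url}/api/v1/dags/{dag_id}/dagRuns/{run}"
    request = urllib.request.Request(url, headers={"Authorization": auth_header})
    with urllib.request.urlopen(request) as response:
        return json.load(response)["state"]


def wait_for_dag_run(fetch_state, poll_seconds=5.0, max_polls=60):
    """Call fetch_state() until it reports a terminal state or polls run out."""
    for _ in range(max_polls):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"run still not finished after {max_polls} polls")
```

The state lookup is passed in as a function, so the loop can be wired up with something like wait_for_dag_run(lambda: fetch_run_state(base_url, "rest-trigger", run_id, auth_header)) and tested without a server.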

Closing Remarks

This was harder to get up and running than I would like to admit. In the end, a few things are worth remembering:

  • If you hit issues, the logs can be helpful. Output from print statements is included in the task logs, so if you need to, you can use that to identify hang-ups.
  • The name of the dag drives a lot of the functionality, which is why I stressed this earlier in the post.
  • From the web UI, you can access the REST API docs.

While not shown above, you can also pass the parameters between the tasks in your DAG. Below is a simple example that takes in a single parameter and passes it from one task to the next via XCom, using the ti parameter that was included but not used above.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


# schedule_interval=None means the DAG only runs when it is triggered
dag = DAG(
    dag_id="trigger-me",
    default_args={"start_date": days_ago(2), "owner": "brock"},
    schedule_interval=None
)


def push(ti, **context):
    # gets the parameter gid which was passed as a key in the json of conf
    gid = context['dag_run'].conf['gid']
    # push the value to XCom under an explicit key via the task instance (ti)
    # https://marclamberti.com/blog/airflow-xcom/
    ti.xcom_push(key='global_id', value=gid)
    # the return value is also pushed to XCom, under the default key
    return gid


def pull(ti):
    # a single task id (not a list) makes xcom_pull return the value itself
    gid2 = ti.xcom_pull(key="global_id", task_ids='pusher')
    print(gid2)


# Airflow 2 passes the context automatically, so provide_context is not needed
t1 = PythonOperator(task_id="pusher", python_callable=push, dag=dag)
t2 = PythonOperator(task_id="puller", python_callable=pull, dag=dag)

t1 >> t2

That’s it. I had to piece the above together from a range of sources, so I hope this helps you (and my future self) if you need to explore this functionality.