This post will discuss how to use the REST API in Airflow 2 to trigger a DAG run and pass parameters that can be used in the run. I will not go over how to set up and install Airflow, but I will say that the documentation is pretty straightforward as long as you follow it step by step.
Step 1: Enable the REST API
By default, Airflow does not accept requests made to the API. However, it’s easy enough to turn on in the [api] section of airflow.cfg:
[api]
# auth_backend = airflow.api.auth.backend.deny_all
auth_backend = airflow.api.auth.backend.basic_auth
Above, I comment out the original line and enable the basic auth backend instead. Restart the webserver so the change takes effect.
To be authenticated by the API, we simply need to pass an Authorization header containing the Base64-encoded form of username:password, where username and password belong to a user created in Airflow.
For example, in the screenshot I have blurred out a series of text; that is the username:password string, Base64 encoded. There are plenty of tools on the web that can encode this for you.
NOTE: The encoded information is prefaced by the word Basic and a space, so the full header reads Authorization: Basic <encoded username:password>.
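If you would rather not paste credentials into a website, you can build the header locally with Python's standard library. This is a minimal sketch. basic_auth_header is a hypothetical helper name, and admin/admin are placeholder credentials rather than real ones.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the Authorization header used by Airflow's basic_auth backend.

    The value is the word "Basic", a space, and the Base64 encoding of
    "username:password".
    """
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


if __name__ == "__main__":
    # placeholder credentials for illustration only
    print(basic_auth_header("admin", "admin"))
    # prints {'Authorization': 'Basic YWRtaW46YWRtaW4='}
```

If you use the requests library instead, passing auth=(username, password) to a call such as requests.get builds the same Basic header for you.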
Step 2: Test the API by Listing DAGs
With the above in place, we can easily list the DAGs in Airflow via the /dags endpoint. Because I am running locally, it’s as simple as a GET request to http://localhost:8080/api/v1/dags. Just remember to include the Authorization header. The call to list the DAGs is also shown in the screenshot above.
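For scripting, you can make the same GET from Python. The sketch below uses only the standard library (urllib), so it has no dependencies. It assumes the local webserver used in this post and the Basic header from Step 1. The helper names and the admin/admin credentials are hypothetical. Building the request is kept separate from sending it, so you can inspect the request without a running Airflow.

```python
import base64
import json
import urllib.request

# Local Airflow webserver, as used throughout this post (assumption: default port).
BASE_URL = "http://localhost:8080/api/v1"


def build_list_dags_request(username: str, password: str,
                            base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) an authenticated GET request for /dags."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url}/dags",
        headers={"Authorization": f"Basic {token}", "Accept": "application/json"},
        method="GET",
    )


def list_dag_ids(username: str, password: str, base_url: str = BASE_URL) -> list:
    """Send the request and return the dag_id of every DAG in the response."""
    req = build_list_dags_request(username, password, base_url)
    with urllib.request.urlopen(req) as resp:
        payload = json.load(resp)
    # The response body holds a "dags" array; each entry carries a "dag_id".
    return [dag["dag_id"] for dag in payload["dags"]]


# Example (needs a running Airflow):
# print(list_dag_ids("admin", "admin"))
```

The /dags response also includes total_entries and is paginated. If you have many DAGs, pass the limit and offset query parameters.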
Step 3: The DAG Setup and Configuration
Of course, if we are going to pass information to the DAG, we would expect the tasks to be able to consume and use that information. Below are snippets of my DAG that highlight the core pieces.
# airflow bits
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# a function to read the parameters passed
def main_task(ti, **context):
    # the sql
    SQL = """
    select count(a.dim_user_id) as total
    from table1 a,
         table2 b
    where a.dim_account_id = b.id
      and b.global_id = {}
      and a.dim_site_id = {}
      and a.message NOT IN ('JOINED', 'LEFT')
      and a.activity_date between '{}' and '{}'
    """

    # connect to redshift (connect_redshift is my own helper, not shown)
    rs = connect_redshift()

    # extract the parameters passed from the REST API Trigger;
    # they are stored in the conf of the current DAG run
    gid = context['dag_run'].conf['gid']
    sid = context['dag_run'].conf['sid']
    date_start = context['dag_run'].conf['date_start']
    date_end = context['dag_run'].conf['date_end']

    # build the SQL and get the data
    # (str.format pastes the values straight into the query, so only use trusted input)
    SQL = SQL.format(gid, sid, date_start, date_end)
    df = rs.redshift_to_pandas(SQL)

    # do more things and ...

args = {
    'owner': 'brock',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['brocktibert@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=7),
    'provide_context': True
}

dag = DAG(
    dag_id='rest-trigger',
    default_args=args,
    # schedule_interval is a DAG argument, not a task default;
    # None means the DAG only runs when triggered (e.g. via the REST API)
    schedule_interval=None,
    tags=["brock"]
)

t1 = PythonOperator(task_id="main_etl",
                    python_callable=main_task,
                    dag=dag)
Let’s review:
- We are importing the airflow bits that we need.
- I am not showing all of my code, or even the real code I created for the client; I am simply creating a function that reads from **context and extracts the parameters from the DAG run’s conf object, where those params are stored. This will make more sense in a moment.
- I am adding provide_context=True to the args. In Airflow 2 this is no longer required, because the context is passed to the callable automatically.
- I use the function main_task in a PythonOperator and associate it with the DAG.
Note: The dag_id is really important. This is what Airflow uses as the identifier for a DAG, and it is the name that appears in the REST API URLs. Please be mindful of the values you use here.
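Because main_task pastes the conf values straight into the SQL with str.format, anyone who can trigger the DAG controls part of the query. One option is to validate conf before building the SQL. The sketch below is plain Python, so you can test it without Airflow. It assumes, hypothetically, that gid and sid are integer IDs and that the dates are YYYY-MM-DD strings. extract_params is an illustrative name, not part of Airflow.

```python
from datetime import date

# Keys that main_task reads from dag_run.conf
REQUIRED_KEYS = ("gid", "sid", "date_start", "date_end")


def extract_params(conf: dict) -> dict:
    """Pull the expected keys out of dag_run.conf and fail fast on bad input.

    Assumption: gid/sid are integer IDs and the dates are ISO 8601 dates.
    Adjust these checks to your own schema.
    """
    conf = conf or {}  # conf is empty when a run is created without one
    missing = [key for key in REQUIRED_KEYS if key not in conf]
    if missing:
        raise ValueError(f"dag_run.conf is missing required keys: {missing}")
    return {
        # int() raises ValueError for strings like "1 or 1=1"
        "gid": int(conf["gid"]),
        "sid": int(conf["sid"]),
        # date.fromisoformat raises ValueError if the string is not an ISO date
        "date_start": date.fromisoformat(str(conf["date_start"])).isoformat(),
        "date_end": date.fromisoformat(str(conf["date_end"])).isoformat(),
    }
```

Inside main_task, you would call params = extract_params(context['dag_run'].conf) and then format the SQL from params. If your database client supports bound query parameters, those are the more robust fix.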
Step 4: Trigger the DAG
Earlier in the screenshot, we saw that we can use a GET request to /dags to get a simple list. We can use a POST request to trigger a DAG by name. In the screenshot above, the DAG I want to trigger is called rest-trigger.
Let’s review the above:
- We are POSTing data to http://localhost:8080/api/v1/dags/rest-trigger/dagRuns. Note that the URL includes the DAG name followed by /dagRuns.
- The Authorization bits still need to be included. They are not shown above, but they are the same as in the earlier screenshot.
- In the body, we are passing JSON and including the information that we want inside the conf key.
- I am also specifying my own ID for the dag_run_id. Airflow will auto-generate one if we omit it, but setting it ourselves is helpful if we have systems/logic that sit above the Airflow API.
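You can also make the trigger call from the screenshot in Python. This sketch assumes the local webserver and the basic auth from Step 1. The conf keys mirror the ones main_task reads. The values, credentials, run ID and helper names are placeholders of my own. The request is built in a separate function, so you can check its URL and JSON body without sending anything.

```python
import base64
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8080/api/v1"  # assumption: local webserver on the default port


def build_trigger_request(dag_id, conf, username, password,
                          dag_run_id=None, base_url=BASE_URL):
    """Build (but do not send) the POST that creates a new DAG run."""
    body = {"conf": conf}
    if dag_run_id is not None:
        body["dag_run_id"] = dag_run_id  # optional; Airflow generates one if omitted
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    # The dag_id goes into the path, so quote it in case it contains reserved characters.
    url = f"{base_url}/dags/{urllib.parse.quote(dag_id, safe='')}/dagRuns"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method="POST",
    )


def trigger_dag(dag_id, conf, username, password, dag_run_id=None, base_url=BASE_URL):
    """Send the POST and return the created DAG run as a dict."""
    req = build_trigger_request(dag_id, conf, username, password, dag_run_id, base_url)
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


# Example (sends a real request, so it needs a running Airflow):
# run = trigger_dag(
#     "rest-trigger",
#     {"gid": 123, "sid": 4, "date_start": "2021-01-01", "date_end": "2021-01-31"},
#     "admin", "admin",
#     dag_run_id="manual-2021-01",
# )
# print(run["dag_run_id"], run["state"])
```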
Review the Triggered DAG
With the DAG triggered, we can use the UI to review the process. We can also use the REST API to poll the job for its status via a GET call to http://localhost:8080/api/v1/dags/rest-trigger/dagRuns, where again we are passing in the DAG name. In this case, we are passing in rest-trigger, but you would use the name of your own DAG.
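Polling can be wrapped in a small loop. In Airflow 2, a run's state field reports values such as queued, running, success and failed. Appending the dag_run_id to the URL above (.../dagRuns/<dag_run_id>) returns a single run. This sketch treats success and failed as the terminal states. The helper names, timeout and interval are my own choices. The fetch function is passed in, so you can test the loop without Airflow.

```python
import base64
import json
import time
import urllib.parse
import urllib.request

# Assumed terminal states for a DAG run; anything else means "keep waiting".
TERMINAL_STATES = {"success", "failed"}


def make_run_fetcher(dag_id, dag_run_id, username, password,
                     base_url="http://localhost:8080/api/v1"):
    """Return a zero-argument function that GETs one DAG run as a dict."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    url = (f"{base_url}/dags/{urllib.parse.quote(dag_id, safe='')}"
           f"/dagRuns/{urllib.parse.quote(dag_run_id, safe='')}")
    req = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})

    def fetch():
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    return fetch


def wait_for_dag_run(fetch_run, timeout=600.0, interval=10.0,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_run() until the run's state is terminal or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        state = fetch_run().get("state")
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"DAG run still {state!r} after {timeout} seconds")
        sleep(interval)


# Example (needs a running Airflow and an existing run):
# fetch = make_run_fetcher("rest-trigger", "manual-2021-01", "admin", "admin")
# print(wait_for_dag_run(fetch))
```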
Closing Remarks
This was harder to get up and running than I would like to admit. In the end:
- If you hit issues, the logs can be helpful. Output from print statements is included in the task logs, so if you need to, you can use that to identify hang-ups.
- The name of the DAG drives a lot of the functionality, which is why I stressed this earlier in the post.
- From the web UI, you can access the REST API docs.
While not shown above, you can also pass the parameters around your DAG. Below is a simple example of taking in a single parameter and passing it around via XCom, using the ti parameter that was included but not used above.
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="trigger-me",
    default_args={"start_date": days_ago(2), "owner": "brock", "provide_context": True},
    schedule_interval=None
)

def push(ti, **context):
    # gets the parameter gid which was passed as a key in the json of conf
    gid = context['dag_run'].conf['gid']
    # use the task instance (ti) to push the value to XCom
    # https://marclamberti.com/blog/airflow-xcom/
    ti.xcom_push(key='global_id', value=gid)
    # returning a value also pushes it to XCom under the key return_value
    return gid

def pull(ti):
    # passing a list of task_ids returns a list of values (one per task id)
    gid2 = ti.xcom_pull(key="global_id", task_ids=['pusher'])
    print(gid2)

# provide_context is no longer required in Airflow 2 (the context is passed automatically)
t1 = PythonOperator(task_id="pusher", python_callable=push, provide_context=True, dag=dag)
t2 = PythonOperator(task_id="puller", python_callable=pull, provide_context=True, dag=dag)

t1 >> t2
That’s it. I had to piece the above together from a range of sources, so I hope this helps you (and my future self) if you need to explore this functionality.