Running an Airflow DAG Only If Another DAG Is Successful
Recently, I've been trying to coordinate two Airflow DAGs so that one would only run, on its own hourly schedule, if the other DAG (running daily) has been successful.
In today's tutorial I'll walk you through the use case and demonstrate how to achieve the desired behaviour in three different ways: two using the ExternalTaskSensor and another one using a customised approach with PythonOperator.
Now let's get started with our use case involving two Airflow DAGs.
The first DAG, my_daily_dag, runs every day at 5AM UTC.
from datetime import datetime, timedelta
from pathlib import Path
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',  # At 05:00 every day
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')
The second DAG, my_hourly_dag, runs on an hourly basis, between 6AM and 8PM UTC.
from datetime import datetime, timedelta
from pathlib import Path
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')
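
To make the two cron expressions concrete, here is a small sketch in plain Python that expands the hour field of each schedule into explicit hours. The helper expand_hour_field is hypothetical and written only for this illustration; it is not part of Airflow and handles just the single-value and simple range forms used above.

```python
from typing import List


def expand_hour_field(field: str) -> List[int]:
    """Expand a cron hour field such as '5' or '6-20' into explicit hours.

    Hypothetical helper for illustration only; it does not support
    lists ('1,2'), steps ('*/2') or wildcards.
    """
    if "-" in field:
        start, end = (int(part) for part in field.split("-"))
        return list(range(start, end + 1))  # cron ranges are inclusive
    return [int(field)]


daily_hours = expand_hour_field("5")      # hour field of '0 5 * * *'
hourly_hours = expand_hour_field("6-20")  # hour field of '0 6-20 * * *'

print(daily_hours)
print(hourly_hours)
print(len(hourly_hours), "hourly ticks per day")
```

The range is inclusive on both ends, so the hourly schedule has 15 ticks per day, and the first one (06:00) falls one hour after the daily schedule's 05:00 tick.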
In our use case, we want my_hourly_dag to run only if my_daily_dag has run successfully within the current date. If not, then my_hourly_dag should be skipped. It is important to mention here that we don't want to trigger my_hourly_dag as soon as my_daily_dag succeeds. That would be achievable with TriggerDagRun…
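
The rule described above can be sketched in plain Python, independently of Airflow. This is only an illustration of the decision logic: the DailyRun record and the should_run_hourly function are hypothetical names, and "within the current date" is assumed here to mean that the daily run finished on the same UTC calendar date as the moment of the check.

```python
from datetime import datetime, timezone
from typing import Iterable, NamedTuple


class DailyRun(NamedTuple):
    """Hypothetical record of one my_daily_dag run (not an Airflow class)."""
    state: str             # e.g. 'success' or 'failed'
    finished_at: datetime  # timezone-aware completion time


def should_run_hourly(daily_runs: Iterable[DailyRun], now: datetime) -> bool:
    """Return True only if a daily run succeeded on the same UTC date as `now`."""
    today = now.astimezone(timezone.utc).date()
    return any(
        run.state == "success"
        and run.finished_at.astimezone(timezone.utc).date() == today
        for run in daily_runs
    )


utc = timezone.utc
now = datetime(2023, 7, 27, 9, 0, tzinfo=utc)

# Yesterday succeeded, today failed: the hourly DAG should be skipped.
failed_today = [
    DailyRun("success", datetime(2023, 7, 26, 5, 1, tzinfo=utc)),
    DailyRun("failed", datetime(2023, 7, 27, 5, 1, tzinfo=utc)),
]
# Today succeeded: the hourly DAG may run.
succeeded_today = [DailyRun("success", datetime(2023, 7, 27, 5, 1, tzinfo=utc))]

print(should_run_hourly(failed_today, now))
print(should_run_hourly(succeeded_today, now))
```

Note that a success from a previous day does not count, and nothing here starts the hourly DAG: the check is meant to be evaluated at each hourly tick, which is what distinguishes this use case from simply triggering one DAG when the other finishes.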