Stop Creating Bad DAGs — Optimize Your Airflow Environment By Improving Your Python Code | by Alvaro Leandro Cavalcante Carneiro | Jan 2025

Apache Airflow is one of the most popular orchestration tools in the data field, powering workflows for companies worldwide. However, anyone who has worked with Airflow in a production environment, especially a complex one, knows that it can occasionally present problems and peculiar bugs.
Among the many things you need to manage in an Airflow environment, one critical metric often flies under the radar: DAG parse time. Monitoring and optimizing parse time is essential to avoid performance bottlenecks and ensure the correct functioning of your orchestrations, as we'll explore in this article.
That said, this tutorial aims to introduce airflow-parse-bench, an open-source tool I developed to help data engineers monitor and optimize their Airflow environments, providing insights to reduce code complexity and parse time.
When it comes to Airflow, DAG parse time is often an overlooked metric. Parsing occurs every time Airflow processes your Python files to build the DAGs dynamically.
By default, all your DAGs are parsed every 30 seconds, a frequency controlled by the configuration variable min_file_process_interval. This means that every 30 seconds, all the Python code in your dags folder is read, imported, and processed to generate DAG objects containing the tasks to be scheduled. Successfully processed files are then added to the DAG Bag.
Two key Airflow components handle this process: the DagFileProcessorManager, which decides which Python files need to be parsed, and the DagFileProcessorProcess, which parses each file and turns it into DAG objects.
Together, both components (commonly known as the dag processor) are executed by the Airflow Scheduler, ensuring that your DAG objects are up to date before being triggered. However, for scalability and security reasons, it's also possible to run your dag processor as a separate component in your cluster.
If your environment has only a few dozen DAGs, the parsing process is unlikely to cause any kind of problem. However, it's common to find production environments with hundreds or even thousands of DAGs. In this case, if your parse time is too high, it can lead to:
- Delayed DAG scheduling.
- Increased resource utilization.
- Environment heartbeat issues.
- Scheduler failures.
- Excessive CPU and memory usage, wasting resources.
Now, imagine having an environment with hundreds of DAGs containing unnecessarily complex parsing logic. Small inefficiencies can quickly turn into significant problems, affecting the stability and performance of your entire Airflow setup.
When writing Airflow DAGs, there are some important best practices to keep in mind to create optimized code. Although you can find plenty of tutorials on how to improve your DAGs, I'll summarize some of the key principles that can significantly improve your DAG performance.
Limit Top-Level Code
One of the most common causes of high DAG parsing times is inefficient or complex top-level code. Top-level code in an Airflow DAG file is executed every time the Scheduler parses the file. If this code includes resource-intensive operations, such as database queries, API calls, or dynamic task generation, it can significantly impact parsing performance.
The following code shows an example of a non-optimized DAG:
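The original post embeds the snippet as an image, so the sketch below is a minimal, hypothetical stand-in that illustrates the same anti-pattern in Airflow 2.x style (the endpoint, column name, and task names are made up):

```python
from datetime import datetime

import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

# Top-level code: this API call and DataFrame processing run every time the
# Scheduler parses the file, not only when the DAG actually executes.
response = requests.get("https://example.com/api/data")  # hypothetical endpoint
df = pd.DataFrame(response.json())
ids = df["id"].tolist()

def process_data():
    print(f"Processing {len(ids)} records")

with DAG(
    dag_id="non_optimized_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="process_data", python_callable=process_data)
```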
In this case, every time the file is parsed by the Scheduler, the top-level code is executed, making an API request and processing the DataFrame, which can significantly impact the parse time.
Another important factor contributing to slow parsing is top-level imports. Every library imported at the top level is loaded into memory during parsing, which can be time-consuming. To avoid this, you can move imports into functions or task definitions.
The following code shows a better version of the same DAG:
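Again, the original snippet is an image; under the same assumptions as the sketch above, the optimized version could look like this, with the heavy imports and the API call moved inside the task callable:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def process_data():
    # Heavy imports and the API call now happen only at task runtime,
    # so parsing the file stays cheap.
    import pandas as pd
    import requests

    response = requests.get("https://example.com/api/data")  # hypothetical endpoint
    df = pd.DataFrame(response.json())
    print(f"Processing {len(df)} records")

with DAG(
    dag_id="optimized_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="process_data", python_callable=process_data)
```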
Avoid XComs and Variables in Top-Level Code
Still on the same topic, it's particularly important to avoid using XComs and Variables in your top-level code. As stated in Google's documentation:
If you are using Variable.get() in top level code, every time the .py file is parsed, Airflow executes a Variable.get() which opens a session to the DB. This can dramatically slow down parse times.
To address this, consider using a JSON dictionary to retrieve multiple variables in a single database query, rather than making multiple Variable.get() calls. Alternatively, use Jinja templates, as variables retrieved this way are only processed during task execution, not during DAG parsing.
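As a rough illustration of both ideas (the variable name and keys below are hypothetical), a single JSON Variable replaces several individual lookups, while a Jinja template defers the lookup to task runtime entirely:

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="variables_example",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # One database query fetching several values at once, instead of a
    # separate Variable.get() call per value (assumes a Variable named
    # "my_dag_config" storing JSON such as {"bucket": "my-bucket"}).
    config = Variable.get("my_dag_config", deserialize_json=True)

    uses_parsed_value = BashOperator(
        task_id="uses_parsed_value",
        bash_command=f"echo bucket is {config['bucket']}",
    )

    # Even better: a Jinja template is only rendered at task execution time,
    # so parsing this file does not open a session to the metadata database.
    uses_jinja_template = BashOperator(
        task_id="uses_jinja_template",
        bash_command="echo bucket is {{ var.json.my_dag_config.bucket }}",
    )
```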
Remove Unnecessary DAGs
Although it seems obvious, it's always important to remember to periodically clean up unnecessary DAGs and files from your environment:
- Remove unused DAGs: Check your dags folder and delete any files that are no longer needed.
- Use .airflowignore: Specify the files Airflow should intentionally ignore, skipping parsing (a small example follows this list).
- Review paused DAGs: Paused DAGs are still parsed by the Scheduler, consuming resources. If they're no longer required, consider removing or archiving them.
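For instance, an .airflowignore file placed in the dags folder could look like the sketch below (the names are made up). By default the patterns are treated as regular expressions, though newer Airflow versions can be configured to interpret them as glob patterns via the dag_ignore_file_syntax setting:

```
# .airflowignore (placed inside the dags folder)
# Skip a whole subfolder of legacy workflows and any backup copies.
legacy_dags
.*_backup\.py
```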
Change Airflow Configurations
Lastly, you can change some Airflow configurations to reduce the Scheduler's resource usage:
- min_file_process_interval: This setting controls how often (in seconds) Airflow parses your DAG files. Increasing it from the default 30 seconds can reduce the Scheduler's load at the cost of slower DAG updates.
- dag_dir_list_interval: This determines how often (in seconds) Airflow scans the dags directory for new DAGs. If you deploy new DAGs infrequently, consider increasing this interval to reduce CPU usage (a configuration sketch follows this list).
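For reference, both options live in the scheduler section of airflow.cfg (or can be set through the corresponding AIRFLOW__SCHEDULER__* environment variables); the values below are only illustrative starting points, not recommendations:

```
[scheduler]
# Parse each DAG file at most once every 2 minutes instead of every 30 seconds.
min_file_process_interval = 120
# Scan the dags folder for new files every 10 minutes instead of the default 5.
dag_dir_list_interval = 600
```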
We've talked a lot about the importance of creating optimized DAGs to maintain a healthy Airflow environment. But how do you actually measure the parse time of your DAGs? Fortunately, there are several ways to do this, depending on your Airflow deployment or operating system.
For example, if you have a Cloud Composer deployment, you can easily retrieve a DAG parse report by executing the following command in the Google CLI:
gcloud composer environments run $ENVIRONMENT_NAME \
  --location $LOCATION \
  dags report
While retrieving parse metrics is straightforward, measuring the effectiveness of your code optimizations can be less so. Every time you modify your code, you need to redeploy the updated Python file to your cloud provider, wait for the DAG to be parsed, and then extract a new report, a slow and time-consuming process.
Another possible approach, if you're on Linux or Mac, is to run this command to measure the parse time locally on your machine:
time python airflow/example_dags/example.py
However, while simple, this approach isn't practical for systematically measuring and comparing the parse times of multiple DAGs.
To address these challenges, I created airflow-parse-bench, a Python library that simplifies measuring and comparing the parse times of your DAGs using Airflow's native parse method.
The airflow-parse-bench tool makes it easy to store parse times, compare results, and standardize comparisons across your DAGs.
Installing the Library
Before installing, it's recommended to use a virtualenv to avoid library conflicts. Once that's set up, you can install the package by running the following command:
pip install airflow-parse-bench
Note: This command only installs the essential dependencies (related to Airflow and Airflow providers). You must manually install any additional libraries your DAGs depend on.
For example, if a DAG uses boto3 to interact with AWS, make sure boto3 is installed in your environment. Otherwise, you'll encounter parse errors.
After that, it's necessary to initialize your Airflow database. This can be done by executing the following command:
airflow db init
In addition, if your DAGs use Airflow Variables, you must define them locally as well. However, it's not necessary to put real values in your variables, since the actual values aren't required for parsing purposes:
airflow variables set MY_VARIABLE 'ANY TEST VALUE'
Without this, you'll encounter an error like:
error: 'Variable MY_VARIABLE doesn't exist'
Using the Tool
After installing the library, you can begin measuring parse times. For example, suppose you have a DAG file named dag_test.py containing the non-optimized DAG code used in the example above.
To measure its parse time, simply run:
airflow-parse-bench --path dag_test.py
This execution produces the following output:
As observed, our DAG presented a parse time of 0.61 seconds. If I run the command again, I'll see some small variations, as parse times can vary slightly across runs due to system and environmental factors:
In order to present a more concise number, it's possible to aggregate multiple executions by specifying the number of iterations:
airflow-parse-bench --path dag_test.py --num-iterations 5
Although it takes a bit longer to finish, this calculates the average parse time across 5 executions.
Now, to evaluate the impact of the aforementioned optimizations, I replaced the code in my dag_test.py with the optimized version shared earlier. After executing the same command, I got the following result:
As you can see, just applying some good practices reduced the DAG parse time by almost 0.5 seconds, highlighting the importance of the changes we made!
There are other interesting features that I think are relevant to share.
As a reminder, if you have any doubts or problems using the tool, you can access the complete documentation on GitHub.
Besides that, to view all the parameters supported by the library, simply run:
airflow-parse-bench --help
Testing Multiple DAGs
Typically, you'll have dozens of DAGs whose parse times you want to test. To address this use case, I created a folder named dags and put four Python files inside it.
To measure the parse times for all the DAGs in a folder, you just need to specify the folder path in the --path parameter:
airflow-parse-bench --path my_path/dags
Running this command produces a table summarizing the parse times for all the DAGs in the folder:
By default, the table is sorted from the fastest to the slowest DAG. However, you can reverse the order by using the --order parameter:
airflow-parse-bench --path my_path/dags --order desc
Skipping Unchanged DAGs
The --skip-unchanged parameter can be especially useful during development. As the name suggests, this option skips the parse execution for DAGs that haven't been modified since the last execution:
airflow-parse-bench --path my_path/dags --skip-unchanged
As shown below, when the DAGs remain unchanged, the output reflects no difference in parse times:
Resetting the Database
All DAG information, including metrics and history, is stored in a local SQLite database. If you want to clear all stored data and start fresh, use the --reset-db flag:
airflow-parse-bench --path my_path/dags --reset-db
This command resets the database and processes the DAGs as if it were the first execution.
Parse time is an essential metric for maintaining scalable and efficient Airflow environments, especially as your orchestration requirements become increasingly complex.
For this reason, the airflow-parse-bench library can be an important tool for helping data engineers create better DAGs. By testing your DAGs' parse time locally, you can easily and quickly find your code bottlenecks, making your DAGs faster and more performant.
Since the code is executed locally, the resulting parse time won't be identical to the one observed in your Airflow cluster. However, if you're able to reduce the parse time on your local machine, the same improvement might well be reproduced in your cloud environment.
Finally, this project is open for collaboration! If you have suggestions, ideas, or improvements, feel free to contribute on GitHub.