Writing DAGs
Use the TaskFlow API (@task decorator) for Python tasks — it's cleaner than classic operators. Use branching for conditional paths, sensors for waiting on external events, and trigger rules to control when tasks run despite upstream failures.
Explain Like I'm 12
Writing a DAG is like programming a robot to do your chores. The TaskFlow API lets you write each chore as a simple function. Branching is telling the robot "if it's raining, do laundry inside; otherwise, hang it outside." Sensors are the robot waiting by the mailbox until the mail arrives. Trigger rules tell the robot "clean up the kitchen even if dinner burned."
TaskFlow API (Modern Way)
The TaskFlow API (Airflow 2.0+) uses Python decorators. It's cleaner than classic operators and handles XCom automatically.
```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example"],
)
def my_etl():
    @task
    def extract():
        """Pull data from API"""
        return {"users": 100, "revenue": 5000}

    @task
    def transform(raw_data: dict):
        """Clean and enrich"""
        raw_data["avg_revenue"] = raw_data["revenue"] / raw_data["users"]
        return raw_data

    @task
    def load(clean_data: dict):
        """Write to warehouse"""
        print(f"Loading: {clean_data}")

    # Dependencies are inferred from function calls!
    raw = extract()
    clean = transform(raw)
    load(clean)

my_etl()  # Instantiate the DAG
```
Use the TaskFlow API (`@task`) for Python functions. Use classic operators (`BashOperator`, SQL operators) for non-Python work. You can mix both in the same DAG.
Branching — Conditional Paths
Use @task.branch to choose which downstream path to follow at runtime.
```python
@task.branch
def check_data_quality(row_count: int):
    if row_count > 0:
        return "process_data"  # task_id to run
    else:
        return "send_alert"   # skip process_data, run alert

@task
def process_data():
    print("Processing...")

@task
def send_alert():
    print("No data! Alerting team.")

count = extract()
check_data_quality(count) >> [process_data(), send_alert()]
```
The branch task returns the `task_id` of the path to follow. All other branches are skipped. Downstream tasks that rejoin the paths should use `trigger_rule="none_failed_min_one_success"` so they still run after a skipped branch.
Sensors — Wait for Events
Sensors are special operators that wait until a condition is true before allowing downstream tasks to run.
```python
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Wait for a file to appear
wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/data/incoming/daily_export.csv",
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,      # Give up after 1 hour
    mode="poke",       # or "reschedule" to free the worker slot
)

# Wait for an S3 object
wait_for_s3 = S3KeySensor(
    task_id="wait_for_s3",
    bucket_key="s3://my-bucket/data/{{ ds }}/export.parquet",
    mode="reschedule",  # Frees the slot between pokes
)
```
`mode="poke"` holds the worker slot while waiting (blocking other tasks). `mode="reschedule"` releases the slot between checks. Always use `reschedule` in production for long waits.
Trigger Rules
By default, a task runs only when ALL upstream tasks succeed. Trigger rules let you change this.
| Rule | Runs when | Use case |
|---|---|---|
| `all_success` | All upstreams succeeded (default) | Normal flow |
| `all_failed` | All upstreams failed | Error-only cleanup |
| `one_success` | At least one upstream succeeded | Fan-in after branch |
| `one_failed` | At least one upstream failed | Alert on partial failure |
| `none_failed` | No upstream failed (succeeded or skipped) | Rejoin after branching |
| `all_done` | All upstreams finished (any state) | Cleanup tasks that must always run |
```python
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

cleanup = PythonOperator(
    task_id="cleanup",
    python_callable=cleanup_temp_files,
    trigger_rule=TriggerRule.ALL_DONE,  # Always runs
)
```
Dynamic DAGs
Since DAGs are Python, you can generate tasks dynamically with loops.
```python
tables = ["users", "orders", "products", "events"]

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def ingest_all_tables():
    @task
    def extract_table(table_name: str):
        print(f"Extracting {table_name}")
        return f"s3://bucket/{table_name}.parquet"

    @task
    def load_table(table_name: str, path: str):
        print(f"Loading {table_name} from {path}")

    for table in tables:
        path = extract_table(table)
        load_table(table, path)

ingest_all_tables()
```
When the list isn't known until parse time, use dynamic task mapping (Airflow 2.3+) with `.expand()`:

```python
@task
def get_tables():
    return ["users", "orders", "products"]

@task
def process(table: str):
    print(f"Processing {table}")

tables = get_tables()
process.expand(table=tables)  # Creates N parallel tasks at runtime
```
Best Practices
- Idempotent tasks. Running a task twice with the same input should produce the same result. Use `INSERT ... ON CONFLICT`, not `INSERT` alone.
- Atomic tasks. Each task does one thing. Don't combine extract+transform in one task — if transform fails, you have to re-extract.
- No top-level code in DAG files. Don't run queries or API calls at DAG parse time. Put logic inside operators/tasks.
- Use Connections, not hardcoded credentials. Store secrets in Airflow Connections or a secrets backend.
- Template with Jinja. Use `{{ ds }}` and `{{ data_interval_start }}` for dates instead of `datetime.now()`.
- Set timeouts. Always set `execution_timeout` on tasks and `dagrun_timeout` on DAGs to prevent runaway jobs.
- Test locally. Use `airflow tasks test dag_id task_id 2024-01-01` to run a single task without the scheduler.
Test Yourself
Q: What is the TaskFlow API and why is it preferred?
A: It uses `@task` and `@dag` decorators to define DAGs as plain Python functions. Preferred because: (1) cleaner syntax, (2) automatic XCom handling (return values are pushed/pulled automatically), (3) dependencies inferred from function calls.

Q: What trigger_rule would you use for a cleanup task that must always run?
A: `trigger_rule="all_done"` — runs after all upstreams finish, regardless of their state (success, failed, or skipped).

Q: Why should tasks be idempotent?
A: Because Airflow retries failed tasks and re-runs them during backfills. If running a task twice with the same input produces a different result (e.g., duplicated rows), retries and backfills corrupt your data.
Interview Questions
Q: How do you handle a DAG that needs to process 1000 files, each independently?
A: Use dynamic task mapping (Airflow 2.3+): `process.expand(file=get_file_list())`. This creates N parallel tasks at runtime based on the actual file count. Before 2.3, you'd generate tasks in a loop at DAG parse time or use a pool to limit concurrency.

Q: What is the difference between poke and reschedule mode for sensors?
A: `poke`: the sensor occupies a worker slot for its entire wait time, sleeping between checks. `reschedule`: the sensor releases the worker slot between checks, only occupying it during the actual poke. Use reschedule for long waits to avoid blocking workers.

Q: How do you pass large datasets between tasks?
A: Not through XCom — it's meant for small metadata, not payloads. Write the data to external storage (S3, GCS, a warehouse table) and pass the path or table name between tasks via XCom instead.
Q: How would you test a DAG before deploying to production?
A: (1) `python dags/my_dag.py` — syntax check (imports succeed, no parse errors). (2) `airflow dags test my_dag 2024-01-01` — run the entire DAG locally. (3) `airflow tasks test my_dag task_id 2024-01-01` — test a single task. (4) Write unit tests with pytest, mocking external dependencies. (5) Use DAG validation tests to check for cycles, missing owners, etc.