Replies: 3 comments
-
A possible solution is using multiple DAGs (every run triggers multiple DAG runs). But it is not practical in our case: it would require launching numerous DAG runs each time, making it difficult to visualize dependencies and resulting in a poor user experience.
-
Additional context: this is a deep learning use case. Our DAG contains a complete pipeline including data preprocessing, feature engineering, and model training. Each time we trigger a DAG run, we need to selectively run specific stages: for example, sometimes only data cleaning, sometimes only model training on already-cleaned data, and sometimes the full pipeline from scratch.
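To make the stage-selection idea concrete, here is a minimal, Airflow-free sketch of how a run's conf payload could map to the stages to execute. The conf shape (`{"stages": [...]}`), the stage names, and the helper name are illustrative assumptions, not from the discussion:

```python
# Illustrative helper (assumed conf shape: {"stages": [...]}); not from the discussion.
PIPELINE_STAGES = ["preprocessing", "feature_engineering", "training"]


def stages_to_run(run_conf):
    """Return the requested stages in pipeline order, defaulting to the full pipeline."""
    requested = (run_conf or {}).get("stages")
    if not requested:
        return list(PIPELINE_STAGES)
    # Preserve pipeline order and ignore unknown stage names.
    return [stage for stage in PIPELINE_STAGES if stage in requested]


print(stages_to_run(None))                      # full pipeline
print(stages_to_run({"stages": ["training"]}))  # ['training']
```

A run would then be triggered with something like `airflow dags trigger <dag_id> --conf '{"stages": ["training"]}'` so that only the training stage executes.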
-
DAG Versioning is not the right tool here: it is designed for schema/structural changes over time, not for runtime task selection. Dynamically generating a new DAG per run is also problematic, since it adds DAG-parsing overhead and leaves a confusing audit history. For a 150-task ML pipeline where each run only executes a subset, there are three patterns worth considering:

**Pattern 1: ShortCircuitOperator at group boundaries (fastest fix)**

If the 10-minute wait comes from evaluating individual skip conditions per task, move the skip decision upstream to a single gate per TaskGroup:

```python
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.task_group import TaskGroup


def should_run_preprocessing(**context) -> bool:
    run_conf = context["dag_run"].conf or {}
    return "preprocessing" in run_conf.get("stages", ["preprocessing", "training"])


with TaskGroup("preprocessing") as preprocessing_group:
    gate = ShortCircuitOperator(
        task_id="gate_preprocessing",
        python_callable=should_run_preprocessing,
    )
    clean = PythonOperator(task_id="clean_data", ...)
    features = PythonOperator(task_id="engineer_features", ...)
    gate >> clean >> features

with TaskGroup("training") as training_group:
    gate2 = ShortCircuitOperator(
        task_id="gate_training",
        python_callable=lambda **ctx: "training"
        in (ctx["dag_run"].conf or {}).get("stages", ["preprocessing", "training"]),
    )
    train = PythonOperator(task_id="train_model", ...)
    gate2 >> train

preprocessing_group >> training_group
```

When a gate's callable returns False, ShortCircuitOperator skips everything downstream of it in a single decision, so the scheduler evaluates a handful of gates instead of 150 per-task conditions.

**Pattern 2: BranchPythonOperator for mutually exclusive paths**

If the stages are mutually exclusive (e.g. "only preprocessing" vs "only training" vs "full pipeline"), use branching:

```python
from airflow.operators.python import BranchPythonOperator


def choose_pipeline(**context):
    stages = (context["dag_run"].conf or {}).get("stages", ["preprocessing", "training"])
    if stages == ["preprocessing"]:
        return "preprocessing.gate_preprocessing"
    elif stages == ["training"]:
        return "training.gate_training"
    return ["preprocessing.gate_preprocessing", "training.gate_training"]  # full pipeline


branch = BranchPythonOperator(
    task_id="route_pipeline",
    python_callable=choose_pipeline,
)
```

**Pattern 3: Dynamic Task Mapping (Airflow 2.3+ / 3.x, cleanest for ML)**

For your specific ML use case, dynamic task mapping lets Airflow create only the task instances that are actually needed, expanded at runtime from the upstream task's return value:

```python
from airflow.decorators import task


@task
def get_stages_to_run(**context) -> list[str]:
    return (context["dag_run"].conf or {}).get("stages", ["preprocessing", "training"])


@task
def run_stage(stage_name: str):
    # run_preprocessing / run_training are defined elsewhere in the DAG file.
    stage_map = {
        "preprocessing": run_preprocessing,
        "training": run_training,
    }
    stage_map[stage_name]()


stages = get_stages_to_run()
run_stage.expand(stage_name=stages)
```

With this, Airflow creates only the mapped task instances for the stages that were requested, with no skip logic at all.

**Recommendation for your case**

Given that you have a deep learning pipeline with independent stages (data cleaning → feature engineering → model training), DAG Versioning is worth enabling for schema management, but it should not be used as a mechanism for runtime task selection.
-
Hi Airflow community,
We're facing a challenge with a large DAG (150+ tasks) and would appreciate your advice on best practices.
Current Setup:
• We have a complex DAG with 150+ tasks
• Each DAG run only needs to execute a subset of these tasks
• Currently we use conditional logic to skip tasks that aren't needed
• The problem: when we want to run tasks far downstream, we have to wait for all upstream tasks to be evaluated and skipped, which takes 10+ minutes due to DAG complexity
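The per-task conditional skip described above typically looks like each task's callable checking the run conf and raising a skip exception. A minimal sketch of that pattern (the exception class is a local stand-in for Airflow's AirflowSkipException so the snippet runs standalone; the function and conf shape are illustrative):

```python
# Local stand-in for airflow.exceptions.AirflowSkipException (illustrative only).
class AirflowSkipException(Exception):
    pass


def run_if_requested(stage, run_conf):
    """Per-task skip check: the kind of condition evaluated once per task, 150+ times per run."""
    if stage not in (run_conf or {}).get("stages", []):
        raise AirflowSkipException(f"stage {stage!r} not requested for this run")
    return f"ran {stage}"


conf = {"stages": ["training"]}
print(run_if_requested("training", conf))  # ran training
try:
    run_if_requested("preprocessing", conf)
except AirflowSkipException as exc:
    print(f"skipped: {exc}")
```

Because every one of the 150+ tasks repeats a check like this, the scheduler must visit and skip each upstream task before a downstream task can start, which is consistent with the 10+ minute wait described.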
Proposed Solution: We're considering using DAG Versioning combined with Dynamic DAG Generation.
Is this an appropriate use case for DAG Versioning? Is there a better pattern for this use case?
Thanks in advance!