Airflow TaskGroup 사용하기
TaskGroup이란?
- Airflow 내에서 Task를 그룹화하고 모듈식 워크플로를 만들게 해주는 것
- 이전에는 SubDAG을 썼으나, Airflow 2.0부터는 TaskGroup을 사용한다.
SubDag
- parallel_dag.py
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.subdag import SubDagOperator from subdags.subdag_parallel_dag import subdag_parallel_dag from datetime import datetime default_args = { 'start_date': datetime(2020, 1, 1) } with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag: task_1 = BashOperator( task_id='task_1', bash_command='sleep 3' ) processing = SubDagOperator( task_id='processing_tasks', subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args) ) task_4 = BashOperator( task_id='task_4', bash_command='sleep 3' ) task_1 >> processing >> task_4
- subdag_parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}', default_args=default_args) as dag:
task_2 = BashOperator(
task_id='task_2',
bash_command='sleep 3'
)
task_3 = BashOperator(
task_id='task_3',
bash_command='sleep 3'
)
return dag
-
파일 구조
-
UI 확인
SubDAG은 이제 쓰지 않는다
- 그러나 SubDAG는 실제로 다른 DAG에 포함된(embedded) DAG였다. 이로 인해 성능 및 기능 문제가 모두 발생했다.
- SubDAG가 트리거되면 SubDAG 및 하위 작업은 전체 SubDAG가 완료될 때까지 작업자 슬롯을 차지.(Sequential) 이로 인해 다른 작업 처리가 지연될 수 있으며 작업자 슬롯 수에 따라 교착 상태가 발생할 수 있다.
- SubDAG에는 고유한 매개변수, 일정 및 활성화된 설정이 있다. 상위 DAG와 일치하지 않으면 예기치 않은 동작이 발생할 수 있다.
TaskGroups
- parallel_dag.py
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.utils.task_group import TaskGroup from datetime import datetime default_args = { 'start_date': datetime(2020, 1, 1) } with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag: task_1 = BashOperator( task_id='task_1', bash_command='sleep 3' ) with TaskGroup('processing_tasks') as processing_tasks: task_2 = BashOperator( task_id='task_2', bash_command='sleep 3' ) task_3 = BashOperator( task_id='task_3', bash_command='sleep 3' ) task_4 = BashOperator( task_id='task_4', bash_command='sleep 3' ) task_1 >> processing_tasks >> task_4
- UI확인
Subgroups in Taskgroup
- parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
default_args = {
'start_date': datetime(2020, 1, 1)
}
with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='sleep 3'
)
with TaskGroup('processing_tasks') as processing_tasks:
task_2 = BashOperator(
task_id='task_2',
bash_command='sleep 3'
)
with TaskGroup('spark_tasks') as spark_tasks:
task_3 = BashOperator(
task_id='task_3', # spark_tasks.task_3
bash_command='sleep 3'
)
with TaskGroup('flink_tasks') as flink_tasks:
task_3 = BashOperator(
task_id='task_3',
bash_command='sleep 3'
)
task_4 = BashOperator(
task_id='task_4',
bash_command='sleep 3'
)
task_1 >> processing_tasks >> task_4
- UI 확인