Airflow TaskGroup 사용하기

TaskGroup이란?

  • Airflow 내에서 Task를 그룹화하고 모듈식 워크플로를 만들게 해주는 것
  • 이전에는 SubDAG을 썼으나, Airflow 2.0부터는 TaskGroup을 사용한다.

SubDag

  • parallel_dag.py
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.subdag import SubDagOperator
    
    from subdags.subdag_parallel_dag import subdag_parallel_dag
    from datetime import datetime
    
    default_args = {
        'start_date': datetime(2020, 1, 1)
    }
    
    with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    
        task_1 = BashOperator(
            task_id='task_1',
            bash_command='sleep 3'
        )
    
        processing = SubDagOperator(
            task_id='processing_tasks',
            subdag=subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
        )
    
        task_4 = BashOperator(
            task_id='task_4',
            bash_command='sleep 3'
        )
    
        task_1 >> processing >> task_4
    
  • subdag_parallel_dag.py
  from airflow import DAG
  from airflow.operators.bash import BashOperator

  def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
      with DAG(dag_id=f'{parent_dag_id}.{child_dag_id}', default_args=default_args) as dag:
          task_2 = BashOperator(
              task_id='task_2',
              bash_command='sleep 3'
          )

          task_3 = BashOperator(
              task_id='task_3',
              bash_command='sleep 3'
          )

          return dag
  • 파일 구조

    image

  • UI 확인

    image

SubDAG은 이제 쓰지 않는다

  • 그러나 SubDAG는 실제로 다른 DAG에 포함된(embedded) DAG였다. 이로 인해 성능 및 기능 문제가 모두 발생했다.
    • SubDAG가 트리거되면 SubDAG 및 하위 작업은 전체 SubDAG가 완료될 때까지 작업자 슬롯을 차지.(Sequential) 이로 인해 다른 작업 처리가 지연될 수 있으며 작업자 슬롯 수에 따라 교착 상태가 발생할 수 있다.
    • SubDAG에는 고유한 매개변수, 일정 및 활성화된 설정이 있다. 상위 DAG와 일치하지 않으면 예기치 않은 동작이 발생할 수 있다.

TaskGroups

  • parallel_dag.py
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.task_group import TaskGroup
    
    from datetime import datetime
    
    default_args = {
        'start_date': datetime(2020, 1, 1)
    }
    
    with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    
        task_1 = BashOperator(
            task_id='task_1',
            bash_command='sleep 3'
        )
    
        with TaskGroup('processing_tasks') as processing_tasks:
            task_2 = BashOperator(
                task_id='task_2',
                bash_command='sleep 3'
            )
    
            task_3 = BashOperator(
                task_id='task_3',
                bash_command='sleep 3'
            )
    
        task_4 = BashOperator(
            task_id='task_4',
            bash_command='sleep 3'
        )
    
        task_1 >> processing_tasks >> task_4
    
  • UI확인
    image
    image

Subgroups in Taskgroup

  • parallel_dag.py
  from airflow import DAG
  from airflow.operators.bash import BashOperator
  from airflow.utils.task_group import TaskGroup

  from datetime import datetime

  default_args = {
      'start_date': datetime(2020, 1, 1)
  }

  with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

      task_1 = BashOperator(
          task_id='task_1',
          bash_command='sleep 3'
      )

      with TaskGroup('processing_tasks') as processing_tasks:
          task_2 = BashOperator(
              task_id='task_2',
              bash_command='sleep 3'
          )
          with TaskGroup('spark_tasks') as spark_tasks:
              task_3 = BashOperator(
                  task_id='task_3', # spark_tasks.task_3
                  bash_command='sleep 3'
              )
          with TaskGroup('flink_tasks') as flink_tasks:
              task_3 = BashOperator(
                  task_id='task_3',
                  bash_command='sleep 3'
              )

      task_4 = BashOperator(
          task_id='task_4',
          bash_command='sleep 3'
      )

      task_1 >> processing_tasks >> task_4
  • UI 확인
    image