Airflow 여러개의 task 동시에 실행하기

구성도

image

Sequential Execution

설정 확인

  • 메타스토어(core sql_alchemy_conn) 확인
    • airflow config get-value core sql_alchemy_conn
    • sqlite:////home/airflow/airflow/airflow.db
    • sqlite에서는 multiple write가 안된다는 거 기억하고 sequential execute가 되어야한다
  • core executor 확인
    • airflow config get-value core executor
    • SequentialExecutor
    • sequential executor : 하나끝나면->하나시작 순서대로 실행하며, debugging할때 주로 쓴다.

      DAG 작성

  • parallel_dag.py : »와 []로 병렬 구성
      from airflow import DAG
      from airflow.operators.bash import BashOperator
    
      from datetime import datetime
    
      default_args = {
          'start_date': datetime(2020, 1, 1)
      }
    
      with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    
          task_1 = BashOperator(
              task_id='task_1',
              bash_command='sleep 3'
          )
    
          task_2 = BashOperator(
              task_id='task_2',
              bash_command='sleep 3'
          )
    
          task_3 = BashOperator(
              task_id='task_3',
              bash_command='sleep 3'
          )
    
          task_4 = BashOperator(
              task_id='task_4',
              bash_command='sleep 3'
          )
    
          task_1 >> [task_2, task_3] >> task_4
    

Trigger

image
image

  • 간트 차트를 보면 task가 순차적으로 실행된것을 볼 수 있다.

Parallel

설정 바꾸기

  • sqlite에서 postgreSQL로 바꾼다 (multi write)
  • local executor로 바꾼다
    • task 하나가 subprocess가 된다. 즉 각 task가 하나의 머신에서 동작한다.
    • 여러 개의 태스크를 병렬로 execute할 수 있다.

      PostgreSQL 다운로드

  • 다운로드
    sudo apt install postgresql
  • postgres 진입 및 사용자 설정
      sudo -u postgres psql
      ALTER USER {id} PASSWORD '{password}';  
    
  • postgreSQL package 다운로드
    pip install 'apache-airflow[postgres]' ### sql_alchemy_conn 수정
  • airflow.cfg
    sql_alchemy_conn = postgresql+psycopg2://{id}:{password}@localhost/{db_name}
  • 수정 확인
    airflow db check

    executor 수정

  • airflow.cfg
    executor = LocalExecutor

    변경 적용

  • webserver, scheduler stop/start : ctrl-c로 stop
      sudo lsof -i tcp:8080 | grep 'airflow' | awk '{print $2}' | xargs kill -9
      rm /home/airflow/airflow/airflow-webserver.pid
    
  • db init airflow db init
  • webserver, scheduler start

Trigger

  • UI 확인 시 task2와 task3가 ‘동시에’ 실행되는 것을 볼 수 있다.
    image
    image