Airflow 여러개의 task 동시에 실행하기
구성도
Sequential Execution
설정 확인
- 메타스토어(core sql_alchemy_conn) 확인
- airflow config get-value core sql_alchemy_conn
sqlite:////home/airflow/airflow/airflow.db
- sqlite에서는 multiple write가 안된다는 거 기억하고 sequential execute가 되어야한다
- core executor 확인
- airflow config get-value core executor
SequentialExecutor
- sequential executor : 하나끝나면->하나시작 순서대로 실행하며, debugging할때 주로 쓴다.
DAG 작성
- parallel_dag.py : »와 []로 병렬 구성
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime default_args = { 'start_date': datetime(2020, 1, 1) } with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag: task_1 = BashOperator( task_id='task_1', bash_command='sleep 3' ) task_2 = BashOperator( task_id='task_2', bash_command='sleep 3' ) task_3 = BashOperator( task_id='task_3', bash_command='sleep 3' ) task_4 = BashOperator( task_id='task_4', bash_command='sleep 3' ) task_1 >> [task_2, task_3] >> task_4
Trigger
- 간트 차트를 보면 task가 순차적으로 실행된것을 볼 수 있다.
Parallel
설정 바꾸기
- sqlite에서 postgreSQL로 바꾼다 (multi write)
- local executor로 바꾼다
- task 하나가 subprocess가 된다. 즉 각 task가 하나의 머신에서 동작한다.
- 여러 개의 태스크를 병렬로 execute할 수 있다.
PostgreSQL 다운로드
- 다운로드
sudo apt install postgresql
- postgres 진입 및 사용자 설정
sudo -u postgres psql ALTER USER {id} PASSWORD '{password}';
- postgreSQL package 다운로드
pip install 'apache-airflow[postgres]'
### sql_alchemy_conn 수정 - airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://{id}:{password}@localhost/{db_name}
- 수정 확인
airflow db check
executor 수정
- airflow.cfg
executor = LocalExecutor
변경 적용
- webserver, scheduler stop/start : ctrl-c로 stop
sudo lsof -i tcp:8080 | grep 'airflow' | awk '{print $2}' | xargs kill -9 rm /home/airflow/airflow/airflow-webserver.pid
- db init
airflow db init
- webserver, scheduler start
Trigger
- UI 확인 시 task2와 task3가 ‘동시에’ 실행되는 것을 볼 수 있다.