Airflow XCom으로 task 간 데이터 주고받기
XComs
XCom으로 데이터를 주고받는 DAG 구성하기
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from random import uniform
from datetime import datetime
# Arguments applied to every task in the DAG; only the start date is set here.
default_args = dict(start_date=datetime(2020, 1, 1))
def _training_model(ti):
accuracy = uniform(0.1, 10.0)
print(f'model\'s accuracy: {accuracy}')
ti.xcom_push(key='model_accuracy', value=accuracy)
def _choose_best_model(ti):
print('choose best model')
accuracies = ti.xcom_pull(key='model_accuracy', task_ids=[
'processing_tasks.training_model_a',
'processing_tasks.training_model_b',
'processing_tasks.training_model_c'
])
print(accuracies)
# DAG: download -> train three models in parallel (TaskGroup) -> choose best.
# The training tasks push their accuracy to XCom; choose_best_model pulls it.
with DAG(
    'xcom_dag',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:
    # Placeholder download step. do_xcom_push=False stops BashOperator from
    # pushing its last stdout line to XCom as 'return_value'.
    # (The original was missing the comma after bash_command -> SyntaxError.)
    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False,
    )

    # Tasks created inside the group get the group id as a prefix, e.g.
    # 'processing_tasks.training_model_a'; the pull side must use those IDs.
    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model,
        )
        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model,
        )
        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model,
        )

    # Defined outside the TaskGroup so it runs after the whole group finishes.
    choose_best_model = PythonOperator(
        task_id='choose_best_model',
        python_callable=_choose_best_model,
    )

    downloading_data >> processing_tasks >> choose_best_model
ti.xcom_push(key='model_accuracy', value=accuracy)
: XCom에 key-value 형태로 데이터를 push (값은 기본적으로 JSON으로 직렬화되며, enable_xcom_pickling 설정 시 pickle 사용)
ti.xcom_pull(key='model_accuracy', task_ids=['task_id'..])
: 지정한 task_ids와 key에 해당하는 데이터를 XCom에서 pull (task_ids를 리스트로 넘기면 값도 리스트로 반환)
do_xcom_push=False
: 오퍼레이터의 반환값(BashOperator의 경우 stdout의 마지막 줄)을 XCom에 자동으로 push하지 않도록 함 — 여기서는 쓸모없는 빈 값이 XCom에 쌓이는 것을 막음
UI 확인
- DAG trigger
- [Admin]-[XComs]
- 태스크 그룹 processing_tasks의 학습 태스크들로부터 태스크 choose_best_model이 데이터를 잘 pull 해온 것을, choose_best_model의 logs에서 print 출력으로 확인할 수 있다.