Airflow Xcoms로 task간 데이터 주고받기
May 2, 2022
Xcoms
Xcoms로 데이터 주고받는 DAG 구성하기
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from random import uniform
from datetime import datetime
# Handed to the DAG below as its ``default_args``; only the start date is set.
default_args = dict(start_date=datetime(2020, 1, 1))
def _training_model ( ti ):
accuracy = uniform ( 0.1 , 10.0 )
print ( f 'model \' s accuracy: { accuracy } ' )
ti . xcom_push ( key = 'model_accuracy' , value = accuracy )
def _choose_best_model ( ti ):
print ( 'choose best model' )
accuracies = ti . xcom_pull ( key = 'model_accuracy' , task_ids = [
'processing_tasks.training_model_a' ,
'processing_tasks.training_model_b' ,
'processing_tasks.training_model_c'
])
print ( accuracies )
# Pipeline: run a placeholder download step, then three independent training
# tasks inside a task group, then a task that reads their accuracies from XCom.
with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        # `sleep 3` yields nothing worth sharing, so skip the automatic XCom push.
        do_xcom_push=False,
    )

    # Tasks created in this group get ids of the form 'processing_tasks.<task_id>',
    # which is the form _choose_best_model uses when pulling.
    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model,
        )
        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model,
        )
        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model,
        )

    choose_best_model = PythonOperator(
        task_id='choose_best_model',
        python_callable=_choose_best_model,
    )

    # Download first, then the whole training group, then the comparison step.
    downloading_data >> processing_tasks >> choose_best_model
ti.xcom_push(key='model_accuracy', value=accuracy) : XCom에 key-value 형태로 데이터를 push (value는 기본적으로 JSON으로 직렬화되며, 설정에 따라 pickle도 사용 가능)
ti.xcom_pull(key='model_accuracy', task_ids=['task_id', ...]) : 지정한 task들이 해당 key로 push한 데이터를 pull (TaskGroup 안의 task는 'group_id.task_id' 형식으로 지정)
do_xcom_push=False : 오퍼레이터의 실행 결과(BashOperator의 경우 stdout의 마지막 줄)를 XCom에 자동으로 push하지 않도록 함. 여기서는 `sleep 3`의 비어있는 value가 push되는 것을 막는다.
UI 확인
DAG trigger
[Admin]-[Xcoms]
태스크 그룹 processing_tasks 안의 태스크들로부터 태스크 choose_best_model 이 데이터를 잘 pull 해 온 것을 print를 통해 logs에서 확인할 수 있다.