Passing Data Between Tasks with Airflow XComs

XComs

Building a DAG that exchanges data through XComs

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}


def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)


def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids=[
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])
    print(accuracies)

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    choose_best_model = PythonOperator(
        task_id='choose_best_model',
        python_callable=_choose_best_model
    )

    downloading_data >> processing_tasks >> choose_best_model
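  • ti.xcom_push(key='model_accuracy', value=accuracy) : pushes data to XCom as a key-value pair (the value is serialized as JSON, or with pickle if pickling is enabled)
  • ti.xcom_pull(key='model_accuracy', task_ids=['task_id'..]) : pulls the values that the listed tasks pushed under that key
  • do_xcom_push=False : keeps the operator from pushing its result to XCom. For BashOperator the result is the last line of stdout, which is empty for sleep 3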
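The DAG above only prints the pulled accuracies. As a minimal sketch of the next step, the callable below picks the highest one. It pulls one task at a time so that each value stays paired with its task id. The names FakeTaskInstance, TRAINING_TASK_IDS, and choose_best_model are hypothetical and not part of the original DAG. FakeTaskInstance is an in-memory stand-in, not Airflow's TaskInstance, used here only so the logic can be tried without a scheduler.

```python
# Minimal sketch: pick the best model from per-task XCom values.
# FakeTaskInstance is a hypothetical test double, NOT Airflow's TaskInstance;
# it only mimics xcom_push/xcom_pull for a single key and a single task id.

TRAINING_TASK_IDS = [
    'processing_tasks.training_model_a',
    'processing_tasks.training_model_b',
    'processing_tasks.training_model_c',
]


class FakeTaskInstance:
    """In-memory stand-in so the callable can run without a scheduler."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, key, task_ids):
        # Only the single-task form (task_ids given as one string) is mimicked.
        return self._store.get((task_ids, key))


def choose_best_model(ti):
    # Pull one task at a time so each accuracy stays paired with its task id.
    accuracies = {
        task_id: ti.xcom_pull(key='model_accuracy', task_ids=task_id)
        for task_id in TRAINING_TASK_IDS
    }
    # Skip tasks that pushed nothing (the pull yields None in that case).
    available = {t: a for t, a in accuracies.items() if a is not None}
    if not available:
        raise ValueError('no model_accuracy values found in XCom')
    best_task_id = max(available, key=available.get)
    return best_task_id, available[best_task_id]


# Simulate the three training tasks pushing their accuracies.
store = {}
for task_id, accuracy in zip(TRAINING_TASK_IDS, [3.2, 7.5, 1.1]):
    FakeTaskInstance(task_id, store).xcom_push(key='model_accuracy', value=accuracy)

best_task_id, best_accuracy = choose_best_model(
    FakeTaskInstance('choose_best_model', store)
)
print(best_task_id, best_accuracy)
```

In a real DAG, the same function body could be passed as python_callable to the choose_best_model task, with Airflow supplying ti. The stand-in class would then not be needed.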

Checking in the UI

  • Trigger the DAG.
  • Open [Admin]-[XComs] to see the pushed values.
  • In the logs, the print output confirms that the task choose_best_model pulled the data pushed by the tasks in processing_tasks.