Computer Engineering/Data Engineering

Airflow Taskflow로 DAG refactoring하기

jordan.bae 2022. 11. 14. 19:00

 

Introduction

안녕하세요. 이번 글에서는 Taskflow를 사용해서 다른 외부 Operator가 아닌 파이썬 Operator로만 이뤄진 DAG을 리팩토링 한 경험을 공유하려고 합니다

 

Taskflow란

Taskflow란 Airflow2.0에서 출시된 concept으로 Operator가 아닌 파이썬 로직들로만 이뤄진 DAG의 경우 @task decorator를 활용해서 깔끔하게 로직을 관리할 수 있습니다. 실제로 DB의 incremental data나 API나 크롤링을 통해 진행되는 작은 배치들(수십 MB 이하)은 Pandas의 Dataframe을 통해서도 쉽고 빠르게 조작이 가능합니다.

Taskflow의 장점은 Xcoms을 사용하여 return 값을 전달해서 작업 간에 결과를 전달하기 편리합니다. 다른 점 들보다도 저는 코드의 흐름을 위에서 아래로 한 번에 읽을 수 있다는 점이 가장 마음에 들었습니다. 이 부분은 추후에 코드와 함께 자세히 설명드리겠습니다.

 

Code 예제

예제는 간단한 DAG으로 네이버에서 나스닥지수 데이터를 가져와서 간단한 transform후에 Load하는 코드로 데이터베이스에 Load하는 부분은 print문으로 대체합니다.

실제 실무에서도 주기적으로 다양한 데이터를 크롤링 or API로 가져와서 저장하는 경우에는 데이터의 사이즈가 크지 않기 때문에 사용할 수 있는 경우가 많을 것 같습니다.

 

기존 코드

개선 전 DAG

from datetime import datetime, timedelta
from typing import List

import requests
from pendulum import timezone
from airflow.models import DAG
from airflow.operators.python import PythonOperator


def etl_market_index(**context):
    def extract() -> List[dict]:
        url = 'https://finance.naver.com/world/worldDayListJson.naver'
        headers = {"referer": "https://finance.naver.com/world/sise.naver"}
        params = {
            "symbol": "NAS@IXIC",
            "fdtc": "0",
            "page": "1"
        }
        session = requests.Session()
        response = session.post(url, headers=headers, params=params)
        if response.status_code != 200:
            raise Exception("Failed to fetch data")

        return response.json()

    def transform(data: List[dict]) -> List[dict]:
        # 예시 날짜 데이터 포맷을 %Y-%m-%d로 변환
        for d in data:
            d["xymd"] = datetime.strptime(d["xymd"], "%Y%m%d").strftime("%Y-%m-%d")

        return data

    def load(data: List[dict]):
        print(data)

    data = extract()
    transformed_data = transform(data)
    load(transformed_data)


with DAG(
        "etl_market_index",
        schedule_interval="0 0 * * *",
        start_date=datetime(2022, 11, 13, tzinfo=timezone("Asia/Seoul")),
        default_args={
            'retries': 0,
            'retry_delay': timedelta(minutes=5),
            'provide_context': True,
            "depends_on_past": True,
            "trigger_rule": "all_success",
        },
        tags=["naver"],
) as dag_with_locations:
    etl_market_index = PythonOperator(
        task_id='etl_market_index',
        python_callable=etl_market_index,
    )

    etl_market_index

 

Taskflow로 개선한 코드

개선 후 DAG

 

from datetime import datetime, timedelta
from typing import List

import requests
from pendulum import timezone

from airflow.decorators import dag, task


@dag(
    "etl_market_index_with_workflow",
    schedule_interval="0 0 * * *",
    start_date=datetime(2022, 11, 13, tzinfo=timezone("Asia/Seoul")),
    default_args={
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        'provide_context': True,
        "depends_on_past": True,
        "trigger_rule": "all_success",
    },
    tags=["naver"],
)
def etl_market_index(**context):
    @task()
    def extract() -> List[dict]:
        url = 'https://finance.naver.com/world/worldDayListJson.naver'
        headers = {"referer": "https://finance.naver.com/world/sise.naver"}
        params = {
            "symbol": "NAS@IXIC",
            "fdtc": "0",
            "page": "1"
        }
        session = requests.Session()
        response = session.post(url, headers=headers, params=params)
        if response.status_code != 200:
            raise Exception("Failed to fetch data")

        return response.json()

    @task()
    def transform(data: List[dict]) -> List[dict]:
        # 예시 날짜 데이터 포맷을 %Y-%m-%d로 변환
        for d in data:
            d["xymd"] = datetime.strptime(d["xymd"], "%Y%m%d").strftime("%Y-%m-%d")

        return data

    @task()
    def load(data: List[dict]):
        print(data)

    data = extract()
    transformed_data = transform(data)
    load(transformed_data)


etl_market_index = etl_market_index()

 

Workflow 사용해서 얻은 개선점

코드의 가독성

  • 제가 봤을 때 가장 큰 장점은 코드의 가독성입니다. workflow를 사용하면 위에서 아래로 한 번에 코드를 읽으면 되지만 기존 코드는 DAG 정의를 보면서 순서에 맞게 파이썬 함수를 읽어 나가야 합니다. 위 아래를 왔다 갔다 하면 코드를 읽기 어렵고 컨텍스트를 유지하기 어렵습니다.

Task로 분리

  • 기존 코드도 Task를 분리하고 xcom을 push 하면 되지만 xcom을 사용하면 코드가 지저분해지는 경향이 있습니다. 기존 코드에서는 데이터가 크지 않기 때문에 하나의 main 함수가 각 ETL 함수를 내부에서 호출했기 때문에 항상 전체를 다시 돌려야 했습니다.

 

Limitation

  • Xcom에서 사용 중인 Serializer에서 serialize(직렬화)가 가능한 데이터를 return 해야 합니다. 기본값을 사용 시 pandas dataframe과 같은 자주 사용하는 데이터 타입의 직렬 화가 가능하지 않습니다.
    • 하지만, 중간 결과물을 temp table에 저장해서 이런 부분을 극복할 수 있습니다. 저는 코드의 가독성 측면에서 개선 포인트가 마음에 들어서 이 부분을 고려해 볼 수 있다고 생각합니다.
  • Xcom은 작은 value를 전달하기 위해서 만들어졌기 때문에 큰 데이터의 전송으로는 적절하지 않습니다.
    • custom xcom backend를 사용하면 어느 정도 크기까지는 가능합니다.
  • Task 간의 복잡한 종속성을 가진 경우는 적절하지 않을 수 있습니다.

 

정리

몇 가지 limitation이 있지만 workflow를 통해서 python operator들로 작성된 DAG을 보기 좋게 리팩토링할 수 있습니다. 코드가 복잡해지면 효과는 더욱 커진다고 생각합니다. 데이터가 크지 않은 DAG부터 리팩토링을 해본 후 장점이 있다고 느껴지면 dataframe을 사용하는 DAG 들에도 적용해 볼 만한 것 같습니다.

반응형