Computer Engineering/Data Engineering

Airflow CeleryKubernetesExecutor 사용하기

jordan.bae 2023. 6. 16. 21:44

Airflow logo

CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다.

 

언제 사용하는 것이 좋은가?

Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다.

다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다:

  1. 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우
  2. 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우
  3. Celery 워커에서 실행될 수 있는 많은 수의 작은 작업이 있지만, 사전에 정의된 환경에서 실행하는 것이 좋은 리소스 집약적인 작업이 있는 경우

이러한 경우에 CeleryKubernetesExecutor를 고려하는 것이 좋습니다.

 

실제 적용 사례

KubernetesExecutor를 사용하는 경우, 작업 시간이 상대적으로 짧지만 여러 개의 작업으로 나누어진 DAG에서는 pod 초기화 시간이 오래 걸려 전체 DAGRun의 실행 시간이 오래 걸릴 수 있는 문제가 발생할 수 있습니다. 예를 들어, 외부에서 여러 데이터를 전체 로드 방식으로 가져오는 작업이 있는데, 경우에 따라 실패할 수 있으며 전체 재실행이 아니라 일부만 재실행해야 하는 상황입니다. 이를 위해 10개의 페이지당 하나의 작업으로 총 300개의 작업으로 분할된 DAG이 있습니다.

한 Task당 코드 실행시간은 30초 정도이지만 Scheduler에서 Task를 예약하고 queue에 집어넣고 pod을 initializing하는데 걸리는 시간은 1분 30초이고, 사이트의 제약 상 동시에 2개씩만 실행할 수 있다고 가정하면 실행시간은 생각보다 길어집니다. 이런 경우에는 CeleryExecutor를 사용하면 git-sync 및 pod initializing시간을 아낄 수 있기 때문에 전체 실행시간이 많이 단축됩니다.

그리고 isolation된 환경에서 실행이 필요한 작업들만 task의 설정에 "queue": “kubernetes" 설정을 추가해주면 이 task들은 KubernetesExecutor 로 실행됩니다.

Code Example

kube_task= BashOperator(
        task_id='Test_kubernetes_executor',
        bash_command='echo Kubernetes',
        queue = 'kubernetes'
    )
celery_task = BashOperator(
        task_id='Test_Celery_Executor',
        bash_command='echo Celery',
    )

이렇듯 간단한 작업이지만 여러개가 동시성을 제한을 가지고 실행되어야 하는경우에는 CeleryExecutor를 사용하는것도 효율적인 것 같습니다.

그렇기 때문에 공식문서에 소개된 사례외에도 CeleryKubernetesExecutor 를 사용해서 두 가지 Executor를 모두 사용하는것도 상황에 따라서 도움이 될 수 있을 것 같습니다.

반응형