CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다.
언제 사용하는 것이 좋은가?
Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다.
다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다:
- 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우
- 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우
- Celery 워커에서 실행될 수 있는 많은 수의 작은 작업이 있지만, 사전에 정의된 환경에서 실행하는 것이 좋은 리소스 집약적인 작업이 있는 경우
이러한 경우에 CeleryKubernetesExecutor를 고려하는 것이 좋습니다.
- 공식문서: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/celery_kubernetes.html
실제 적용 사례
KubernetesExecutor를 사용하는 경우, 작업 시간이 상대적으로 짧지만 여러 개의 작업으로 나누어진 DAG에서는 pod 초기화 시간이 오래 걸려 전체 DAGRun의 실행 시간이 오래 걸릴 수 있는 문제가 발생할 수 있습니다. 예를 들어, 외부에서 여러 데이터를 전체 로드 방식으로 가져오는 작업이 있는데, 경우에 따라 실패할 수 있으며 전체 재실행이 아니라 일부만 재실행해야 하는 상황입니다. 이를 위해 10개의 페이지당 하나의 작업으로 총 300개의 작업으로 분할된 DAG이 있습니다.
한 Task당 코드 실행시간은 30초 정도이지만 Scheduler에서 Task를 예약하고 queue에 집어넣고 pod을 initializing하는데 걸리는 시간은 1분 30초이고, 사이트의 제약 상 동시에 2개씩만 실행할 수 있다고 가정하면 실행시간은 생각보다 길어집니다. 이런 경우에는 CeleryExecutor를 사용하면 git-sync 및 pod initializing시간을 아낄 수 있기 때문에 전체 실행시간이 많이 단축됩니다.
그리고 isolation된 환경에서 실행이 필요한 작업들만 task의 설정에 "queue": “kubernetes" 설정을 추가해주면 이 task들은 KubernetesExecutor 로 실행됩니다.
Code Example
kube_task= BashOperator(
task_id='Test_kubernetes_executor',
bash_command='echo Kubernetes',
queue = 'kubernetes'
)
celery_task = BashOperator(
task_id='Test_Celery_Executor',
bash_command='echo Celery',
)
이렇듯 간단한 작업이지만 여러개가 동시성을 제한을 가지고 실행되어야 하는경우에는 CeleryExecutor를 사용하는것도 효율적인 것 같습니다.
그렇기 때문에 공식문서에 소개된 사례외에도 CeleryKubernetesExecutor 를 사용해서 두 가지 Executor를 모두 사용하는것도 상황에 따라서 도움이 될 수 있을 것 같습니다.
'Computer Engineering > Data Engineering' 카테고리의 다른 글
Pandas DataFrame apply 성능 이슈 개선하기 (0) | 2023.07.08 |
---|---|
Pandas DataFrame 조인 및 병합 / pandas merge, join, concat, (0) | 2023.07.02 |
Pandas pivot_table 예제 및 설명 (0) | 2023.05.01 |
Airflow Scheduler 역할 및 성능 개선 정리 (0) | 2023.04.14 |
Airflow Task 우선순위 설정하기(Priority weights) (0) | 2023.04.06 |