airflow 8

Airflow CeleryKubernetesExecutor 사용하기

CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다. 언제 사용하는 것이 좋은가? Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다. 다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다: 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우 Ce..

Airflow Scheduler 역할 및 성능 개선 정리

Scheduler 개요 Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다. Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다. 즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다. Scheduler의 역할은 크게 아래와 같습니다. DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 D..

Airflow Task 우선순위 설정하기(Priority weights)

Task Priority Weights 이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다. priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다. 해당 값..

Airflow Error - Triggerer's async thread was blocked for xx seconds, likely by a badly-written

이슈 정리 Sentry에서 Trigger에 Triggerer's async thread was blocked for xx seconds, likely by a badly-written .. 하는 에러가 Sentry를 통해 많이 발생한 것을 확인. 이슈가 발생한 코드 async def block_watchdog(self): """ Watchdog loop that detects blocking (badly-written) triggers. Triggers should be well-behaved async coroutines and await whenever they need to wait; this loop tries to run every 100ms to see if there are badly-wri..

Airflow(에어플로우) could not queue task issue

Could not queue task issue GKE위에서 Airflow2.3.4 버젼에 kubernates executor의 scheduler에서 task를 queue에 넣지 못하는 이슈가 발생. 실질적으로 문제는 되지 않는데 그 이유는 이미 task가 queue로 들어갔기 때문이다. 코드를 디테일하게 살펴보지는 않았지만 대략적으로 에러 발생전 실해된 쿼리를 보면 lock을 획득과정에서 이슈가 있는 것 같다. SELECT pg_try_advisory_xact_lock(%(id)s) 해당 에러가 발생하는 코드는 아래 코드같은데 이 self.queued_tasks 컨테이너 안에 해당 key가 있다는건 lock 로직이 제대로 동작하지 않은 것으로 보인다. def queue_command( self, tas..

Airflow Taskflow로 DAG refactoring하기

Introduction 안녕하세요. 이번 글에서는 Taskflow를 사용해서 다른 외부 Operator가 아닌 파이썬 Operator로만 이뤄진 DAG을 리팩토링 한 경험을 공유하려고 합니다 Taskflow란 Taskflow란 Airflow2.0에서 출시된 concept으로 Operator가 아닌 파이썬 로직들로만 이뤄진 DAG의 경우 @task decorator를 활용해서 깔끔하게 로직을 관리할 수 있습니다. 실제로 DB의 incremental data나 API나 크롤링을 통해 진행되는 작은 배치들(수십 MB 이하)은 Pandas의 Dataframe을 통해서도 쉽고 빠르게 조작이 가능합니다. Taskflow의 장점은 Xcoms을 사용하여 return 값을 전달해서 작업 간에 결과를 전달하기 편리합니다...

Airflow Sensor 정리 (feat. S3 Sensor)

Concept Operator의 한 가지 type으로 wait for something to occur 이라는 한 가지 목적으로 만들어졌다. time-based로 어떤 이벤트 또는 조건이 성사되는 것을 기다린다. 여기서 something은 아래와 같은 것들이 있다. file의 생성여부 external event 그 밖의 다양한 기다릴 수 있는 조건 or event. 기본적으로 해당 event가 일어날 때 까지 기다리다가 발생하면 다음 downstream의 task가 동작할 수 있도록 하는 역할을한다. 즉, 어떤 파일이나 이벤트 기반으로 다음 task를 수행해야 할 때 사용할 수 있다. ex) loader라는 서비스에서 file을 지정된 위치에 저장한 후에 compactor라는 서비스에서 데이터를 comp..

Airflow k8s 로컬 개발환경 셋팅

Introduction 이 글의 목표는 local에서 kubernates를 docker container에서 실행시키기 위해서 kinder를 설치하고, helm를 이용해서 airflow를 설치하고 배포하는 방법을 정리하는 것입니다. Airflow는 Production에서 kubernates(이하 k8s)에 배포하는 것을 권장하기 때문에 local 환경에서도 가능한한 k8s에서 실행시키는 것이 좋다라고 생각합니다. 그래서 kind와 helm을 이용해서 airflow를 local에서 kubernates위에서 실행시키기 위한 셋업 절차를 정리한 것을 공유하려고 합니다. 대부분의 내용은 공식문서 를 참고하여 작성하였습니다. 또, 이 글은 Mac 사용자를 기준으로 한 글입니다. Requirements kind 로..

반응형