airflow 4

Airflow(에어플로우) could not queue task issue

Could not queue task issue GKE위에서 Airflow2.3.4 버젼에 kubernates executor의 scheduler에서 task를 queue에 넣지 못하는 이슈가 발생. 실질적으로 문제는 되지 않는데 그 이유는 이미 task가 queue로 들어갔기 때문이다. 코드를 디테일하게 살펴보지는 않았지만 대략적으로 에러 발생전 실해된 쿼리를 보면 lock을 획득과정에서 이슈가 있는 것 같다. SELECT pg_try_advisory_xact_lock(%(id)s) 해당 에러가 발생하는 코드는 아래 코드같은데 이 self.queued_tasks 컨테이너 안에 해당 key가 있다는건 lock 로직이 제대로 동작하지 않은 것으로 보인다. def queue_command( self, tas..

Airflow Taskflow로 DAG refactoring하기

Introduction 안녕하세요. 이번 글에서는 Taskflow를 사용해서 다른 외부 Operator가 아닌 파이썬 Operator로만 이뤄진 DAG을 리팩토링 한 경험을 공유하려고 합니다 Taskflow란 Taskflow란 Airflow2.0에서 출시된 concept으로 Operator가 아닌 파이썬 로직들로만 이뤄진 DAG의 경우 @task decorator를 활용해서 깔끔하게 로직을 관리할 수 있습니다. 실제로 DB의 incremental data나 API나 크롤링을 통해 진행되는 작은 배치들(수십 MB 이하)은 Pandas의 Dataframe을 통해서도 쉽고 빠르게 조작이 가능합니다. Taskflow의 장점은 Xcoms을 사용하여 return 값을 전달해서 작업 간에 결과를 전달하기 편리합니다...

Airflow Sensor 정리 (feat. S3 Sensor)

Concept Operator의 한 가지 type으로 wait for something to occur 이라는 한 가지 목적으로 만들어졌다. time-based로 어떤 이벤트 또는 조건이 성사되는 것을 기다린다. 여기서 something은 아래와 같은 것들이 있다. file의 생성여부 external event 그 밖의 다양한 기다릴 수 있는 조건 or event. 기본적으로 해당 event가 일어날 때 까지 기다리다가 발생하면 다음 downstream의 task가 동작할 수 있도록 하는 역할을한다. 즉, 어떤 파일이나 이벤트 기반으로 다음 task를 수행해야 할 때 사용할 수 있다. ex) loader라는 서비스에서 file을 지정된 위치에 저장한 후에 compactor라는 서비스에서 데이터를 comp..

Airflow k8s 로컬 개발환경 셋팅

Introduction 이 글의 목표는 local에서 kubernates를 docker container에서 실행시키기 위해서 kinder를 설치하고, helm를 이용해서 airflow를 설치하고 배포하는 방법을 정리하는 것입니다. Airflow는 Production에서 kubernates(이하 k8s)에 배포하는 것을 권장하기 때문에 local 환경에서도 가능한한 k8s에서 실행시키는 것이 좋다라고 생각합니다. 그래서 kind와 helm을 이용해서 airflow를 local에서 kubernates위에서 실행시키기 위한 셋업 절차를 정리한 것을 공유하려고 합니다. 대부분의 내용은 공식문서 를 참고하여 작성하였습니다. 또, 이 글은 Mac 사용자를 기준으로 한 글입니다. Requirements kind 로..

반응형