Computer Engineering/Data Engineering 18

Airflow Task 우선순위 설정하기(Priority weights)

Task Priority Weights 이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다. priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다. 해당 값..

Pandas dataframe 메모리 사용량 확인하기

Pandas의 dataframe 및 각 column의 메모리를 체크하는 방법은 매우 간단합니다. dataframe 전체 메모리 dataframe.info() 메서드를 이용하면 맨 아래 memory usage가 출력됩니다. >>> df.info() RangeIndex: 173511 entries, 0 to 173510 Data columns (total 47 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 id 173511 non-null int64 1 created_at 173511 non-null datetime64[ns] 2 updated_at 173511 non-null datetime64[ns] 3 deleted_..

Airflow Taskflow로 DAG refactoring하기

Introduction 안녕하세요. 이번 글에서는 Taskflow를 사용해서 다른 외부 Operator가 아닌 파이썬 Operator로만 이뤄진 DAG을 리팩토링 한 경험을 공유하려고 합니다 Taskflow란 Taskflow란 Airflow2.0에서 출시된 concept으로 Operator가 아닌 파이썬 로직들로만 이뤄진 DAG의 경우 @task decorator를 활용해서 깔끔하게 로직을 관리할 수 있습니다. 실제로 DB의 incremental data나 API나 크롤링을 통해 진행되는 작은 배치들(수십 MB 이하)은 Pandas의 Dataframe을 통해서도 쉽고 빠르게 조작이 가능합니다. Taskflow의 장점은 Xcoms을 사용하여 return 값을 전달해서 작업 간에 결과를 전달하기 편리합니다...

Pandas NaN이란 그리고 None 차이

NaN이란 not a number로 비어있는 결측치 데이터를 의미합니다. numpy와 pandas에서는 None이 아닌 NaN을 사용하는데 그 이유는 vectorized 연산이 가능하기 때문입니다. None을 넣으면 pandas를 사용해도 병렬 연산이 불가능해지기 때문에 결측치에 대해서는 np.nan을 사용해야합니다. import numpy as np s = pd.Series(['선화', '강호', np.nan, '소정', '우영']) # 병렬 연산이 가능 s += '테스트' s 0 선화테스트 1 강호테스트 2 NaN 3 소정테스트 4 우영테스트 dtype: object 타입 캐스팅 시 주의할 점. 그리고 np.nan은 다른 데이터들이 숫자인 경우는 float 타입 입니다. 그렇기 때문에 int로 typ..

Airflow Sensor 정리 (feat. S3 Sensor)

Concept Operator의 한 가지 type으로 wait for something to occur 이라는 한 가지 목적으로 만들어졌다. time-based로 어떤 이벤트 또는 조건이 성사되는 것을 기다린다. 여기서 something은 아래와 같은 것들이 있다. file의 생성여부 external event 그 밖의 다양한 기다릴 수 있는 조건 or event. 기본적으로 해당 event가 일어날 때 까지 기다리다가 발생하면 다음 downstream의 task가 동작할 수 있도록 하는 역할을한다. 즉, 어떤 파일이나 이벤트 기반으로 다음 task를 수행해야 할 때 사용할 수 있다. ex) loader라는 서비스에서 file을 지정된 위치에 저장한 후에 compactor라는 서비스에서 데이터를 comp..

Airflow k8s 로컬 개발환경 셋팅

Introduction 이 글의 목표는 local에서 kubernates를 docker container에서 실행시키기 위해서 kinder를 설치하고, helm를 이용해서 airflow를 설치하고 배포하는 방법을 정리하는 것입니다. Airflow는 Production에서 kubernates(이하 k8s)에 배포하는 것을 권장하기 때문에 local 환경에서도 가능한한 k8s에서 실행시키는 것이 좋다라고 생각합니다. 그래서 kind와 helm을 이용해서 airflow를 local에서 kubernates위에서 실행시키기 위한 셋업 절차를 정리한 것을 공유하려고 합니다. 대부분의 내용은 공식문서 를 참고하여 작성하였습니다. 또, 이 글은 Mac 사용자를 기준으로 한 글입니다. Requirements kind 로..

IntelliJ로 Spark 개발 환경 구축하기

Introduction 여러 클라우드 환경이나 컨테이너 환경에서 간단하게 Spark를 사용할 수 있지만 local 환경에서도 Spark를 실행시킬수 있는 환경을 만들어두면 가볍게 테스트 할 수 있는 것들이 많아서 Local 환경에서도 Spark application을 build하고 실행시킬수 있는 환경을 구축하는 과정을 기록해 두려고 합니다. Spark의 개발 환경을 셋팅하기 위해서 Scala 및 JDK를 설치해야합니다. Spark는 scala로 작성되었고, scala는 JVM위에서 동작하기 때문입니다. 그렇기 때문에 JDK, Scala를 먼저 설치해야 합니다. 1. JDK 설치하기 JDK는 Java를 사용하기 위해 필요한 모든 기능을 갖춘 Java용 SDK로 javac(컴파일러), jdb, javado..

Pandas Dataframe Type Casting 하기. (Feat. BigQuery)

Introduction 다른 데이터 Source에서 Pandas의 Dataframe으로 데이터를 Extract(추출)한 후에 Destination으로 데이터를 Load(적재)하는 경우에 데이터의 type을 casting해줘야 하는 경우가 많이 발생합니다. 타입에 관한 문제들을 겪다보면 단일 데이터베이스에서 ORM으로 DB에 CRUD를 하는 것이 얼마나 생산성이 높은지 느낄 수 있습니다. 특히, typing이 되어있는 데이터 소스가 아닌 경우(Web Page Crawling)하는 경우에 특히 이런 Needs들이 있습니다. 이런 경우에 Type casting이 필요한데 기본적인 Type casting과 나름의 시행착오를 겪은 부분들을 정리하려고 합니다. 특히, BigQuery에 Data를 load하는 경우에..

반응형