Computer Engineering/Data Engineering 10

Pandas 피벗 테이블 예제 코드 및 설명

Pandas 피벗테이블 활용하기 Pandas의 피벗테이블은 데이터를 요약하고 분석하기에 유용한 도구입니다. 피벗테이블을 사용하면 특정 칼럼의 Data들을 column으로 해서 특정 값을 aggregate하는 새로운 분석 테이블을 만들어서 데이터를 분석할 수 있습니다. 특히, 여러 개의 column으로 group by 되어 있는 테이블에서 한 번 더 group by를 해서 aggregate할 때 유용합니다. 피벗테이블이란? Pivot table은 스프레드시트 프로그램(예: Excel, Google Sheets)에서 자주 사용되는 데이터 요약 기능 중 하나입니다. Pivot table은 원시 데이터를 기반으로 요약된 정보를 생성하는 것으로, 데이터를 쉽게 분석하고 시각화하는 데 도움이 됩니다. 사실 이렇게만..

Airflow Scheduler 역할 및 성능 개선 정리

Scheduler 개요 Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다. Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다. 즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다. Scheduler의 역할은 크게 아래와 같습니다. DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 D..

Airflow Task 우선순위 설정하기(Priority weights)

Task Priority Weights 이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다. priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다. 해당 값..

Pandas dataframe 메모리 사용량 확인하기

Pandas의 dataframe 및 각 column의 메모리를 체크하는 방법은 매우 간단합니다. dataframe 전체 메모리 dataframe.info() 메서드를 이용하면 맨 아래 memory usage가 출력됩니다. >>> df.info() RangeIndex: 173511 entries, 0 to 173510 Data columns (total 47 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 id 173511 non-null int64 1 created_at 173511 non-null datetime64[ns] 2 updated_at 173511 non-null datetime64[ns] 3 deleted_..

Airflow Taskflow로 DAG refactoring하기

Introduction 안녕하세요. 이번 글에서는 Taskflow를 사용해서 다른 외부 Operator가 아닌 파이썬 Operator로만 이뤄진 DAG을 리팩토링 한 경험을 공유하려고 합니다 Taskflow란 Taskflow란 Airflow2.0에서 출시된 concept으로 Operator가 아닌 파이썬 로직들로만 이뤄진 DAG의 경우 @task decorator를 활용해서 깔끔하게 로직을 관리할 수 있습니다. 실제로 DB의 incremental data나 API나 크롤링을 통해 진행되는 작은 배치들(수십 MB 이하)은 Pandas의 Dataframe을 통해서도 쉽고 빠르게 조작이 가능합니다. Taskflow의 장점은 Xcoms을 사용하여 return 값을 전달해서 작업 간에 결과를 전달하기 편리합니다...

Pandas NaN이란 그리고 None 차이

NaN이란 not a number로 비어있는 결측치 데이터를 의미합니다. numpy와 pandas에서는 None이 아닌 NaN을 사용하는데 그 이유는 vectorized 연산이 가능하기 때문입니다. None을 넣으면 pandas를 사용해도 병렬 연산이 불가능해지기 때문에 결측치에 대해서는 np.nan을 사용해야합니다. import numpy as np s = pd.Series(['선화', '강호', np.nan, '소정', '우영']) # 병렬 연산이 가능 s += '테스트' s 0 선화테스트 1 강호테스트 2 NaN 3 소정테스트 4 우영테스트 dtype: object 타입 캐스팅 시 주의할 점. 그리고 np.nan은 다른 데이터들이 숫자인 경우는 float 타입 입니다. 그렇기 때문에 int로 typ..

Airflow Sensor 정리 (feat. S3 Sensor)

Concept Operator의 한 가지 type으로 wait for something to occur 이라는 한 가지 목적으로 만들어졌다. time-based로 어떤 이벤트 또는 조건이 성사되는 것을 기다린다. 여기서 something은 아래와 같은 것들이 있다. file의 생성여부 external event 그 밖의 다양한 기다릴 수 있는 조건 or event. 기본적으로 해당 event가 일어날 때 까지 기다리다가 발생하면 다음 downstream의 task가 동작할 수 있도록 하는 역할을한다. 즉, 어떤 파일이나 이벤트 기반으로 다음 task를 수행해야 할 때 사용할 수 있다. ex) loader라는 서비스에서 file을 지정된 위치에 저장한 후에 compactor라는 서비스에서 데이터를 comp..

Airflow k8s 로컬 개발환경 셋팅

Introduction 이 글의 목표는 local에서 kubernates를 docker container에서 실행시키기 위해서 kinder를 설치하고, helm를 이용해서 airflow를 설치하고 배포하는 방법을 정리하는 것입니다. Airflow는 Production에서 kubernates(이하 k8s)에 배포하는 것을 권장하기 때문에 local 환경에서도 가능한한 k8s에서 실행시키는 것이 좋다라고 생각합니다. 그래서 kind와 helm을 이용해서 airflow를 local에서 kubernates위에서 실행시키기 위한 셋업 절차를 정리한 것을 공유하려고 합니다. 대부분의 내용은 공식문서 를 참고하여 작성하였습니다. 또, 이 글은 Mac 사용자를 기준으로 한 글입니다. Requirements kind 로..

IntelliJ로 Spark 개발 환경 구축하기

Introduction 여러 클라우드 환경이나 컨테이너 환경에서 간단하게 Spark를 사용할 수 있지만 local 환경에서도 Spark를 실행시킬수 있는 환경을 만들어두면 가볍게 테스트 할 수 있는 것들이 많아서 Local 환경에서도 Spark application을 build하고 실행시킬수 있는 환경을 구축하는 과정을 기록해 두려고 합니다. Spark의 개발 환경을 셋팅하기 위해서 Scala 및 JDK를 설치해야합니다. Spark는 scala로 작성되었고, scala는 JVM위에서 동작하기 때문입니다. 그렇기 때문에 JDK, Scala를 먼저 설치해야 합니다. 1. JDK 설치하기 JDK는 Java를 사용하기 위해 필요한 모든 기능을 갖춘 Java용 SDK로 javac(컴파일러), jdb, javado..

Pandas Dataframe Type Casting 하기. (Feat. BigQuery)

Introduction 다른 데이터 Source에서 Pandas의 Dataframe으로 데이터를 Extract(추출)한 후에 Destination으로 데이터를 Load(적재)하는 경우에 데이터의 type을 casting해줘야 하는 경우가 많이 발생합니다. 타입에 관한 문제들을 겪다보면 단일 데이터베이스에서 ORM으로 DB에 CRUD를 하는 것이 얼마나 생산성이 높은지 느낄 수 있습니다. 특히, typing이 되어있는 데이터 소스가 아닌 경우(Web Page Crawling)하는 경우에 특히 이런 Needs들이 있습니다. 이런 경우에 Type casting이 필요한데 기본적인 Type casting과 나름의 시행착오를 겪은 부분들을 정리하려고 합니다. 특히, BigQuery에 Data를 load하는 경우에..

반응형