Computer Engineering/Data Engineering 15

Airflow KubernetesPodOperator 예제 코드 및 설명

KubernetesPodOperator는 Kuberntes cluster에서 Airflow가 실행중일 때 사용자가 원하는 docker image에서 task를 실행하는 task를 만드는 Operator입니다. Airflow는 여러 가지 서비스들을 Orchestration할 수 있다는 강점을 가지고 있습니다. 직접 데이터를 처리하는 서비스들을 여러 개 만들수도 있는데 이 때 각각의 서비스의 의존성이 다를 수 있는데 이 때 KubernetesPodOperator 를 사용하면 독립적인 컨테이너 환경에서 서비스를 실행할 수 있습니다. Install KubernetesPodOperator 를 위해 필요한 kubernetes provider 패키지 설치 pip install apache-airflow[cncf.ku..

초기 스타트업에서 Data Engineer는 어떤 일을 하나요?

이 글은 Medium 회사 블로그에 올린 글을 가져온 글 입니다! 안녕하세요. 저는 Verticah에서 Head of Data로 일하고 있는 Jordan입니다. 회사의 기술 블로그에 첫 번째 글로 초기 스타트업에서 데이터 엔지니어로 어떤 일을 하고 있는지를 소개해 보려고 합니다. 저희 팀은 현재 11명의 구성원으로 이루어진 Seed 투자를 받은 초기 스타트업으로 혁신 성장기업의 자금 조달 방식을 혁신하자는 비전 아래 Revenue Market이라는 미래의 매출을 판매할 수 있는 벤처 대출 플랫폼을 만들고 있습니다. Why an early-stage team needs a data guy 많은 회사들이 데이터 관련 직무를 회사가 어느 정도 성장한 후(A 시리즈 이후)에 채용합니다. 하지만, 데이터와 관련된..

Pandas DataFrame apply 성능 이슈 개선하기

Apply는 dataframe의 각 행이나 열에 User-defined function을 실행할 때 사용하는 흔한 옵션입니다. axis=1로 실행하게 되면 각 row에 대해서 연산을 수행하게 됩니다. 이 때 성능 문제가 흔히 발생됩니다. Pandas는 DataFrame은 내부적으로 Numpy 배열을 사용하여 열 단위로 데이터를 저장하고 보통 행의 수보다 열의 수는 훨씬 적은 경우가 많기 때문에 성능 이슈가 많이 발생하지 않습니다. apply 함수의 성능 이슈 반복적인 함수 호출: apply 함수는 각 행 또는 열에 대해 함수를 호출합니다. 이는 작업량이 많은 작업에서는 많은 반복 호출이 발생하므로 오버헤드가 발생할 수 있습니다. 데이터프레임의 크기가 클수록 이러한 반복 호출은 더욱 부담이 됩니다. 특히,..

Pandas DataFrame 조인 및 병합 / pandas merge, join, concat,

Pandas는 Python에서 데이터 조작 및 분석을 위한 강력한 도구로, 특히 DataFrame 객체를 통해 테이블 형식의 데이터를 다루는 데 유용합니다. DataFrame을 조인하는 것은 데이터 분석 및 처리에서 핵심적인 작업 중 하나입니다. 이번 글에서는 Pandas의 DataFrame을 조인하는 다양한 방법과 그 활용법에 대해 정리해보려고 합니다. Concat 함수는 Pandas 라이브러리에서 제공되는 함수로, 여러 개의 DataFrame 또는 Series를 연결(concatenate)하는 데 사용됩니다. 주어진 axis에 따라 객체를 열 또는 인덱스를 기준으로 연결합니다. pd.concat( objs, axis=0, join="outer", ignore_index=False, keys=None,..

Airflow CeleryKubernetesExecutor 사용하기

CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다. 언제 사용하는 것이 좋은가? Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다. 다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다: 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우 Ce..

Pandas 피벗 테이블 예제 코드 및 설명

Pandas 피벗테이블 활용하기 Pandas의 피벗테이블은 데이터를 요약하고 분석하기에 유용한 도구입니다. 피벗테이블을 사용하면 특정 칼럼의 Data들을 column으로 해서 특정 값을 aggregate하는 새로운 분석 테이블을 만들어서 데이터를 분석할 수 있습니다. 특히, 여러 개의 column으로 group by 되어 있는 테이블에서 한 번 더 group by를 해서 aggregate할 때 유용합니다. 피벗테이블이란? Pivot table은 스프레드시트 프로그램(예: Excel, Google Sheets)에서 자주 사용되는 데이터 요약 기능 중 하나입니다. Pivot table은 원시 데이터를 기반으로 요약된 정보를 생성하는 것으로, 데이터를 쉽게 분석하고 시각화하는 데 도움이 됩니다. 사실 이렇게만..

Airflow Scheduler 역할 및 성능 개선 정리

Scheduler 개요 Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다. Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다. 즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다. Scheduler의 역할은 크게 아래와 같습니다. DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 D..

Airflow Task 우선순위 설정하기(Priority weights)

Task Priority Weights 이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다. priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다. 해당 값..

Pandas dataframe 메모리 사용량 확인하기

Pandas의 dataframe 및 각 column의 메모리를 체크하는 방법은 매우 간단합니다. dataframe 전체 메모리 dataframe.info() 메서드를 이용하면 맨 아래 memory usage가 출력됩니다. >>> df.info() RangeIndex: 173511 entries, 0 to 173510 Data columns (total 47 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 id 173511 non-null int64 1 created_at 173511 non-null datetime64[ns] 2 updated_at 173511 non-null datetime64[ns] 3 deleted_..

Airflow Taskflow로 DAG refactoring하기

Introduction 안녕하세요. 이번 글에서는 Taskflow를 사용해서 다른 외부 Operator가 아닌 파이썬 Operator로만 이뤄진 DAG을 리팩토링 한 경험을 공유하려고 합니다 Taskflow란 Taskflow란 Airflow2.0에서 출시된 concept으로 Operator가 아닌 파이썬 로직들로만 이뤄진 DAG의 경우 @task decorator를 활용해서 깔끔하게 로직을 관리할 수 있습니다. 실제로 DB의 incremental data나 API나 크롤링을 통해 진행되는 작은 배치들(수십 MB 이하)은 Pandas의 Dataframe을 통해서도 쉽고 빠르게 조작이 가능합니다. Taskflow의 장점은 Xcoms을 사용하여 return 값을 전달해서 작업 간에 결과를 전달하기 편리합니다...

반응형