Computer Engineering/Data Engineering 16

BigQuery Merge Query 설명 및 사용 사례

이번 글에서는 Merge 쿼리에 대한 설명과 Merge문의 대표적 사용 사례인 Merge쿼리를 사용해서 MySQL BigQuery 간에 데이터 웨어하우징에 사용하는 사례를 함께 기록해 보려고 합니다. 계속 변경되는 데이터 소스를 기존의 데이터셋에 정기적으로 통합해야 할 때 자주 사용됩니다. Merge 쿼리 BigQuery에서 MERGE 쿼리는 SQL의 MERGE 문을 기반으로 하며, 두 개의 테이블을 결합하여 대상 테이블에 소스 테이블의 데이터를 삽입, 업데이트, 또는 삭제하는데 사용됩니다. 이것은 일종의 "upsert" 작업으로 보일 수 있으며, 즉 존재하지 않는 행은 삽입하고 존재하는 행은 업데이트하는 기능을 제공합니다. 위에서 설명한 것처럼 조건에 따라 삽입, 업데이트, 또는 삭제가 가능하기 때문에..

Airflow KubernetesPodOperator 예제 코드 및 설명

KubernetesPodOperator는 Kuberntes cluster에서 Airflow가 실행중일 때 사용자가 원하는 docker image에서 task를 실행하는 task를 만드는 Operator입니다. Airflow는 여러 가지 서비스들을 Orchestration할 수 있다는 강점을 가지고 있습니다. 직접 데이터를 처리하는 서비스들을 여러 개 만들수도 있는데 이 때 각각의 서비스의 의존성이 다를 수 있는데 이 때 KubernetesPodOperator 를 사용하면 독립적인 컨테이너 환경에서 서비스를 실행할 수 있습니다. Install KubernetesPodOperator 를 위해 필요한 kubernetes provider 패키지 설치 pip install apache-airflow[cncf.ku..

초기 스타트업에서 Data Engineer는 어떤 일을 하나요?

이 글은 Medium 회사 블로그에 올린 글을 가져온 글 입니다! 안녕하세요. 저는 Verticah에서 Head of Data로 일하고 있는 Jordan입니다. 회사의 기술 블로그에 첫 번째 글로 초기 스타트업에서 데이터 엔지니어로 어떤 일을 하고 있는지를 소개해 보려고 합니다. 저희 팀은 현재 11명의 구성원으로 이루어진 Seed 투자를 받은 초기 스타트업으로 혁신 성장기업의 자금 조달 방식을 혁신하자는 비전 아래 Revenue Market이라는 미래의 매출을 판매할 수 있는 벤처 대출 플랫폼을 만들고 있습니다. Why an early-stage team needs a data guy 많은 회사들이 데이터 관련 직무를 회사가 어느 정도 성장한 후(A 시리즈 이후)에 채용합니다. 하지만, 데이터와 관련된..

Pandas DataFrame apply 성능 이슈 개선하기

Apply는 dataframe의 각 행이나 열에 User-defined function을 실행할 때 사용하는 흔한 옵션입니다. axis=1로 실행하게 되면 각 row에 대해서 연산을 수행하게 됩니다. 하지만, 데이터 사이즈가 큰 경우 메모리 접근 방식과 및 벡터화 되지 않은 연산으로 성능 문제가 발생하기 쉽습니다. 이 글에서는 성능 이슈가 발생하는 원인과 해결책을 간단하게 정리해봤습니다. apply 함수의 성능 이슈 원인 1. 메모리 접근 패턴 pandas DataFrame은 열(column)-기반의 데이터 저장 구조를 사용합니다. 따라서 열 단위로 데이터에 접근하는 것이 메모리에서 연속적이므로 빠릅니다. 그러나 apply를 사용하여 행(row) 단위로 함수를 적용할 때, 각 행의 데이터는 여러 열에서 ..

Pandas DataFrame 조인 및 병합 / pandas merge, join, concat,

Pandas는 Python에서 데이터 조작 및 분석을 위한 강력한 도구로, 특히 DataFrame 객체를 통해 테이블 형식의 데이터를 다루는 데 유용합니다. DataFrame을 조인하는 것은 데이터 분석 및 처리에서 핵심적인 작업 중 하나입니다. 이번 글에서는 Pandas의 DataFrame을 조인하는 다양한 방법과 그 활용법에 대해 정리해보려고 합니다. Concat 함수는 Pandas 라이브러리에서 제공되는 함수로, 여러 개의 DataFrame 또는 Series를 연결(concatenate)하는 데 사용됩니다. 주어진 axis에 따라 객체를 열 또는 인덱스를 기준으로 연결합니다. pd.concat( objs, axis=0, join="outer", ignore_index=False, keys=None,..

Airflow CeleryKubernetesExecutor 사용하기

CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다. 언제 사용하는 것이 좋은가? Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다. 다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다: 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우 Ce..

Pandas pivot_table 예제 및 설명

Pandas pivot_table 예제 및 설명 Pandas의 pivot_table은 데이터를 요약하고 분석하기에 유용한 도구입니다. 피벗테이블을 사용하면 특정 칼럼의 Data들을 column으로 해서 특정 값을 aggregate하는 새로운 분석 테이블을 만들어서 데이터를 분석할 수 있습니다. (예를 들어, g특히, 여러 개의 column으로 group by 되어 있는 테이블에서 한 번 더 group by를 해서 aggregate할 때 유용합니다. 피벗테이블이란? Pivot table은 스프레드시트 프로그램(예: Excel, Google Sheets)에서 자주 사용되는 데이터 요약 기능 중 하나입니다. Pivot table은 원시 데이터를 기반으로 요약된 정보를 생성하는 것으로, 데이터를 쉽게 분석하고 ..

Airflow Scheduler 역할 및 성능 개선 정리

Scheduler 개요 Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다. Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다. 즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다. Scheduler의 역할은 크게 아래와 같습니다. DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 D..

Airflow Task 우선순위 설정하기(Priority weights)

Task Priority Weights 이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다. priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다. 해당 값..

Pandas dataframe 메모리 사용량 확인하기

Pandas의 dataframe 및 각 column의 메모리를 체크하는 방법은 매우 간단합니다. dataframe 전체 메모리 dataframe.info() 메서드를 이용하면 맨 아래 memory usage가 출력됩니다. >>> df.info() RangeIndex: 173511 entries, 0 to 173510 Data columns (total 47 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 id 173511 non-null int64 1 created_at 173511 non-null datetime64[ns] 2 updated_at 173511 non-null datetime64[ns] 3 deleted_..

반응형