데이터엔지니어링 7

BigQuery Merge Query 설명 및 사용 사례

이번 글에서는 Merge 쿼리에 대한 설명과 Merge문의 대표적 사용 사례인 Merge쿼리를 사용해서 MySQL BigQuery 간에 데이터 웨어하우징에 사용하는 사례를 함께 기록해 보려고 합니다. 계속 변경되는 데이터 소스를 기존의 데이터셋에 정기적으로 통합해야 할 때 자주 사용됩니다. Merge 쿼리 BigQuery에서 MERGE 쿼리는 SQL의 MERGE 문을 기반으로 하며, 두 개의 테이블을 결합하여 대상 테이블에 소스 테이블의 데이터를 삽입, 업데이트, 또는 삭제하는데 사용됩니다. 이것은 일종의 "upsert" 작업으로 보일 수 있으며, 즉 존재하지 않는 행은 삽입하고 존재하는 행은 업데이트하는 기능을 제공합니다. 위에서 설명한 것처럼 조건에 따라 삽입, 업데이트, 또는 삭제가 가능하기 때문에..

ETL과 ELT의 차이, ELT가 더 가치있는 이유 그리고 EtLT

ETL과 ELT의 차이 ETL과 ELT의 차이는 데이터를 소스에서 타켓으로Ingestion하는 과정에서 Transafrom을 언제 하는지에 대한 차이입니다. ETL은 Extract-Transfrom-Load 순으로 진행됩니다. 반면에 ELT는 Extract-Load-Transform순으로 진행됩니다. 많은 글에서 ELT가 데이터를 Transform하지 않은 상태로 DataLake 또는 Data Warehouse에 데이터를 적재하지 않기 때문에 원시 데이터로 부터 다양한 가공이 가능해서 더 데이터를 잘 활용할 수 있다고 설명하는 부분에 집중합니다. 하지만, ELT의 패러다임은 단순히 Transformation 순서만 바뀐 것이 아니라 Ingestion layer와 Transformation layer를 나..

Airflow CeleryKubernetesExecutor 사용하기

CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다. 언제 사용하는 것이 좋은가? Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다. 다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다: 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우 Ce..

Pandas pivot_table 예제 및 설명

Pandas pivot_table 예제 및 설명 Pandas의 pivot_table은 데이터를 요약하고 분석하기에 유용한 도구입니다. 피벗테이블을 사용하면 특정 칼럼의 Data들을 column으로 해서 특정 값을 aggregate하는 새로운 분석 테이블을 만들어서 데이터를 분석할 수 있습니다. (예를 들어, g특히, 여러 개의 column으로 group by 되어 있는 테이블에서 한 번 더 group by를 해서 aggregate할 때 유용합니다. 피벗테이블이란? Pivot table은 스프레드시트 프로그램(예: Excel, Google Sheets)에서 자주 사용되는 데이터 요약 기능 중 하나입니다. Pivot table은 원시 데이터를 기반으로 요약된 정보를 생성하는 것으로, 데이터를 쉽게 분석하고 ..

Airflow Scheduler 역할 및 성능 개선 정리

Scheduler 개요 Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다. Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다. 즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다. Scheduler의 역할은 크게 아래와 같습니다. DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 D..

Airflow Task 우선순위 설정하기(Priority weights)

Task Priority Weights 이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다. priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다. 해당 값..

Airflow Error - Triggerer's async thread was blocked for xx seconds, likely by a badly-written

이슈 정리 Sentry에서 Trigger에 Triggerer's async thread was blocked for xx seconds, likely by a badly-written .. 하는 에러가 Sentry를 통해 많이 발생한 것을 확인. 이슈가 발생한 코드 async def block_watchdog(self): """ Watchdog loop that detects blocking (badly-written) triggers. Triggers should be well-behaved async coroutines and await whenever they need to wait; this loop tries to run every 100ms to see if there are badly-wri..

반응형