Computer Engineering/Data Engineering 18

Python Kafka Consumer 라이브러리 Faust PoC하기 (Feat. Django)

Kafka의 Consumer Application 사용 목적으로 몇 가지 Library를 살펴보다가 Faust라는 라이브러리를 발견했다. Consumer 안에서 Django ORM을 사용해보려고 했는데 다양한 Python Package와 integration을 지원한다고 해서 관심을 가지게 됐고, POC 해보면서 느낀 점을 정리해보려고 한다. 처음 살펴보는 라이브러리이고 아직 Kafka에 대한 지식이 많지 않아서 기록 목적으로 남기는 점 참고 부탁드립니다! 주요 살펴볼 부분은 아래와 같았다. 로컬 셋팅 및 간단한 예제 Django와 Integration 오프셋 관리 및 자동 재시도 및 오류 복구 Consumer Group 관리 및 배포 편의성 커뮤니티&사용사례 Local Setup 및 기본 테스트 로컬에..

Publish/Subscribe 와 Producer/Consumer 메시징 시스템

Publish/Subscribe(이하 Pub/Sub) 메시징 시스템과 Producer/Consumer 메시징 시스템에 대해서는 대부분 Kafka나 RabbitMQ 등 여러 메시징 시스템을 리서치 후 선택할 때 자주 본 개념일 것입니다. 하지만, Kafka 도입을 위해서 살펴볼 때 Pub/Sub 메시징 시스템인데도 불구하고 Producer와 Consumer라는 개념을 맞닥뜨리게 될 때 '내가 잘 못 알고 있었나?'라는 생각이 들면서 다시 찾아보게 됩니다. 결론부터 얘기하면 Kafka의 기본 아키텍처와 데이터 처리 모델에 주안해서 네이밍을 했기 때문입니다. Kafka는 본질적으로 Publish-Subscribe 모델을 기반으로 하지만, 그 구현 방식과 사용 사례가 전통적인 Publish-Subscribe ..

BigQuery Merge Query 설명 및 사용 사례

이번 글에서는 Merge 쿼리에 대한 설명과 Merge문의 대표적 사용 사례인 Merge쿼리를 사용해서 MySQL BigQuery 간에 데이터 웨어하우징에 사용하는 사례를 함께 기록해 보려고 합니다. 계속 변경되는 데이터 소스를 기존의 데이터셋에 정기적으로 통합해야 할 때 자주 사용됩니다. Merge 쿼리 BigQuery에서 MERGE 쿼리는 SQL의 MERGE 문을 기반으로 하며, 두 개의 테이블을 결합하여 대상 테이블에 소스 테이블의 데이터를 삽입, 업데이트, 또는 삭제하는데 사용됩니다. 이것은 일종의 "upsert" 작업으로 보일 수 있으며, 즉 존재하지 않는 행은 삽입하고 존재하는 행은 업데이트하는 기능을 제공합니다. 위에서 설명한 것처럼 조건에 따라 삽입, 업데이트, 또는 삭제가 가능하기 때문에..

Airflow KubernetesPodOperator 예제 코드 및 설명

KubernetesPodOperator는 Kuberntes cluster에서 Airflow가 실행중일 때 사용자가 원하는 docker image에서 task를 실행하는 task를 만드는 Operator입니다. Airflow는 여러 가지 서비스들을 Orchestration할 수 있다는 강점을 가지고 있습니다. 직접 데이터를 처리하는 서비스들을 여러 개 만들수도 있는데 이 때 각각의 서비스의 의존성이 다를 수 있는데 이 때 KubernetesPodOperator 를 사용하면 독립적인 컨테이너 환경에서 서비스를 실행할 수 있습니다. Install KubernetesPodOperator 를 위해 필요한 kubernetes provider 패키지 설치 pip install apache-airflow[cncf.ku..

초기 스타트업에서 Data Engineer는 어떤 일을 하나요?

이 글은 Medium 회사 블로그에 올린 글을 가져온 글 입니다! 안녕하세요. 저는 Verticah에서 Head of Data로 일하고 있는 Jordan입니다. 회사의 기술 블로그에 첫 번째 글로 초기 스타트업에서 데이터 엔지니어로 어떤 일을 하고 있는지를 소개해 보려고 합니다. 저희 팀은 현재 11명의 구성원으로 이루어진 Seed 투자를 받은 초기 스타트업으로 혁신 성장기업의 자금 조달 방식을 혁신하자는 비전 아래 Revenue Market이라는 미래의 매출을 판매할 수 있는 벤처 대출 플랫폼을 만들고 있습니다. Why an early-stage team needs a data guy 많은 회사들이 데이터 관련 직무를 회사가 어느 정도 성장한 후(A 시리즈 이후)에 채용합니다. 하지만, 데이터와 관련된..

Pandas DataFrame apply 성능 이슈 개선하기

Apply는 dataframe의 각 행이나 열에 User-defined function을 실행할 때 사용하는 흔한 옵션입니다. axis=1로 실행하게 되면 각 row에 대해서 연산을 수행하게 됩니다. 하지만, 데이터 사이즈가 큰 경우 메모리 접근 방식과 및 벡터화 되지 않은 연산으로 성능 문제가 발생하기 쉽습니다. 이 글에서는 성능 이슈가 발생하는 원인과 해결책을 간단하게 정리해봤습니다. apply 함수의 성능 이슈 원인 1. 메모리 접근 패턴 pandas DataFrame은 열(column)-기반의 데이터 저장 구조를 사용합니다. 따라서 열 단위로 데이터에 접근하는 것이 메모리에서 연속적이므로 빠릅니다. 그러나 apply를 사용하여 행(row) 단위로 함수를 적용할 때, 각 행의 데이터는 여러 열에서 ..

Pandas DataFrame 조인 및 병합 / pandas merge, join, concat,

Pandas는 Python에서 데이터 조작 및 분석을 위한 강력한 도구로, 특히 DataFrame 객체를 통해 테이블 형식의 데이터를 다루는 데 유용합니다. DataFrame을 조인하는 것은 데이터 분석 및 처리에서 핵심적인 작업 중 하나입니다. 이번 글에서는 Pandas의 DataFrame을 조인하는 다양한 방법과 그 활용법에 대해 정리해보려고 합니다. Concat 함수는 Pandas 라이브러리에서 제공되는 함수로, 여러 개의 DataFrame 또는 Series를 연결(concatenate)하는 데 사용됩니다. 주어진 axis에 따라 객체를 열 또는 인덱스를 기준으로 연결합니다. pd.concat( objs, axis=0, join="outer", ignore_index=False, keys=None,..

Airflow CeleryKubernetesExecutor 사용하기

CeleryKubernetesExecutor 는 CeleryExecutor와 KubernetesExecutor 를 동시에 사용합니다. 즉, CeleryExecutor에 필요한 broker인 Redis와 항상 실행중인 celery worker를 사용하는 동시에 필요에 따라서 KubernetesExecutor를 활용할 수 있는 구성입니다. 언제 사용하는 것이 좋은가? Airflow 공식문서에서는 아래와 같은 상황에서의 사용을 권장합니다. 다음의 경우에 CeleryKubernetesExecutor를 고려하는 것을 권장합니다: 최대 수준에서 스케줄링해야 하는 작업의 수가 Kubernetes 클러스터가 편안하게 처리할 수 있는 규모를 초과하는 경우 작업 중 비교적 작은 부분이 런타임 격리를 필요로 하는 경우 Ce..

Pandas pivot_table 예제 및 설명

Pandas pivot_table 예제 및 설명 Pandas의 pivot_table은 데이터를 요약하고 분석하기에 유용한 도구입니다. 피벗테이블을 사용하면 특정 칼럼의 Data들을 column으로 해서 특정 값을 aggregate하는 새로운 분석 테이블을 만들어서 데이터를 분석할 수 있습니다. (예를 들어, g특히, 여러 개의 column으로 group by 되어 있는 테이블에서 한 번 더 group by를 해서 aggregate할 때 유용합니다. 피벗테이블이란? Pivot table은 스프레드시트 프로그램(예: Excel, Google Sheets)에서 자주 사용되는 데이터 요약 기능 중 하나입니다. Pivot table은 원시 데이터를 기반으로 요약된 정보를 생성하는 것으로, 데이터를 쉽게 분석하고 ..

Airflow Scheduler 역할 및 성능 개선 정리

Scheduler 개요 Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다. Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다. 즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다. Scheduler의 역할은 크게 아래와 같습니다. DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 D..

반응형