Kafka의 Consumer Application 사용 목적으로 몇 가지 Library를 살펴보다가 Faust라는 라이브러리를 발견했다. Consumer 안에서 Django ORM을 사용해보려고 했는데 다양한 Python Package와 integration을 지원한다고 해서 관심을 가지게 됐고, POC 해보면서 느낀 점을 정리해보려고 한다.
처음 살펴보는 라이브러리이고 아직 Kafka에 대한 지식이 많지 않아서 기록 목적으로 남기는 점 참고 부탁드립니다!
주요 살펴볼 부분은 아래와 같았다.
- 로컬 셋팅 및 간단한 예제
- Django와 Integration
- 오프셋 관리 및 자동 재시도 및 오류 복구
- Consumer Group 관리 및 배포 편의성
- 커뮤니티&사용사례
Local Setup 및 기본 테스트
로컬에서 테스트를 잘 수행해 볼 수 있는지도 기술스택을 선택할 때 굉장히 중요한 요소이다.
Local에서는 docker-compose로 간단하게 zookeeper와 kafka를 구성한 후 Django project와 같이 사용해 볼 목적이라서 기존 Django Docker-compose 파일에 zooker와 kafka를 아래와 같이 추가로 구성했다.
version: '3'
services:
django app & db....
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka # kafka가 외부에 노출되는 호스트명
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- kafka_data:/var/lib/kafka/data
당장 튼튼하게 구성하기보다는 테스트가 주요 목적이라 image는 인터넷에 많이 보이는 image를 이용했다.
- zookeeper: https://github.com/wurstmeister/zookeeper-docker
- kakfa: https://github.com/wurstmeister/kafka-docker
위와 같이 구성하고 아래와 같이 코드를 구성했을 때 잘 동작했다.
Producer
# kafka 컨테이너 id 체크
$ docker container ls
# kafka 컨테이너에 접속
$ docker exec -it 6846965eb5c5 bash
# topic 생성
root@087fec08d995:/# kafka-topics.sh --create --topic user --bootstrap-server 127.0.0.1:9092
Created topic user.
# message produce
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic user
>{"id": 1, "name": "jordan"}
>{"id": 1, "name": "jordan"}
Consumer
설치
- Python docker 이미지에 faust-streaming==0.11.0 만 추가
샘플 코드
#consumers.py
import faust
app = faust.App('sample_consumer', broker="kafka://kafka:9092")
class User(faust.Record):
id: str
name: int
topic = app.topic('user', value_type=User)
@app.agent(topic)
async def add_user(users):
async for user in users:
# process infinite stream of orders.
print(f'Add User for {user.id}: {user.name}')
실행
- 위와 같은 방법으로 django app container에 접속
root@1176cdf350ac:/app# faust -A consumers worker -l info
┌ƒaµS† v0.11.0┬──────────────────────────────────────────┐
│ id │ sample_consumer │
│ transport │ [URL('kafka://kafka:9092')] │
│ store │ memory: │
│ web │ <http://localhost:6066/> │
│ log │ -stderr- (info) │
│ pid │ 32 │
│ hostname │ 1176cdf350ac │
│ platform │ CPython 3.11.9 (Linux aarch64) │
│ + │ Cython (GCC 12.2.0) │
│ drivers │ │
│ transport │ aiokafka=0.10.0 │
│ web │ aiohttp=3.9.4 │
│ datadir │ /app/sample_consumer-data │
│ appdir │ /app/sample_consumer-data/v1 │
└─────────────┴──────────────────────────────────────────┘
[2024-04-13 06:24:21,543] [32] [INFO] [^Worker]: Starting...
[2024-04-13 06:24:21,548] [32] [INFO] [^-App]: Starting...
[2024-04-13 06:24:21,548] [32] [INFO] [^--Monitor]: Starting...
[2024-04-13 06:24:21,548] [32] [INFO] [^--Producer]: Starting...
[2024-04-13 06:24:21,548] [32] [INFO] [^---ProducerBuffer]: Starting...
[2024-04-13 06:24:21,565] [32] [INFO] [^--CacheBackend]: Starting...
[2024-04-13 06:24:21,565] [32] [INFO] [^--Web]: Starting...
[2024-04-13 06:24:21,565] [32] [INFO] [^---Server]: Starting...
[2024-04-13 06:24:21,565] [32] [INFO] [^--Consumer]: Starting...
[2024-04-13 06:24:21,566] [32] [INFO] [^---AIOKafkaConsumerThread]: Starting...
[2024-04-13 06:24:21,572] [32] [INFO] [^--LeaderAssignor]: Starting...
[2024-04-13 06:24:21,572] [32] [INFO] [^--Producer]: Creating topic 'sample_consumer-__assignor-__leader'
[2024-04-13 06:24:21,575] [32] [INFO] [^--ReplyConsumer]: Starting...
[2024-04-13 06:24:21,576] [32] [INFO] [^--AgentManager]: Starting...
[2024-04-13 06:24:21,576] [32] [INFO] [^---Agent: consumers.add_user]: Starting...
[2024-04-13 06:24:21,576] [32] [INFO] [^----OneForOneSupervisor: (1@0xffff8afddb10)]: Starting...
[2024-04-13 06:24:21,577] [32] [INFO] [^---Conductor]: Starting...
[2024-04-13 06:24:21,577] [32] [INFO] [^--TableManager]: Starting...
[2024-04-13 06:24:21,577] [32] [INFO] [^---Conductor]: Waiting for agents to start...
[2024-04-13 06:24:21,577] [32] [INFO] [^---Conductor]: Waiting for tables to be registered...
[2024-04-13 06:24:22,580] [32] [INFO] [^---Recovery]: Starting...
[2024-04-13 06:24:22,582] [32] [INFO] [^--Producer]: Creating topic 'sample_consumer-__assignor-__leader'
[2024-04-13 06:24:22,589] [32] [INFO] Updating subscribed topics to:
┌Requested Subscription───────────────┐
│ topic name │
├─────────────────────────────────────┤
│ sample_consumer-__assignor-__leader │
│ user │
└─────────────────────────────────────┘
[2024-04-13 06:24:22,591] [32] [INFO] Subscribed to topic(s):
┌Final Subscription───────────────────┐
│ topic name │
├─────────────────────────────────────┤
│ sample_consumer-__assignor-__leader │
│ user │
└─────────────────────────────────────┘
[2024-04-13 06:24:22,607] [32] [INFO] Discovered coordinator 1001 for group sample_consumer
[2024-04-13 06:24:22,607] [32] [INFO] Revoking previously assigned partitions set() for group sample_consumer
[2024-04-13 06:24:22,609] [32] [INFO] (Re-)joining group sample_consumer
[2024-04-13 06:24:22,618] [32] [INFO] Joined group 'sample_consumer' (generation 3) with member_id faust-0.11.0-409b5584-b142-4607-bf81-647ebd061a8c
[2024-04-13 06:24:22,618] [32] [INFO] Elected group leader -- performing partition assignments using faust
[2024-04-13 06:24:22,625] [32] [INFO] Successfully synced group sample_consumer with generation 3
[2024-04-13 06:24:22,626] [32] [INFO] Setting newly assigned partitions
┌Topic Partition Set──────────────────┬────────────┐
│ topic │ partitions │
├─────────────────────────────────────┼────────────┤
│ sample_consumer-__assignor-__leader │ {0} │
│ user │ {0} │
└─────────────────────────────────────┴────────────┘ for group sample_consumer
[2024-04-13 06:24:22,627] [32] [INFO] Executing _on_partitions_assigned
[2024-04-13 06:24:22,627] [32] [INFO] generation id 3 app consumers id 3
[2024-04-13 06:24:22,628] [32] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2024-04-13 06:24:22,631] [32] [INFO] [^---Recovery]: Resuming flow...
[2024-04-13 06:24:22,631] [32] [INFO] [^---Fetcher]: Starting...
[2024-04-13 06:24:22,631] [32] [INFO] [^---Recovery]: Worker ready
[2024-04-13 06:24:22,631] [32] [INFO] [^Worker]: Ready
[2024-04-13 06:24:22,646] [32] [WARNING] Add User for 1: jordan
[2024-04-13 06:24:22,647] [32] [WARNING] Add User for 1: jordan
<aside> 💡 참고 sample_consumer-__assignor-__leader는 Faust의 내부 토픽으로 ****Consumer Group의 메타데이터 관리, Rebalancing 정보 저장등 목적으로 사용되는 토픽이라고 합니다.
</aside>
Integration with Django
간단하게 사용할 목적이라면 기존 사용하던 Django container에 faust패키지만 추가한 후 해당 이미지에서 consumer 코드에 아래 부분만 추가하면 Django와 integration 할 수 있다.
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings.local_settings")
django.setup()
Django ORM을 consumer 코드를 사용하면서 생각보다 불편했던 점은 Django ORM이
비동기 context 내에서는 실행되지 않도록 설계되어 있다 보니 모든 ORM을 async로 변경해줘야 하는데 아직 async 관련 인터페이스들이 부족하다 보니 이 과정이 코드도 많아지고, 번거로웠다…. 개인적으로 ORM을 사용함으로써 코드 가독성과 양이 많이 줄어서 장점이라고 생각했는데 이 부분은 상당히 아쉬웠다…
그나마 async를 지원하는 메서드로 로직 처리가 가능하다면 괜찮을 것 같다.
import os
import django
import faust
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings.local_settings")
django.setup()
from user.models import User
app = faust.App('sample_consumer', broker="kafka://kafka:9092")
class User(faust.Record):
id: str
name: int
topic = app.topic('user', value_type=User)
@app.agent(topic)
async def add_user(users):
async for user in users:
# process infinite stream of orders.
print(f'Add User for {user.id}: {user.name}')
await User.objects.acreate(name=user.name)
다양한 Django app에서 agents를 생성 후 상단에 django 관련 설정 실행 없이 Django와 integration 할 수 있는 예제도 github에 공개되어 있다.
오프셋 관리 및 자동 재시도 및 오류 복구
Kafka Consumer에서 오프셋관리 및 실패했을 때 재시도하는 부분은 가장 중요한 부분 중 하나다.
broker에 commit
Consumer의 commit은 백그라운드에서 2.8초마다 실행된다. commit interval은 [broker_commit_interval](<https://faust-streaming.github.io/faust/userguide/settings.html#id31>) 값을 이용해서 조정할 수 있다.
Agent가 실패했을 때
Agent가 실패했을 때도 해당 event는 완료된 것으로 처리된다. 이렇게 구현한 이유에 대해서는 문서에서 조금 더 자세히 설명하고 있다. 그렇기 때문에 오류가 발생하는 경우 topic에서 이벤트 순서가 바뀌지 않도록 재시도하는 로직을 추가하거나 크래시의 경우 오류를 기록하고 스트림을 재생성하고 재처리하는 것을 권장하고 있다.
- 재시도하는 로직이 내부에 구현되어 있지는 않고, retry library를 활용해서 사용하면 될 것 같다.
- 크래시에 의해서 실패하는 경우는 Dead letter topic이라던지, 외부 데이터 저장소에 기록한다던지 해서 수정 후 재생성 및 재처리가 필요할 것 같다.
Consumer Group 관리 및 배포 편의성
Faust에서는 app의 application instance이자 Consumer Group은이다. App 내에서 agent라는 개념을 통해서 consumer group 같이 사용할 수 있다. 또, 병렬 처리 같은 경우는 agent에 concurrency값를 이용해서 병렬 처리할 수 있다.
하나의 processd에서 여러 app 실행 (여러 consumer group)
# Run multiple application instances in the same process
>> app1 = faust.App('demo1')
>> app2 = faust.App('demo2')
하나의 app에서 multi-agents로 여러 agent가 같은 topic 처리
# ...위 동일
# 하나의 app에 2개의 agent
@app.agent(topic)
async def add_user(users):
async for user in users:
print(f'Add User for {user.id}: {user.name}')
@app.agent(topic)
async def add_log(users):
async for user in users:
print(f'Add User Log for {user.id}: {user.name}')
# producer
>{"id": 1, "name": "jordan"}
>{"id": 1, "name": "jordan"}
# consumer
[2024-04-13 12:44:47,585] [50] [WARNING] Add User for 1: jordan
[2024-04-13 12:44:47,586] [50] [WARNING] Add User Log for 1: jordan
[2024-04-13 12:44:59,581] [50] [WARNING] Add User for 1: jordan
[2024-04-13 12:44:59,581] [50] [WARNING] Add User Log for 1: jordan
배포 편의성은 여러 app을 만들어서 배포하기도 편하고, app의 같은 topic 여러 agent를 붙여도 consumer group과 같은 동작을 하도록 구현이 가능해서 파이썬에서 flask나 celery 등을 사용해 본 사람이라면 K8s든지 어떤 형태던지 쉽게 배포가 가능할 것 같다.
커뮤니티&사용사례
Faust는 Python에서 Kafka 스트림 처리를 위해 사용되는 라이브러리로 Celery 프로젝트의 작성자인 Robinhood 팀에 의해 개발되었으며, Python에서 비동기 프로그래밍을 가능하게 하는 asyncio 라이브러리를 기반으로 한다. 사실 Github이나 Stackoverflow를 봤을 때는 활발하게 개발이 이뤄지고 있지는 않은 것 같다. Python의 다양한 라이브러리와 같이 쓸 수 있다는 점과 비동기 구현으로 고성능이라는 점에서 살펴봤지만 이슈가 발생했을 때 관련 이슈를 찾아보거나 기능 지원이 잘 될 거라고 생각하기는 쉽지 않을 것 같다.
Stackoverflow
'Computer Engineering > Data Engineering' 카테고리의 다른 글
Publish/Subscribe 와 Producer/Consumer 메시징 시스템 (0) | 2024.04.10 |
---|---|
BigQuery Merge Query 설명 및 사용 사례 (0) | 2023.11.05 |
Airflow KubernetesPodOperator 예제 코드 및 설명 (0) | 2023.07.22 |
초기 스타트업에서 Data Engineer는 어떤 일을 하나요? (0) | 2023.07.10 |
Pandas DataFrame apply 성능 이슈 개선하기 (0) | 2023.07.08 |