이번 글에서는 Merge 쿼리에 대한 설명과 Merge문의 대표적 사용 사례인 Merge쿼리를 사용해서 MySQL <-> BigQuery 간에 데이터 웨어하우징에 사용하는 사례를 함께 기록해 보려고 합니다. 계속 변경되는 데이터 소스를 기존의 데이터셋에 정기적으로 통합해야 할 때 자주 사용됩니다.
Merge 쿼리
BigQuery에서 MERGE 쿼리는 SQL의 MERGE 문을 기반으로 하며, 두 개의 테이블을 결합하여 대상 테이블에 소스 테이블의 데이터를 삽입, 업데이트, 또는 삭제하는데 사용됩니다. 이것은 일종의 "upsert" 작업으로 보일 수 있으며, 즉 존재하지 않는 행은 삽입하고 존재하는 행은 업데이트하는 기능을 제공합니다.
위에서 설명한 것처럼 조건에 따라 삽입, 업데이트, 또는 삭제가 가능하기 때문에 CDC 데이터와 함께 사용하면 안성맞춤입니다. 자세한 이야기는 밑에서 다시 다뤄 보겠습니다.
Merge 쿼리 구조
MERGE INTO `target_table` USING `source_table`
ON `merge_condition`
WHEN MATCHED THEN
UPDATE SET `column1` = `value1`, `column2` = `value2`, ...
WHEN NOT MATCHED THEN
INSERT (`column1`, `column2`, ...) VALUES (`value1`, `value2`, ...)
- target_table: 업데이트하거나, 새로운 행을 삽입할 대상 테이블입니다.
- source_table: 대상 테이블에 통합할 데이터를 포함하는 소스 테이블입니다.
- merge_condition: ON 절에 명시된 조건으로, 두 테이블 간의 대응 행을 식별하는데 사용됩니다.
- WHEN MATCHED THEN: 대상 테이블과 소스 테이블에서 조건에 맞는 행이 매치될 경우 수행할 작업입니다.
- WHEN NOT MATCHED THEN: 대상 테이블에 소스 테이블의 해당 행이 없을 경우 수행할 작업입니다.
쿼리 구조에서 업데이트 내용에 해당하는 내용을 source table을 먼저 넣어 줘야 하는 작업이 추가되기는 하지만 BigQuery에서 한 번에 Load 하는 작업은 각 row를 순회하면서 DML을 실행시키는 것보다 효율적입니다.
Merge 쿼리로 RDBMS 변경 데이터를 BigQuery에 지속적으로 병합하기
AWS DMS를 사용하면 A아래와 같은 CDC데이터를 얻을 수 있습니다.
Op,dms_seq_id,id,name,date,city
I,20231104231028000000000000039599921,101,Smith,Bob,4-Jun-14,New York
U,20231104231028000000000000039599922,101,Smith,Bob,8-Oct-15,Los Angeles
U,20231104231028000000000000039599923,101,Smith,Bob,13-Mar-17,Dallas
D,20231104231028000000000000039599924,101,Smith,Bob,13-Mar-17,Dallas
맨 앞에 있는 I = Inset, U= Update, D= Delete 를 의미합니다.
위와 같은 데이터를 source table로 저장후 BigQuery에서 해당 테이블을 복제본인 target table로 merge 해주면 간단하게 변경된 Source RDBMS의 변경된 데이터를 BigQuery에 적용할 수 있습니다. 참고로 source_table을 위한 emp dataset을 테이블 자동 만료 속성과 함께 생성한 후 해당 dataset에 생성하면 관리가 편리합니다.
# 예시 테라폼 코드
resource "google_bigquery_dataset" "stage" {
project = var.gcp_project
dataset_id = "stage"
description = "for stage(temp) table. please remember table default_table_expiration_ms is just three days"
location = var.gcp_location
labels = var.labels
default_table_expiration_ms = var.stage_default_table_expiration_ms
}
(물론 디테일하게 다른 과정이 필요하긴합니다. 자세한 부분은 나중에 따로 포스팅할 기회가 있으면 하도록 하겠습니다.)
위의 예시에서 사용핳 수 있는 Merge Query template
# 위의 예시
statement = statement = f"""
MERGE `{target}` T USING `{source}` S
ON S.id=T.id
WHEN MATCHED AND S.Op = 'D' THEN DELETE
WHEN MATCHED AND S.Op = 'U' THEN UPDATE SET name=S.name, date=S.date, city=S.city
WHEN NOT MATCHED AND (S.Op = 'I' OR S.Op = 'U') THEN
INSERT (name, date, city) VALUES (S.name, S.date, S.city)
"""
# 이런식으로 templating해서 사용하면 편리합니다.
update_string = ",".join([f"{column}=S.{column}" for column in columns])
statement = f"""
MERGE `{target}` T USING `{source}` S
ON {' AND '.join([f"T.{key} = S.{key}" for key in unique_keys])}
WHEN MATCHED AND S.Op = 'D' THEN DELETE
WHEN MATCHED AND S.Op = 'U' THEN UPDATE SET {update_string}
WHEN NOT MATCHED AND (S.Op = 'I' OR S.Op = 'U') THEN
INSERT ({",".join(columns)}) VALUES ({",".join(columns)})
"""
WHEN NOT MATCHED AND (S.Op = 'I' OR S.Op = 'U') THEN 한 부분은 dms_seq_id로 같은 배치(파티션)에 적용되는 데이터 중 unique key가 같은 경우(즉, insert, update, delete가 여러 번 있었던 경우) 마지막 변경 분에 대해서만 source 테이블에 저장하기 때문입니다.
이번 글에서는 BigQuery Merge쿼리와 데이터웨어하우스로 빅쿼리를 사용할 때 소스 데이터를 복제할 때 Merge쿼리를 어떻게 같이 쓸 수 있는지 정리해 봤습니다.
'Computer Engineering > Data Engineering' 카테고리의 다른 글
Python Kafka Consumer 라이브러리 Faust PoC하기 (Feat. Django) (0) | 2024.04.13 |
---|---|
Publish/Subscribe 와 Producer/Consumer 메시징 시스템 (0) | 2024.04.10 |
Airflow KubernetesPodOperator 예제 코드 및 설명 (0) | 2023.07.22 |
초기 스타트업에서 Data Engineer는 어떤 일을 하나요? (0) | 2023.07.10 |
Pandas DataFrame apply 성능 이슈 개선하기 (0) | 2023.07.08 |