Scheduler 개요
Scheduler는 Airflow의 주요 컴포넌트 중 하나입니다.
Airflow의 스케줄러는 모든 작업과 DAG를 모니터링하고, 해당 작업들의 의존성이 완료된 후에 작업 인스턴스를 트리거합니다. 내부적으로 스케줄러는 서브프로세스를 생성하여 지정된 DAG 디렉토리의 모든 DAG를 모니터링하고 동기화합니다. 기본적으로 1분마다 스케줄러는 DAG 파싱 결과를 수집하고, 활성화된 작업들이 트리거될 수 있는지 확인합니다.
즉, Scheduler가 실행되지 않으면 DAG을 실행시킬 수 없습니다.
Scheduler의 역할은 크게 아래와 같습니다.
- DAG(Directed Acyclic Graph)의 실행 스케줄링: Airflow에서 DAG는 작업들의 연결을 정의한 것이며, scheduler는 DAG의 실행 스케줄링을 관리합니다. 스케줄러는 DAG가 실행되는 빈도, 실행 시간, 실행 지연 등을 관리합니다.
- 작업(task) 인스턴스화: 스케줄러는 DAG에서 정의한 작업들을 실제로 실행 가능한 작업 인스턴스로 변환합니다. 작업 인스턴스는 실행을 위한 실행 컨텍스트와 함께 생성됩니다.
- 작업간 의존성 관리: DAG에서 작업 간의 의존성을 관리합니다. 예를 들어, 하나의 작업이 실행되기 전에 이전 작업이 모두 완료되어야 하는 경우, 스케줄러는 작업 간의 의존성을 고려하여 DAG의 실행을 관리합니다.
- 실행 중인 작업 모니터링: 스케줄러는 실행 중인 작업을 모니터링하고 작업의 상태를 추적합니다. 작업이 성공적으로 완료되면 스케줄러는 DAG의 다음 작업을 실행합니다. 그러나 작업이 실패하면 스케줄러는 재시도(retry) 또는 DAG의 실행을 중단하는 등의 조치를 취할 수 있습니다.
- 스케줄러 상태 관리: 스케줄러는 DAG와 작업의 실행 상태를 추적하고 로그를 기록합니다. 이를 통해 사용자는 DAG와 작업의 실행 상태를 모니터링하고 문제를 파악할 수 있습니다.
작동 원리에 대해서 살펴보기에 앞서 Scheduler와 관련된 몇몇 사항에 대해서 살펴봅니다.
Schedule, execution_date
Scheduler는 해당 기간의 끝에 DagRun을 스케쥴링(Trigger)합니다.
만약 hourly로 설정이 되어 있고 execution_time이 2023-01-00 00:00:00 이라면 해당 스케쥴의 DagRun은 2023-02-00 00:00:00에 실행이 됩니다.
Task Priority
스케줄러는 고 처리량을 목표로 설계되었습니다. 이는 가능한 한 빨리 작업을 스케줄링하기 위한 결정된 설계 결정입니다. 스케줄러는 풀에서 사용 가능한 빈 슬롯 수를 확인하고 한 번에 해당 수의 작업 인스턴스를 예약합니다. 이는 작업 우선순위가 큐 슬롯보다 예약된 작업이 더 많은 경우에만 영향을 미칩니다. 따라서 동일한 배치를 공유하는 경우 낮은 우선순위 작업이 높은 우선순위 작업보다 먼저 예약될 수있는 경우가 있습니다.
자세한 논의
→ https://github.com/apache/airflow/discussions/28809
DAG File Processing
Airflow 스케줄러는 DAG 폴더에 포함 된 Python 파일을 DAG 객체로 변환하고 예약할 작업을 포함하는 프로세스를 시작하는 책임을 지도록 구성할 수 있습니다.
Running Multiple Scheduler
Airflow는 성능 및 내구성을 위해 동시에 여러 개의 스케줄러를 실행할 수 있습니다.
Multiple Scheduler를 활용하면 metadata database에서 데이터를 가져오는 부분에 장점을 가지고 있습니다.
스케줄러는 이제 직렬화 된 DAG 표현을 사용하여 스케줄링 결정을 내리며, 스케줄링 루프의 개요는 다음과 같습니다.
- 새로운 DagRun이 필요한 DAG를 확인하고 DagRun을 생성합니다.
- 스케줄링 가능한 TaskInstances 또는 완료된 DagRuns의 일괄 처리를 검토합니다.
- 스케줄링 가능한 TaskInstances를 선택하고 Pool 제한 및 기타 동시성 제한을 고려하여 실행을 위해 enqueue합니다.
Requirements
PostgreSQL 10+ 또는 MySQL 8+를 사용하는 경우 추가 설정이나 구성 옵션 없이 여러 개의 스케줄러 복사본을 실행할 수 있습니다.
Optimizing Scheduler Performance
스케줄러는 두 가지 작업을 담당합니다:
- DAG 파일을 지속적으로 파싱하고 데이터베이스 내의 DAG와 동기화하는 작업
- 실행할 태스크를 지속적으로 스케줄링하는 작업
스케줄러를 조정하기 위한 고려사항
이 두 작업은 스케줄러에 의해 병렬로 실행되며 서로 다른 프로세스에서 독립적으로 실행됩니다. 스케줄러를 세밀하게 조정하기 위해서는 다음과 같은 요소를 고려해야 합니다:
- 파일 시스템과 컴퓨팅 리소스(CPU, Memory, Network throughput)
- DAG 구조(DAG 파일 수, 파일 크기, Import time의 최적화)
- Scheduler Configuration(스케쥴러 수, 스케줄러 내 파싱 프로세스 수, DAG 파싱 주기 등)
Airflow에서는 Scheduler의 fine tunning을 지원하기 때문에 자신의 컴퓨팅 리소스와 니즈에 맞게 값을 잘 설정해야합니다. 이는 모니터링을 통해서 조정해 나갈 수 있습니다.
Airflow 스케줄러 성능을 제한하는 여러 리소스
- 파일시스템 성능 (DAG 파일을 가지고 있는 파일 시스템)
- DAG 분배 메커니즘 (DAG을 docker image포함 or gitsync를 이용)
- CPU Usage (min_file_process_interval , DAG 파일 수 및 구조)
- Memory Usage
스케줄러의 성능을 개선하기 위해 고려할 수 있는 개선 방안
- DAG Python 코드의 로직, 효율성 및 복잡성 개선.
→DAG코드 파싱부분이 많이 리소스를 필요로 함으로 이 부분을 개선해서 성능을 새선할 수 있습니다.
- resource 활용 개선 → 남는 자원들이 있으면 scheduler에 추가 resource 할당
- Hardware resource 추가
- Scheduler configration 조정 (parsing sort등)
Scheduler Configuration Options
- max_dagruns_to_create_per_loop
- max_dagruns_per_loop_to_schedule
- use_row_level_locking
- pool_metrics_interval
- orphaned_tasks_check_interval
- dag_dir_list_interval
- file_parsing_sort_mode
- max_tis_per_query
- min_file_process_interval
- parsing_processes
- scheduler_idle_sleep_time
- schedule_after_task_execution
성능개선을 위해 주로 조정하는 값으로는 file processing 주기인 min_file_process_interval 과 scheduler에서 parsing process의 수를 조정하는 parsing_processes 가 있습니다.
Reference
'Computer Engineering > Data Engineering' 카테고리의 다른 글
Airflow CeleryKubernetesExecutor 사용하기 (0) | 2023.06.16 |
---|---|
Pandas pivot_table 예제 및 설명 (0) | 2023.05.01 |
Airflow Task 우선순위 설정하기(Priority weights) (0) | 2023.04.06 |
Pandas dataframe 메모리 사용량 확인하기 (0) | 2022.12.02 |
Airflow Taskflow로 DAG refactoring하기 (0) | 2022.11.14 |