Computer Engineering/Data Engineering

Airflow Task 우선순위 설정하기(Priority weights)

jordan.bae 2023. 4. 6. 22:18

Task Priority Weights

이 글은 Airflow Task의 우선순위를 priority_weight 파라미터와 weight_rule 을 이용해서 관리하는 방법을 설명합니다. 최근에 많은 양의 DAG을 rerun해야하는 경우가 발생했는데 우선순위가 높은 DAG의 스케쥴이 뒤로 밀려서 불편함을 겪어서 Task우선순위를 설정하는 방법에 대해서 찾아보게 됐습니다. Airflow는 task의 우선순위를 결정하기 위해 task의 priority_weights를 사용합니다.

priority_weight는 각 task에 대해 정의되어 있으며, Airflow scheduler가 task를 실행할 때 사용되고, weight_rule는 priority_weight을 계산하는 방법에 대한 설정 값입니다.

해당 값들을 이용해서 효율적으로 task의 우선순위를 설정할 수 있습니다.

 

priority_weight

Executor는 priority_weight이 큰 task를 먼저 trigger하게 됩니다.

기본 값은 1로 설정되어 있기 때문에 우선순위를 높이고 싶다면 1보다 큰 값을 설정하면 됩니다

BaseOperator의 인자를 살펴보면 아래와 같이 주석이 되어 있습니다.

:param priority_weight: priority weight of this task against other task.
        This allows the executor to trigger higher priority tasks before
        others when things get backed up. Set priority_weight as a higher
        number for more important tasks.

jobs/cheduler_job.py 코드에서 보면 task instance priority_weight 을 통해 ordering하는 것을 확인할 수 있습니다.

            # Get task instances associated with scheduled
            # DagRuns which are not backfilled, in the given states,
            # and the dag is not paused
            query = (
                session.query(TI)
                .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
                .join(TI.dag_run)
                .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
                .join(TI.dag_model)
                .filter(not_(DM.is_paused))
                .filter(TI.state == TaskInstanceState.SCHEDULED)
                .options(selectinload('dag_model'))
                .order_by(-TI.priority_weight, DR.execution_date)
            )

 

weight_rule

Airflow에서는 세 가지 가중치 계산 방법을 제공합니다. 기본 값은 downstream입니다. 그렇기 때문에 priority_weight도 기본 값인 경우에는 하위 task들이 많은 task부터 실행되게 됩니다. 즉, 중요한 DAG의 마지막 task가 따로 설정하지 않으면 다른 DAG의 상위 task들에게 우선순위에서 밀릴 수가 있습니다.

downstream

  • downstream 방법은 task의 가중치를 downstream에 있는 하위 task들의 가중치 합으로 계산합니다.
  • 따라서, upstream에 있는 task는 downstream task에 비해 높은 가중치를 가지며, 가중치 값을 높이면 더욱 높은 우선순위로 스케쥴링됩니다.
  • 이 방법은 여러 개의 DAG 실행 인스턴스가 있는 경우, 모든 DAG가 downstream task를 처리하기 전에 upstream task가 모두 완료될 수 있도록 보장하는 데 유용합니다.
  •  

upstream

  • upstream 방법은 task의 가중치를 upstream에 있는 상위 task들의 가중치 합으로 계산합니다.
  • 따라서, downstream에 있는 task는 upstream task에 비해 높은 가중치를 가지며, 가중치 값을 높이면 더욱 높은 우선순위로 스케쥴링됩니다.
  • 이 방법은 여러 개의 DAG 실행 인스턴스가 있는 경우, 각 DAG가 완료된 후에 다른 DAG의 upstream task를 시작하도록 보장하는 데 유용합니다.

absolute

  • absolute 방법은 추가적인 가중치 없이 우선순위 가중치의 정확한 값으로 계산됩니다.
  • 이 방법은 task의 우선순위 가중치를 정확하게 알 때 유용하며, 매우 큰 DAG의 경우 task 생성 프로세스를 크게 빠르게 할 수 있습니다.

 

간단하게 Airflow에서 task들의 우선순위를 설정하는 방법에 대해서 정리해봤습니다.

혹시, 제가 잘 못 이해하고 있는 부분이 있거나 추가로 궁금하신 부분이 있다면 편하게 댓글로 알려주세요!

반응형