Computer Engineering/My Stack Overflow

Airflow(에어플로우) could not queue task issue

jordan.bae 2022. 11. 22. 19:00

Could not queue task issue

GKE위에서 Airflow2.3.4 버젼에 kubernates executor의 scheduler에서 task를 queue에 넣지 못하는 이슈가 발생. 실질적으로 문제는 되지 않는데 그 이유는 이미 task가 queue로 들어갔기 때문이다.
코드를 디테일하게 살펴보지는 않았지만 대략적으로 에러 발생전 실해된 쿼리를 보면 lock을 획득과정에서 이슈가 있는 것 같다.

SELECT pg_try_advisory_xact_lock(%(id)s)

해당 에러가 발생하는 코드는 아래 코드같은데 이 self.queued_tasks 컨테이너 안에 해당 key가 있다는건 lock 로직이 제대로 동작하지 않은 것으로 보인다.

    def queue_command(
        self,
        task_instance: TaskInstance,
        command: CommandType,
        priority: int = 1,
        queue: Optional[str] = None,
    ):
        """Queues command to task"""
        if task_instance.key not in self.queued_tasks:
            self.log.info("Adding to queue: %s", command)
            self.queued_tasks[task_instance.key] = (
                command,
                priority,
                queue,
                task_instance,
            )
        else:
            self.log.error("could not queue task %s", task_instance.key)

Solution

다른 사람들도 해당 이슈를 겪고 있었고, github issue에도 올라와 있었다.
https://github.com/apache/airflow/issues/26394

could not queue task - airflow 2.3.4 - kubernetes executor · Issue #26394 · apache/airflow

Apache Airflow version 2.3.4 What happened Tasks are no longer scheduled since upgrading to airflow.2.3.4, with logs: {base_executor.py:211} INFO - task TaskInstanceKey(dag_id='sys_liveness'...

github.com

이슈에 Fix PR링크가 없어서 정확한 확인은 하지 못했고, release에 관련 이슈가 보이지 않지만 2.4.1 에서 해결된 것으로 보인다. 이슈가 다른 이슈랑 같이 해결된 느낌이다.

Kubernetes Executor (1.22.11) / Airflow 2.4.1-python3.9 / Helm Chart 1.7.0 with dag processor enabled.


잘 동작하는지는 업데이트 후에 결과를 업데이트 하겠습니다!

반응형