Could not queue task issue
GKE위에서 Airflow2.3.4 버젼에 kubernates executor의 scheduler에서 task를 queue에 넣지 못하는 이슈가 발생. 실질적으로 문제는 되지 않는데 그 이유는 이미 task가 queue로 들어갔기 때문이다.
코드를 디테일하게 살펴보지는 않았지만 대략적으로 에러 발생전 실해된 쿼리를 보면 lock을 획득과정에서 이슈가 있는 것 같다.
SELECT pg_try_advisory_xact_lock(%(id)s)
해당 에러가 발생하는 코드는 아래 코드같은데 이 self.queued_tasks 컨테이너 안에 해당 key가 있다는건 lock 로직이 제대로 동작하지 않은 것으로 보인다.
def queue_command(
self,
task_instance: TaskInstance,
command: CommandType,
priority: int = 1,
queue: Optional[str] = None,
):
"""Queues command to task"""
if task_instance.key not in self.queued_tasks:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[task_instance.key] = (
command,
priority,
queue,
task_instance,
)
else:
self.log.error("could not queue task %s", task_instance.key)
Solution
다른 사람들도 해당 이슈를 겪고 있었고, github issue에도 올라와 있었다.
https://github.com/apache/airflow/issues/26394
이슈에 Fix PR링크가 없어서 정확한 확인은 하지 못했고, release에 관련 이슈가 보이지 않지만 2.4.1 에서 해결된 것으로 보인다. 이슈가 다른 이슈랑 같이 해결된 느낌이다.
Kubernetes Executor (1.22.11) / Airflow 2.4.1-python3.9 / Helm Chart 1.7.0 with dag processor enabled.
잘 동작하는지는 업데이트 후에 결과를 업데이트 하겠습니다!
반응형