Computer Engineering/Data Engineering

Airflow KubernetesPodOperator 예제 코드 및 설명

jordan.bae 2023. 7. 22. 23:01

KubernetesPodOperator는 Kuberntes cluster에서 Airflow가 실행중일 때 사용자가 원하는 docker image에서 task를 실행하는 task를 만드는 Operator입니다.

Airflow는 여러 가지 서비스들을 Orchestration할 수 있다는 강점을 가지고 있습니다. 직접 데이터를 처리하는 서비스들을 여러 개 만들수도 있는데 이 때 각각의 서비스의 의존성이 다를 수 있는데 이 때 KubernetesPodOperator 를 사용하면 독립적인 컨테이너 환경에서 서비스를 실행할 수 있습니다. 

Install

KubernetesPodOperator 를 위해 필요한 kubernetes provider 패키지 설치

pip install apache-airflow[cncf.kubernetes]

 

예제 코드

import datetime

from airflow import DAG
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)


resources = k8s.V1ResourceRequirements(
    limits={"memory": "1Gi", "cpu": "1"},
    requests={"memory": "500Mi", "cpu": "0.5"},
)
with DAG(
    "example_kubernetes_python",
    schedule_interval=None,
    start_date=datetime.datetime(2020, 2, 2),
    tags=["example"],
) as dag:

    run_python = KubernetesPodOperator(
        task_id="run_python_script",
        name="run_python_script",
        namespace="insighter-prod",
        image="python:3.10-slim",  # 이 이미지에 필요한 파이썬 스크립트와 의존성이 포함되어 있어야 합니다.
        is_delete_operator_pod=True,
        cmds=["python", "-c"],
        arguments=[
            'print("Hello, World!")'
        ],  # 여기에 파이썬 스크립트를 입력하거나 실행할 파이썬 파일의 경로를 제공하세요.
        get_logs=True,
        resources=resources,
    )

 

주요 파라미터

def __init__(
        self,
        *,
        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
        namespace: str | None = None,
        image: str | None = None,
        name: str | None = None,
        random_name_suffix: bool = True,
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        ports: list[k8s.V1ContainerPort] | None = None,
        volume_mounts: list[k8s.V1VolumeMount] | None = None,
        volumes: list[k8s.V1Volume] | None = None,
        env_vars: list[k8s.V1EnvVar] | None = None,
        env_from: list[k8s.V1EnvFromSource] | None = None,
        secrets: list[Secret] | None = None,
        in_cluster: bool | None = None,
        cluster_context: str | None = None,
        labels: dict | None = None,
        reattach_on_restart: bool = True,
        startup_timeout_seconds: int = 120,
        get_logs: bool = True,
        container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
        image_pull_policy: str | None = None,
        annotations: dict | None = None,
        container_resources: k8s.V1ResourceRequirements | None = None,
        affinity: k8s.V1Affinity | None = None,
        config_file: str | None = None,
        node_selector: dict | None = None,
        image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
        service_account_name: str | None = None,
        hostnetwork: bool = False,
        tolerations: list[k8s.V1Toleration] | None = None,
        security_context: dict | None = None,
        container_security_context: dict | None = None,
        dnspolicy: str | None = None,
        dns_config: k8s.V1PodDNSConfig | None = None,
        hostname: str | None = None,
        subdomain: str | None = None,
        schedulername: str | None = None,
        full_pod_spec: k8s.V1Pod | None = None,
        init_containers: list[k8s.V1Container] | None = None,
        log_events_on_failure: bool = False,
        do_xcom_push: bool = False,
        pod_template_file: str | None = None,
        priority_class_name: str | None = None,
        pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
        termination_grace_period: int | None = None,
        configmaps: list[str] | None = None,
        skip_on_exit_code: int | Container[int] | None = None,
        base_container_name: str | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        poll_interval: float = 2,
        log_pod_spec_on_failure: bool = True,
        on_finish_action: str = "delete_pod",
        is_delete_operator_pod: None | bool = None,
        **kwargs,
    ) -> None:
  • image: 사용할 docker image. default는 dockerhub에서 가져오기 때문에 custom repositories는 fully urls을 설정
  • namespace: k8s namespace
  • cmds: container의 entry point
  • arguments**:** arguments of the entrypoint.
  • image_pull_secrets: custom repository의 이미지를 가져오는 경우 secret.
  • env_vars: 컨테이너의 환경변수
  • secrets: 컨테이너에 주일할 secrets.
  • get_logs: 컨테이너의 표준출력을 로그로 가져올 건지
  • on_finish_action: pod이 끝나거나 중단됐을 때 pod을 삭제할건지

 

Pod 의 resource 설정

따로 요청하지 않으면 BestEffort로 작동.

from airflow.kubernetes.pod import Resources


resources = k8s.V1ResourceRequirements(
    limits={"memory": "1Gi", "cpu": "1"},
    requests={"memory": "500Mi", "cpu": "0.5"},
)

참고: Kubernetes는 파드에 할당된 리소스를 관리하기 위해 Request와 Limit라는 두 가지 개념을 사용합니다.

  1. Request: 파드가 시작될 때 필요한 최소 리소스 양을 정의합니다. 이는 파드가 스케줄링될 노드를 결정하는 데 사용됩니다. 예를 들어, 노드에 남아있는 리소스가 파드의 Request보다 작으면, 그 노드에 파드는 스케줄링되지 않습니다.
  2. Limit: 파드가 사용할 수 있는 최대 리소스 양을 정의합니다. 파드가 이 한도를 초과하면, 해당 파드는 제한 또는 축출이 이루어질 수 있습니다.

Kubernetes의 QoS(Quality of Service) 클래스는 파드의 "OOM(Out of Memory) 축출 순위"를 결정하며, 파드의 Request와 Limit 설정에 따라 결정됩니다. 쿠버네티스의 QoS 클래스는 다음의 세 가지 중 하나로 분류됩니다:

  1. Guaranteed: 모든 컨테이너에 CPU와 메모리에 대한 Limit와 Request가 모두 설정되어 있고, 그 값이 같은 경우, 해당 파드는 Guaranteed QoS 클래스에 속합니다.
  2. Burstable: 적어도 하나의 컨테이너에 CPU 또는 메모리에 대한 Request 또는 Limit이 설정되어 있지만, Guaranteed 클래스의 조건을 충족하지 않는 경우, 해당 파드는 Burstable QoS 클래스에 속합니다.
  3. BestEffort: 컨테이너에 CPU와 메모리에 대한 Request나 Limit이 설정되어 있지 않은 경우, 해당 파드는 BestEffort QoS 클래스에 속합니다.

이러한 QoS 클래스 설정은 파드의 안정성을 관리하는데 중요한 역할을 합니다. 예를 들어, 리소스 부족 상황에서는 BestEffort 파드가 가장 먼저 제거되고, 그 다음으로 Burstable 파드가 제거됩니다. Guaranteed 파드는 가장 마지막에 제거됩니다. 이처럼 QoS 클래스는 파드가 리소스를 얼마나 "안정적으로" 사용할 수 있는지를 결정합니다.

 

KubernetesPodOperator에서 Xcom 사용

KubernetesPodOperator는 다른 오퍼레이터들과 달리 XCom 값을 다르게 처리합니다. KubernetesPodOperator에서 XCom 값을 전달하려면 do_xcom_push 인자를 True로 지정해야 합니다. 이렇게 하면 파드 옆에 실행되는 사이드카 컨테이너가 생성되고 /airflow/xcom/return.json의 값을 읽어 전달합니다. 그렇기 때문에 파드는 /airflow/xcom/return.json 경로에 XCom 값을 작성해야 합니다.

...위에 코드 생략

    run_python = KubernetesPodOperator(
        task_id="run_python_script",
        name="run_python_script",
        namespace="insighter-prod",
        image="python:3.10-slim",  # 이 이미지에 필요한 파이썬 스크립트와 의존성이 포함되어 있어야 합니다.
        is_delete_operator_pod=True,
        cmds=["python", "-c", "print('Hello, World!')", ">> /airflow/xcom/return.json"],
        get_logs=True,
        do_xcom_push=True,
        resources=resources,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('run_python_script') }}\"",
        task_id="pod_task_xcom_result",
    )

    run_python >> pod_task_xcom_result

 

Reference

반응형