Computer Engineering/Data Engineering

Airflow Sensor 정리 (feat. S3 Sensor)

jordan.bae 2022. 7. 12. 23:39

 

Airflow

Concept

Operator의 한 가지 type으로 wait for something to occur 이라는 한 가지 목적으로 만들어졌다.

time-based로 어떤 이벤트 또는 조건이 성사되는 것을 기다린다.

여기서 something은 아래와 같은 것들이 있다.

  • file의 생성여부
  • external event
  • 그 밖의 다양한 기다릴 수 있는 조건 or event.

기본적으로 해당 event가 일어날 때 까지 기다리다가 발생하면 다음 downstream의 task가 동작할 수 있도록 하는 역할을한다. 즉, 어떤 파일이나 이벤트 기반으로 다음 task를 수행해야 할 때 사용할 수 있다.

 

ex) loader라는 서비스에서 file을 지정된 위치에 저장한 후에 compactor라는 서비스에서 데이터를 compact해야 할 때, compact를 수행하는 task 앞에 file이 존재하는지 여부를 sensor로 확인할 수 있다. Schedule기반으로만 trigger를 했을 때는 예상보다 시간이 조금 더 걸리거나 에러가 발생해서 retry하는 경우등에 task가 실패할 수 있다.

 

Time based (시간 조건)

위에 조건이나 이벤트를 확인할 때 time based라고 했는데 Sensor는 3가지 mode를 가지고 있다. 필요에 따라서 선택해서 사용하면 된다.

  • poke (default): 전체 worker slot를 sensor task가 차지한다. 전체 execution이 sleep과 poke(check)으로 구성되게 된다. 이 mode는 sensor가 짧은 시간 동안만 동작하는 것을 보장할 수 있을 때 좋다.
  • reshedule: Sensor는 checking할 때만 worker slot을 차지하고 sleep기간에는 worker slot을 차지 하지 않음. 그렇기 때문에 sensor의 의한 deadlock현상을 피할 수 있다. sensor가 오랜 시간 기다려야 하는 경우에는 이 mode가 적절하다.
  • smart sensor: Sensor가 모든 실행을 일괄 처리하는 단일 중앙 집중식 버전. 2.4에서 deprecated될 것이라고 해서....좀 더 자세한 내용은→ (https://airflow.apache.org/docs/apache-airflow/stable/concepts/smart-sensors.html)

 

ETC (기타) 

parameter

- timeout: 총 기다리는 시간. 이 시간 동안 이벤트가 발생하지 않으면 task가 fail된다. 일정 시간을 정해 놓는게 빠르게 이슈를 발견할 수 있어서 좋다.

 

Deadlock issue

DAG의 concurrency 따라 다르지만 sensor는 어떤 결과를 기다리는 task이므로 worker slot을 많이 차지 할 수 있다. timeout과 DAG의 concurrency를 적당히 설정하던지, reschedule mode를 활용하여 실제 체크를 할 때만 slot을 차지하는 것이 좋다.

 

S3Sensor Sample

file_sensor = S3KeySensor(
     task_id='s3_key_sensor_task',
     poke_interval=60 * 30, # (seconds); checking file every half an hour
     timeout=60 * 60 * 12, # timeout in 12 hours
     bucket_key="s3://[bucket_name]/[key]",
     bucket_name=None,
     wildcard_match=False,
     dag=dag
)

print_message = PythonOperator(task_id='print_message',
     provide_context=True,
     python_callable=new_file_detection,
     dag=dag
)

file_sensor >> print_message

 

 

Reference

반응형