Concept
Operator의 한 가지 type으로 wait for something to occur 이라는 한 가지 목적으로 만들어졌다.
time-based로 어떤 이벤트 또는 조건이 성사되는 것을 기다린다.
여기서 something은 아래와 같은 것들이 있다.
- file의 생성여부
- external event
- 그 밖의 다양한 기다릴 수 있는 조건 or event.
기본적으로 해당 event가 일어날 때 까지 기다리다가 발생하면 다음 downstream의 task가 동작할 수 있도록 하는 역할을한다. 즉, 어떤 파일이나 이벤트 기반으로 다음 task를 수행해야 할 때 사용할 수 있다.
ex) loader라는 서비스에서 file을 지정된 위치에 저장한 후에 compactor라는 서비스에서 데이터를 compact해야 할 때, compact를 수행하는 task 앞에 file이 존재하는지 여부를 sensor로 확인할 수 있다. Schedule기반으로만 trigger를 했을 때는 예상보다 시간이 조금 더 걸리거나 에러가 발생해서 retry하는 경우등에 task가 실패할 수 있다.
Time based (시간 조건)
위에 조건이나 이벤트를 확인할 때 time based라고 했는데 Sensor는 3가지 mode를 가지고 있다. 필요에 따라서 선택해서 사용하면 된다.
- poke (default): 전체 worker slot를 sensor task가 차지한다. 전체 execution이 sleep과 poke(check)으로 구성되게 된다. 이 mode는 sensor가 짧은 시간 동안만 동작하는 것을 보장할 수 있을 때 좋다.
- reshedule: Sensor는 checking할 때만 worker slot을 차지하고 sleep기간에는 worker slot을 차지 하지 않음. 그렇기 때문에 sensor의 의한 deadlock현상을 피할 수 있다. sensor가 오랜 시간 기다려야 하는 경우에는 이 mode가 적절하다.
- smart sensor: Sensor가 모든 실행을 일괄 처리하는 단일 중앙 집중식 버전. 2.4에서 deprecated될 것이라고 해서....좀 더 자세한 내용은→ (https://airflow.apache.org/docs/apache-airflow/stable/concepts/smart-sensors.html)
ETC (기타)
parameter
- timeout: 총 기다리는 시간. 이 시간 동안 이벤트가 발생하지 않으면 task가 fail된다. 일정 시간을 정해 놓는게 빠르게 이슈를 발견할 수 있어서 좋다.
Deadlock issue
DAG의 concurrency 따라 다르지만 sensor는 어떤 결과를 기다리는 task이므로 worker slot을 많이 차지 할 수 있다. timeout과 DAG의 concurrency를 적당히 설정하던지, reschedule mode를 활용하여 실제 체크를 할 때만 slot을 차지하는 것이 좋다.
S3Sensor Sample
file_sensor = S3KeySensor(
task_id='s3_key_sensor_task',
poke_interval=60 * 30, # (seconds); checking file every half an hour
timeout=60 * 60 * 12, # timeout in 12 hours
bucket_key="s3://[bucket_name]/[key]",
bucket_name=None,
wildcard_match=False,
dag=dag
)
print_message = PythonOperator(task_id='print_message',
provide_context=True,
python_callable=new_file_detection,
dag=dag
)
file_sensor >> print_message
Reference
- airflow 2.2.4 version 기준
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html
'Computer Engineering > Data Engineering' 카테고리의 다른 글
Airflow Taskflow로 DAG refactoring하기 (0) | 2022.11.14 |
---|---|
Pandas NaN이란 그리고 None 차이 (0) | 2022.11.13 |
Airflow k8s 로컬 개발환경 셋팅 (2) | 2022.07.08 |
IntelliJ로 Spark 개발 환경 구축하기 (0) | 2022.07.01 |
Pandas Dataframe Type Casting 하기. (Feat. BigQuery) (0) | 2021.12.12 |