일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- kafka
- Zookeeper
- 델타레이크
- MySQL
- 데이터
- s3
- 대용량 처리
- kafka rest api
- Parquet
- delta lake
- Data Engineer
- spark streaming
- 컬럼 기반
- AWS
- 카프카 구축
- 데이터 엔지니어링
- Schema Registry
- 스파크 스트리밍
- 스파크
- Data Warehouse
- 에어플로우
- airflow
- 데이터 웨어하우스
- spark
- Redshift
- docker
- 카프카
- 레드시프트
- Data engineering
- 데이터 엔지니어
- Today
- Total
데이터 엔지니어 기술 블로그
[Airflow] Task 알아보기: Operator, Sensor, TaskFlow 본문
개요
Task는 에어플로우의 기본 실행 단위이다. Task에는 세가지 종류가 있다.
1. Operator: Bash, Python 등과 연결되어 있는 미리 정의된 작업 템플릿
2. Sensor: 외부 이벤트를 기다리는 연산자의 하위 클래스
3. TaskFlow: @task 데코레이터를 사용하여 패키지와 할 수 있는 기능(Airflow 2.0.0 신규 기능)
기본 개념
관계
작업간의 종속성을 설정하기 위해서 upstream, downstream을 사용할 수 있다.
<<, >> 연산자를 사용하여 종속성을 설정할 수 있다.
a_task >> b_task >> [c_task, d_task]
작업은 기본적으로 자신의 정보를 전달하지 않으며 독립적으로 실행이 되지만, XCom을 사용하여 다른 태스크로 정보를 전달할 수 있다.
SLA
execution_timeout은 타임아웃 시간이 지나면 작업을 종료시키는 반면 SLA(Service Level Agreement)를 사용하면 작업을 종료하지 않고 정해둔 시간이 지나면 이메일로 보내서 알려줄 수 있다.
Zombie/Undead Tasks
인스턴스는 죽었지만 산 것으로 표시될 수도 있는데, 에어플로우에서는 이러한 작업과 프로세스 불일치를 감지할 수 있다.
Zombie Task: 실행 중이지만 갑자기 종료된 작업. 주기적으로 설정에 따라 작업을 실패시키거나 재시도한다.
Undead Task: 실행되지 말아야할 작업이지만 실행이 된 경우 주기적으로 이를 찾아 종료시킨다. 이는 UI를 통해 인스턴스를 수동으로 편집하면 종종 발생한다.
Sensor
센서 설정
poke(기본값): worker slot을 모두 차지한다. 초 단위로 확인할 수 있다. 이 모드를 사용하면 슬롯이 하나 차지가 되기 때문에 다른 작업이 보류가 될 수 있다.
reschedule: 검사 중일때만 worker slot을 차지한다. 분 단위로 확인할 수 있다.
smart sensor: 단일 중앙 집중식 버전
현재 지원되는 센서의 종류
airflow.sensors.base
airflow.sensors.base_sensor_operator
airflow.sensors.bash
airflow.sensors.date_time
airflow.sensors.date_time_sensor
airflow.sensors.external_task
airflow.sensors.external_task_sensor
airflow.sensors.filesystem
airflow.sensors.hdfs_sensor
airflow.sensors.hive_partition_sensor
airflow.sensors.http_sensor
airflow.sensors.metastore_partition_sensor
airflow.sensors.named_hive_partition_sensor
airflow.sensors.python
airflow.sensors.s3_key_sensor
airflow.sensors.s3_prefix_sensor
airflow.sensors.smart_sensor
airflow.sensors.sql
airflow.sensors.sql_sensor
airflow.sensors.time_delta
airflow.sensors.time_delta_sensor
airflow.sensors.time_sensor
airflow.sensors.web_hdfs_sensor
airflow.sensors.weekday
직접 사용해보기
센서 중 PythonSensor를 직접 사용해보려고 한다. PythonSensor란 python callback이 True를 반환하면 센서가 동작하는 방식의 센서이다.
airflow.sensors.python.PythonSensor(*, python_callable: Callable, op_args: Optional[List] = None, op_kwargs: Optional[Dict] = None, templates_dict: Optional[Dict] = None, **kwargs)
■ 샘플 코드
이 코드대로 작성을 하게되면, 30초에 한 번씩 callback 함수를 호출해서 랜덤으로 나온 숫자가 1보다 크면 다음 작업(BashOperator)을진행하게 된다.
from airflow import DAG
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
import datetime
import random
default_args = {
'owner': 'airflow',
'catchup': False
}
dag = DAG('sensor_test', default_args=default_args, description='sensor test sample', start_date=days_ago(3), schedule_interval = "0 1 * * *",)
def callback():
if random.randint(0, 3) > 1:
return True
else:
return False
python_sensor_sample = PythonSensor(python_callable=callback, task_id='python_sensor_sample', poke_interval=30, dag=dag)
bash_sample = BashOperator(bash_command='echo hello',task_id='bash_sample', dag=dag)
python_sensor_sample >> bash_sample
Taskflow
taskflow는 airflow 2.0.0 에서 새로 나온 기능이다. 간단한 Task들을 파이썬으로 작성할 수 있게 도와준다.
아래의 예시는 간단한 ETL을 Taskflow를 사용하여 만드는 것을 보여준다.
from airflow import decorators
from airflow import DAG
from airflow.utils.dates import days_ago
import json
@decorators.task()
def extract():
data = '{"apple": 3, "banana": 5}'
return json.loads(data)
@decorators.task()
def transform(fruits):
return {
'total': sum(fruits.values())
}
@decorators.task()
def load(result):
print('Total: ', result['total'])
default_args = {
'owner': 'airflow',
'catchup': False
}
with DAG('taskflow_sample', default_args=default_args, description='taskflow sample', start_date=days_ago(3), schedule_interval = "0 1 * * *") as dag:
load(transform(extract()))
위에서 작성한대로 DAG 아래에 Task들이 관계를 맺고 있다.
load task의 로그를 보면 아래와 같은 원하는 결과가 나오는 것을 확인할 수 있다.
[2021-06-21 11:51:46,826] {logging_mixin.py:103} INFO - Total: 8
'데이터 엔지니어링' 카테고리의 다른 글
[압축 방식 비교] gzip vs snappy vs lz4 vs brotli vs zstd vs lzo (0) | 2021.07.10 |
---|---|
[🧙Kafka] S3 Sink Connector: 카프카 S3와 연동하기 (0) | 2021.07.09 |
[Airflow] 실행 날짜(execution date) 쉽게 이해하기 (0) | 2021.06.21 |
[Airflow] 에어플로우 시작하기: 개념 및 설치 (7) | 2021.06.16 |
[Airflow] 에어플로우 작업 실패시 Slack으로 메세지 보내는 방법 (0) | 2021.06.14 |