데이터 엔지니어 기술 블로그

[Airflow] Task 알아보기: Operator, Sensor, TaskFlow 본문

데이터 엔지니어링

[Airflow] Task 알아보기: Operator, Sensor, TaskFlow

jun_yeong_park 2021. 6. 21. 23:32
반응형

개요

Task는 에어플로우의 기본 실행 단위이다. Task에는 세가지 종류가 있다.

 

1. Operator: Bash, Python 등과 연결되어 있는 미리 정의된 작업 템플릿

2. Sensor: 외부 이벤트를 기다리는 연산자의 하위 클래스

3. TaskFlow: @task 데코레이터를 사용하여 패키지와 할 수 있는 기능(Airflow 2.0.0 신규 기능)

 

 

기본 개념

관계

작업간의 종속성을 설정하기 위해서 upstream, downstream을 사용할 수 있다.

<<, >> 연산자를 사용하여 종속성을 설정할 수 있다.

a_task >> b_task >> [c_task, d_task]

 

작업은 기본적으로 자신의 정보를 전달하지 않으며 독립적으로 실행이 되지만, XCom을 사용하여 다른 태스크로 정보를 전달할 수 있다.

 

SLA

execution_timeout은 타임아웃 시간이 지나면 작업을 종료시키는 반면 SLA(Service Level Agreement)를 사용하면 작업을 종료하지 않고 정해둔 시간이 지나면 이메일로 보내서 알려줄 수 있다.

 

Zombie/Undead Tasks

인스턴스는 죽었지만 산 것으로 표시될 수도 있는데, 에어플로우에서는 이러한 작업과 프로세스 불일치를 감지할 수 있다.

Zombie Task: 실행 중이지만 갑자기 종료된 작업. 주기적으로 설정에 따라 작업을 실패시키거나 재시도한다.

Undead Task: 실행되지 말아야할 작업이지만 실행이 된 경우 주기적으로 이를 찾아 종료시킨다. 이는 UI를 통해 인스턴스를 수동으로 편집하면 종종 발생한다.

 

 

Sensor

센서 설정

poke(기본값): worker slot을 모두 차지한다. 초 단위로 확인할 수 있다. 이 모드를 사용하면 슬롯이 하나 차지가 되기 때문에 다른 작업이 보류가 될 수 있다.

reschedule: 검사 중일때만 worker slot을 차지한다. 분 단위로 확인할 수 있다.

smart sensor: 단일 중앙 집중식 버전

 

현재 지원되는 센서의 종류

airflow.sensors.base
airflow.sensors.base_sensor_operator
airflow.sensors.bash
airflow.sensors.date_time
airflow.sensors.date_time_sensor
airflow.sensors.external_task
airflow.sensors.external_task_sensor
airflow.sensors.filesystem
airflow.sensors.hdfs_sensor
airflow.sensors.hive_partition_sensor
airflow.sensors.http_sensor
airflow.sensors.metastore_partition_sensor
airflow.sensors.named_hive_partition_sensor
airflow.sensors.python
airflow.sensors.s3_key_sensor
airflow.sensors.s3_prefix_sensor
airflow.sensors.smart_sensor
airflow.sensors.sql
airflow.sensors.sql_sensor
airflow.sensors.time_delta
airflow.sensors.time_delta_sensor
airflow.sensors.time_sensor
airflow.sensors.web_hdfs_sensor
airflow.sensors.weekday

 

직접 사용해보기

센서 중 PythonSensor를 직접 사용해보려고 한다. PythonSensor란 python callback이 True를 반환하면 센서가 동작하는 방식의 센서이다.

airflow.sensors.python.PythonSensor(*, python_callable: Callable, op_args: Optional[List] = None, op_kwargs: Optional[Dict] = None, templates_dict: Optional[Dict] = None, **kwargs)

 

■ 샘플 코드

이 코드대로 작성을 하게되면, 30초에 한 번씩 callback 함수를 호출해서 랜덤으로 나온 숫자가 1보다 크면 다음 작업(BashOperator)을진행하게 된다.

from airflow import DAG
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
import datetime
import random

default_args = {
    'owner': 'airflow',
    'catchup': False
}

dag = DAG('sensor_test', default_args=default_args, description='sensor test sample', start_date=days_ago(3), schedule_interval = "0 1 * * *",)


def callback():
    if random.randint(0, 3) > 1:
        return True
    else:
        return False

python_sensor_sample = PythonSensor(python_callable=callback, task_id='python_sensor_sample', poke_interval=30, dag=dag)
bash_sample = BashOperator(bash_command='echo hello',task_id='bash_sample', dag=dag)

python_sensor_sample >> bash_sample

 

Sensor

Taskflow

taskflow는 airflow 2.0.0 에서 새로 나온 기능이다. 간단한 Task들을 파이썬으로 작성할 수 있게 도와준다.

아래의 예시는 간단한 ETL을 Taskflow를 사용하여 만드는 것을 보여준다.

 

from airflow import decorators
from airflow import DAG
from airflow.utils.dates import days_ago
import json
@decorators.task()
def extract():
    data =  '{"apple": 3, "banana": 5}'
    return json.loads(data)


@decorators.task()
def transform(fruits):
    return {
        'total': sum(fruits.values())
    }


@decorators.task()
def load(result):
    print('Total: ', result['total'])


default_args = {
    'owner': 'airflow',
    'catchup': False
}


with DAG('taskflow_sample', default_args=default_args, description='taskflow sample', start_date=days_ago(3), schedule_interval = "0 1 * * *") as dag:
    load(transform(extract()))

 

 

위에서 작성한대로 DAG 아래에 Task들이 관계를 맺고 있다.

Taskflow

 

load task의 로그를 보면 아래와 같은 원하는 결과가 나오는 것을 확인할 수 있다.

[2021-06-21 11:51:46,826] {logging_mixin.py:103} INFO - Total:  8

 

 

 

 

 

반응형
Comments