데이터 엔지니어 기술 블로그

[Airflow] 에어플로우 작업 실패시 Slack으로 메세지 보내는 방법 본문

데이터 엔지니어링

[Airflow] 에어플로우 작업 실패시 Slack으로 메세지 보내는 방법

jun_yeong_park 2021. 6. 14. 17:35
반응형

개요

에어플로우에서 작업이 실패했을 때 알 수 있는 방법은 Airflow 인터페이스 웹에 들어가서 확인을 하거나 email을 연동하거나 slack을 연동하는 등의 방법이 있다.

현재는 연동이 되어있지 않아서 작업에 실패하면 직접 들어가서 확인을 해야했는데, 이러면 알람이 늦어져서 조치가 어려워진다. 그래서 지금은 현재 사용하는 Slack과 연동을 해보려고 한다.

 

-

이 문서에서 사용하는 에어플로우 버전은 2.0.0이다.

 

Slack으로 메세지 보내는 방법

Airflow Operator 중에 slack으로 메세지를 보내주는 오퍼레이터가 있다. 자세한 내용은 이 링크를 참조하면 된다.

 

1. Airflow가 설치되어 있는 서버에서 apache-airflow-providers-slack 을 설치한다. 

 

 

python3 -m pip install apache-airflow-providers-slack

만약 설치하지 않으면 No module named 오류가 발생한다.

No module named error

 

2. 사용하는 많은 DAG에 적용을 하려면 따로 분리시켜두는 것이 좋다. alert.py 라는 파일을 생성하고 아래의 코드를 붙여넣는다.

from airflow.operators.slack_operator import SlackAPIPostOperator
from dateutil.relativedelta import relativedelta


def on_failure(context):
	
    # 슬랙 채널명을 적는다.
    channel = 'your_channel_name'
    # 슬랙 토큰명을 적는다.
    token = 'your_slack_token'
    
    
    task_instance = context.get('task_instance')
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    
    # 서버가 UTC 시간 기준일 경우 9시간을 더해준다.
    execution_date = (context.get('execution_date') + relativedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S')
    next_execution_date = (context.get('next_execution_date') + relativedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S')
    
    # 여기에 슬랙으로 보낼 메세지를 적는다.
    text = f'''
*[:exclamation: AIRFLOW ERROR REPORT]*
■ DAG: _{dag_id}_
■ Task: _{task_id}_
■ Execution Date (KST): _{execution_date}_
■ Next Execution Date (KST): _{next_execution_date}_'''

    # Slack Operator를 사용하여 메세지를 보내도록 한다.
    alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=channel,
        token=token,
        text=text)
    return alert.execute(context=context)

 

3. 파일명은 send_slack.py 으로 항상 실패하는 DAG를 하나 만들고, 위에서 만든 함수를 가져와서 on_failure_callback에 넣고, 실패하게 되면 만들었던 함수를 실행하도록 한다.

from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from alert import on_failure
from airflow.operators.bash import BashOperator


# on_failure_callback에 on_failure 함수를 넣는다.
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'on_failure_callback': on_failure
}

dag = DAG(
    'send_slack',
    default_args=default_args,
    start_date = days_ago(1),
    schedule_interval='0 1 * * *',
)

# 항상 실패하도록 한다.
BashOperator(
	task_id="test",
	bash_command='exit(1)',
	dag=dag
)

 

4. Airflow 인터페이스 웹에서 만들어진 DAG를 실행해본다.

send_slack

5. Slack에서 확인해보면 메세지가 잘 오는 것을 볼 수 있다.

on failure slack alarm

6. 끝 😄

 

반응형
Comments