반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- delta lake
- Schema Registry
- 카프카
- 델타레이크
- 에어플로우
- Zookeeper
- Data Warehouse
- docker
- 스파크 스트리밍
- Redshift
- kafka
- 데이터 엔지니어링
- kafka rest api
- airflow
- Parquet
- spark
- 대용량 처리
- 스파크
- 데이터 엔지니어
- spark streaming
- 카프카 구축
- 데이터 웨어하우스
- Data Engineer
- AWS
- 데이터
- Data engineering
- s3
- 컬럼 기반
- MySQL
- 레드시프트
Archives
- Today
- Total
데이터 엔지니어 기술 블로그
[Airflow] 에어플로우 작업 실패시 Slack으로 메세지 보내는 방법 본문
반응형
개요
에어플로우에서 작업이 실패했을 때 알 수 있는 방법은 Airflow 인터페이스 웹에 들어가서 확인을 하거나 email을 연동하거나 slack을 연동하는 등의 방법이 있다.
현재는 연동이 되어있지 않아서 작업에 실패하면 직접 들어가서 확인을 해야했는데, 이러면 알람이 늦어져서 조치가 어려워진다. 그래서 지금은 현재 사용하는 Slack과 연동을 해보려고 한다.
-
이 문서에서 사용하는 에어플로우 버전은 2.0.0이다.
Slack으로 메세지 보내는 방법
Airflow Operator 중에 slack으로 메세지를 보내주는 오퍼레이터가 있다. 자세한 내용은 이 링크를 참조하면 된다.
1. Airflow가 설치되어 있는 서버에서 apache-airflow-providers-slack 을 설치한다.
python3 -m pip install apache-airflow-providers-slack
만약 설치하지 않으면 No module named 오류가 발생한다.
2. 사용하는 많은 DAG에 적용을 하려면 따로 분리시켜두는 것이 좋다. alert.py 라는 파일을 생성하고 아래의 코드를 붙여넣는다.
from airflow.operators.slack_operator import SlackAPIPostOperator
from dateutil.relativedelta import relativedelta
def on_failure(context):
# 슬랙 채널명을 적는다.
channel = 'your_channel_name'
# 슬랙 토큰명을 적는다.
token = 'your_slack_token'
task_instance = context.get('task_instance')
task_id = task_instance.task_id
dag_id = task_instance.dag_id
# 서버가 UTC 시간 기준일 경우 9시간을 더해준다.
execution_date = (context.get('execution_date') + relativedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S')
next_execution_date = (context.get('next_execution_date') + relativedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S')
# 여기에 슬랙으로 보낼 메세지를 적는다.
text = f'''
*[:exclamation: AIRFLOW ERROR REPORT]*
■ DAG: _{dag_id}_
■ Task: _{task_id}_
■ Execution Date (KST): _{execution_date}_
■ Next Execution Date (KST): _{next_execution_date}_'''
# Slack Operator를 사용하여 메세지를 보내도록 한다.
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=channel,
token=token,
text=text)
return alert.execute(context=context)
3. 파일명은 send_slack.py 으로 항상 실패하는 DAG를 하나 만들고, 위에서 만든 함수를 가져와서 on_failure_callback에 넣고, 실패하게 되면 만들었던 함수를 실행하도록 한다.
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from alert import on_failure
from airflow.operators.bash import BashOperator
# on_failure_callback에 on_failure 함수를 넣는다.
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'on_failure_callback': on_failure
}
dag = DAG(
'send_slack',
default_args=default_args,
start_date = days_ago(1),
schedule_interval='0 1 * * *',
)
# 항상 실패하도록 한다.
BashOperator(
task_id="test",
bash_command='exit(1)',
dag=dag
)
4. Airflow 인터페이스 웹에서 만들어진 DAG를 실행해본다.
5. Slack에서 확인해보면 메세지가 잘 오는 것을 볼 수 있다.
6. 끝 😄
반응형
'데이터 엔지니어링' 카테고리의 다른 글
[Airflow] 실행 날짜(execution date) 쉽게 이해하기 (0) | 2021.06.21 |
---|---|
[Airflow] 에어플로우 시작하기: 개념 및 설치 (7) | 2021.06.16 |
[MYSQL] Docker MYSQL 로그에서 "mbind: Operation not permitted" 이슈 해결방법 (0) | 2021.06.07 |
[데이터 엔지니어링] RDS의 테이블을 Athena 에서 사용할 때의 문제 (0) | 2021.06.04 |
[간단 데이터 엔지니어링] 프레스토(Presto)란? (0) | 2021.06.02 |
Comments