일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- Redshift
- Data engineering
- Data Engineer
- spark
- Parquet
- s3
- Schema Registry
- 델타레이크
- Zookeeper
- 카프카 구축
- 데이터
- 스파크 스트리밍
- 데이터 웨어하우스
- airflow
- 데이터 엔지니어링
- 대용량 처리
- 스파크
- MySQL
- kafka rest api
- 에어플로우
- spark streaming
- docker
- 레드시프트
- delta lake
- kafka
- AWS
- 컬럼 기반
- 데이터 엔지니어
- 카프카
- Data Warehouse
- Today
- Total
데이터 엔지니어 기술 블로그
[Airflow] 에어플로우 시작하기: 개념 및 설치 본문
개요
Airflow는 복잡한 워크플로우를 프로그래밍 방식으로 작성해서, 스케줄링하고 모니터링할 수 있는 플랫폼이다.
데이터 파이프라인을 이루고 있는 ETL 스크립트들을 스케줄링 할 때 crontab, cloudwatch 등을 사용하는 곳이 많다. 그러나 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점이 생긴다. 이러면 바로 복구할수도 없고, 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점이 발생할 수 있다.
Airflow에는 서로에 대한 의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 후 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결해준다.
개념
DAG(Directed Acyclic Graph)
DAG는 유향 비순환 그래프라고 하며 에어플로우의 워크플로우는 python을 사용하여 작성할 수 있다.
하나의 DAG 안에는 한 개 이상의 Task가 있으며, Task는 실제 실행시키는 작업이다.
DAG 스크립트의 예시는 다음과 같다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from utils.alert import on_failure, on_success
default_args = {
'owner': 'airflow',
'catchup': False,
'execution_timeout': timedelta(hours=6),
'depends_on_past': False,
'on_failure_callback': on_failure,
'on_success_callback': on_success,
}
dag = DAG(
'sample_dag',
default_args = default_args,
description = "sample description",
schedule_interval = "0 16 * * *",
start_date = days_ago(2),
tags = ['daily'],
max_active_runs=3,
concurrency=1
)
sample_task = BashOperator(
task_id="sample_task",
bash_command='python3 sample_task.py',
dag=dag)
Operator
에어플로우는 Operator를 사용하여 python, bash, aws, slack 등 다양한 동작을 실행시킬 수 있다.
예를 들어서 아래처럼 BashOperator를 사용하면 bash 스크립트를 실행할 수 있다.
sample_task = BashOperator(
task_id="sample_task",
bash_command='python3 sample_task.py',
dag=dag)
다른 예시로 SlackOperator를 사용하면 slack에 메세지를 보낼 수 있다.
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=channel,
token=token,
text=text)
return alert.execute(context=context)
Executor
작업 실행을 시켜주는 실행기이다. SequentialExecutor, CeleryExecutor, LocalExecutor 등 실행 방법에 따라 종류를 선택할 수 있다.
Airflow 간단하게 설치하기
Airflow는 기본값으로 sqlite를 사용하며 이 경우 Executor는 SequentialExecutor를 사용하게 된다. 하지만 이것을 사용하게 되면 동시에 여러 작업을 실행시킬 수 없기 때문에 mysql을 간단하게 설치해서 사용하려고 한다.
MySQL 설치 및 설정하기
1. my.cnf파일을 생성하고 아래의 내용을 붙여넣는다.
# my.cnf
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= NULL
# Custom config should go here
!includedir /etc/mysql/conf.d/
explicit_defaults_for_timestamp = 1
default_authentication_plugin=mysql_native_password
- caching_sha2_password 에러가 발생할 경우 default_authentication_plugin=mysql_native_password 옵션을 사용해주어야 한다.
- explicit_defaults_for_timestamp 에러가 발생할 경우 explicit_defaults_for_timestamp=1 옵션을 사용해주어야 한다.
-
raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
-
2. Dockerfile을 생성 후 빌드한다.
FROM mysql
COPY my.cnf /etc/mysql/my.cnf
docker build -t pipeline-db .
3. 빌드한 도커를 실행한 후 도커에 접속한다.
docker stop pipeline-db && docker rm pipeline-db
docker run -d --name pipeline-db -e MYSQL_ROOT_PASSWORD=your_password -p 3306:3306 -v ~/mysql:/var/lib/mysql pipeline-db
docker logs pipeline-db
docker exec -it pipeline-db bash
mysql -u root -p
4. 아래의 코드를 입력하여 airflow에 필요한 데이터베이스, 유저 등을 생성하고 설정한다.
CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;
CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow' @localhost;
CREATE USER 'airflow'@'%' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON airflow.* To 'airflow'@'%';
flush privileges;
- 최대 문자열 길이 오류가 아래와 같이 발생할 수도 있다.
- sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1071, 'Specified key was too long; max key length is 3072 bytes') [SQL: ALTER TABLE xcom ADD CONSTRAINT pk_xcom PRIMARY KEY (dag_id, task_id, `key`, execution_date)]
- 데이터베이스를 생성할 때 아래의 명령어로 생성하면 된다.
-
CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;
-
Airflow 설치하기
이 글을 쓰는 당시에는 pip 버전이 20.2.4 여야만 문제없이 설치가 되었다. 만약 문제가 발생할 경우 pip 20.2.4 로 다운그레이드한다.
1. pip로 airflow를 설치한다.
python3 -m pip install apache-airflow
2. airflow 계정을 생성한다.
airflow users create --username admin --firstname airflow --lastname your_lastname --role Admin --email airflow@sample.com -p your_password
3. $AIRFLOW_HOME 경로(기본값 ~/airflow) 에서 airflow.cfg 파일을 열고 아래와 같이 수정한다.
# airflow 예제 제거하기
load_example = True
# DAG 업데이트 시간 조정하기 (UI), 이 시간을 0으로 둘 경우 아주 많은 CPU를 사용하게 된다.
min_file_process_interval = 60
dag_dir_list_interval = 30
# 데이터베이스 연결하기
sql_alchemy_conn = mysql://airflow:your_password@127.0.0.1:3306/airflow?charset=utf8
# Executor 설정하기
# SequentialExecutor(Default)를 사용하면 한 번에 하나의 작업만 처리할 수 있다.
executor = LocalExecutor
parallelism = 64
dag_concurrency = 32
4. airflow webserver 실행하기
screen -dmS airflow-webserver airflow webserver --port 7171
웹서버에 문제가 생길 경우 종료해야하는데 이 때 pid는 $AIRFLOW_HOME/airflow-webserver.pid 경로에서 찾을 수 있다.
5. airflow scheduler 실행하기(screen을 사용한 방법)
ssh 연결이 끊기더라도 계속 실행되게 하기 위해 screen을 사용하는 방법이다.
screen -dmS airflow-scheduler airflow scheduler
스크린 사용 방법은 다음과 같다.
# 로그 보는 방법
screen -r 세션 이름
ctrl + a, d 로 Detach
# 종료하는 방법
screen -r 세션 이름
Ctrl + c 로 종료
ps -ef | grep airflow 명령어로 동작중인 프로세스를 확인할 수 있다.
6. {your_server_ip}:7171 로 접속하면 Airflow 웹서버를 확인할 수 있으며 toggle을 파란색(Unfuse)로 두면 스케줄 동작을 시작할 수 있다.
7. 작업 중지하는 방법은 DAG의 Task UI에서 Make Failed 버튼을 누르면 되고, 작업을 재시작하는 방법은 Clear 버튼을 클릭하면 된다.
8. 한 번에 실행되는 횟수가 16개인 이유는 Max Active Runs가 16이어서 그런데, DAG를 생성할 때나 airflow.cfg 파일을 수정해주면 된다.
9. 인터페이스는 다음과 같이 이루어져 있다.
Airflow 간단한 DAG 생성하기
$AIRFLOW_HOME/dags 경로에 py파일로 스크립트를 생성하면 airflow가 아까 설정했던 min_file_process_interval 시간에 맞춰 파일을 확인하고 업데이트한다.
1. 위의 경로에 sample.py파일을 생성한다.
2. 아래의 스크립트를 붙여넣는다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'airflow',
'catchup': False,
'execution_timeout': timedelta(hours=6),
'depends_on_past': False,
}
dag = DAG(
'sample',
default_args = default_args,
description = "sample description",
schedule_interval = "@daily",
start_date = days_ago(3),
tags = ['daily'],
max_active_runs=3,
concurrency=1
)
sample_a = BashOperator(
task_id='sample_a',
bash_command='echo hello',
dag=dag)
sample_b = BashOperator(
task_id='sample_b',
bash_command='echo hello',
dag=dag)
sample_a << sample_b
3. 저장하고 기다린 후 airflow web을 확인해보면 새로운 DAG가 생성된 것을 볼 수 있다.
4. toggle로 unfuse 상태로 만들면 실행이 잘 되는 것을 확인할 수 있다.
'데이터 엔지니어링' 카테고리의 다른 글
[Airflow] Task 알아보기: Operator, Sensor, TaskFlow (0) | 2021.06.21 |
---|---|
[Airflow] 실행 날짜(execution date) 쉽게 이해하기 (0) | 2021.06.21 |
[Airflow] 에어플로우 작업 실패시 Slack으로 메세지 보내는 방법 (0) | 2021.06.14 |
[MYSQL] Docker MYSQL 로그에서 "mbind: Operation not permitted" 이슈 해결방법 (0) | 2021.06.07 |
[데이터 엔지니어링] RDS의 테이블을 Athena 에서 사용할 때의 문제 (0) | 2021.06.04 |