데이터 엔지니어 기술 블로그

[Airflow] 에어플로우 시작하기: 개념 및 설치 본문

데이터 엔지니어링

[Airflow] 에어플로우 시작하기: 개념 및 설치

jun_yeong_park 2021. 6. 16. 00:35
반응형

개요

Airflow는 복잡한 워크플로우를 프로그래밍 방식으로 작성해서, 스케줄링하고 모니터링할 수 있는 플랫폼이다.

 

데이터 파이프라인을 이루고 있는 ETL 스크립트들을 스케줄링 할 때 crontab, cloudwatch 등을 사용하는 곳이 많다. 그러나 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점이 생긴다. 이러면 바로 복구할수도 없고, 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점이 발생할 수 있다.

 

Airflow에는 서로에 대한 의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 쉽게 수정 후 재시도할 수 있고, 이전 날짜 작업이 실패했을 때 그 날짜만 다시 실행하는 등 위의 문제점을 많이 해결해준다.

 

개념

DAG(Directed Acyclic Graph)

DAG유향 비순환 그래프라고 하며 에어플로우의 워크플로우는 python을 사용하여 작성할 수 있다. 

하나의 DAG 안에는 한 개 이상의 Task가 있으며, Task는 실제 실행시키는 작업이다.

DAG 스크립트의 예시는 다음과 같다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from utils.alert import on_failure, on_success

default_args = {
        'owner': 'airflow',
        'catchup': False,
        'execution_timeout': timedelta(hours=6),
        'depends_on_past': False,
        'on_failure_callback': on_failure,
        'on_success_callback': on_success,
    }

dag = DAG(
    'sample_dag',
    default_args = default_args,
    description = "sample description",
    schedule_interval = "0 16 * * *",
    start_date = days_ago(2),
    tags = ['daily'],
    max_active_runs=3,
    concurrency=1
)

sample_task = BashOperator(
    task_id="sample_task",
    bash_command='python3 sample_task.py',
    dag=dag)

 

출처: https://airflow.apache.org/docs/apache-airflow/stable/index.html#apache-airflow-documentation

 

 

Operator

에어플로우는 Operator를 사용하여 python, bash, aws, slack 등 다양한 동작을 실행시킬 수 있다. 

예를 들어서 아래처럼 BashOperator를 사용하면 bash 스크립트를 실행할 수 있다.

sample_task = BashOperator(
    task_id="sample_task",
    bash_command='python3 sample_task.py',
    dag=dag)

 

다른 예시로 SlackOperator를 사용하면 slack에 메세지를 보낼 수 있다.

alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=channel,
        token=token,
        text=text)
return alert.execute(context=context)

 

Executor

작업 실행을 시켜주는 실행기이다. SequentialExecutor, CeleryExecutor, LocalExecutor 등 실행 방법에 따라 종류를 선택할 수 있다.

 

Airflow 간단하게 설치하기

Airflow는 기본값으로 sqlite를 사용하며 이 경우 Executor는 SequentialExecutor를 사용하게 된다. 하지만 이것을 사용하게 되면 동시에 여러 작업을 실행시킬 수 없기 때문에 mysql을 간단하게 설치해서 사용하려고 한다.

 

MySQL 설치 및 설정하기

1. my.cnf파일을 생성하고 아래의 내용을 붙여넣는다. 

# my.cnf
[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL

# Custom config should go here
!includedir /etc/mysql/conf.d/
explicit_defaults_for_timestamp = 1
default_authentication_plugin=mysql_native_password
  • caching_sha2_password 에러가 발생할 경우 default_authentication_plugin=mysql_native_password 옵션을 사용해주어야 한다. 
  • explicit_defaults_for_timestamp 에러가 발생할 경우 explicit_defaults_for_timestamp=1 옵션을 사용해주어야 한다.
    • raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")

 

2. Dockerfile을 생성 후 빌드한다.

FROM mysql
COPY my.cnf /etc/mysql/my.cnf
docker build -t pipeline-db .

 

3. 빌드한 도커를 실행한 후 도커에 접속한다.

docker stop pipeline-db && docker rm pipeline-db
docker run -d --name pipeline-db -e MYSQL_ROOT_PASSWORD=your_password -p 3306:3306 -v ~/mysql:/var/lib/mysql pipeline-db
docker logs pipeline-db
docker exec -it pipeline-db bash
mysql -u root -p

 

4. 아래의 코드를 입력하여 airflow에 필요한 데이터베이스, 유저 등을 생성하고 설정한다.

CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;
CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow' @localhost;
CREATE USER 'airflow'@'%' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON airflow.* To 'airflow'@'%';
flush privileges;
  • 최대 문자열 길이 오류가 아래와 같이 발생할 수도 있다.
    • sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1071, 'Specified key was too long; max key length is 3072 bytes') [SQL: ALTER TABLE xcom ADD CONSTRAINT pk_xcom PRIMARY KEY (dag_id, task_id, `key`, execution_date)]
    • 데이터베이스를 생성할 때 아래의 명령어로 생성하면 된다.
      • CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;

 

 

Airflow 설치하기

이 글을 쓰는 당시에는 pip 버전이 20.2.4 여야만 문제없이 설치가 되었다. 만약 문제가 발생할 경우 pip 20.2.4 로 다운그레이드한다.

 

1. pip로 airflow를 설치한다.

python3 -m pip install apache-airflow

2. airflow 계정을 생성한다.

airflow users create --username admin --firstname airflow --lastname your_lastname --role Admin --email airflow@sample.com -p your_password

 

3. $AIRFLOW_HOME 경로(기본값 ~/airflow) 에서 airflow.cfg 파일을 열고 아래와 같이 수정한다.

# airflow 예제 제거하기
load_example = True

# DAG 업데이트 시간 조정하기 (UI), 이 시간을 0으로 둘 경우 아주 많은 CPU를 사용하게 된다.
min_file_process_interval = 60
dag_dir_list_interval = 30

# 데이터베이스 연결하기
sql_alchemy_conn = mysql://airflow:your_password@127.0.0.1:3306/airflow?charset=utf8

# Executor 설정하기
# SequentialExecutor(Default)를 사용하면 한 번에 하나의 작업만 처리할 수 있다.
executor = LocalExecutor
parallelism = 64
dag_concurrency = 32

 

4. airflow webserver 실행하기

screen -dmS airflow-webserver airflow webserver --port 7171

 

웹서버에 문제가 생길 경우 종료해야하는데 이 때 pid는 $AIRFLOW_HOME/airflow-webserver.pid  경로에서 찾을 수 있다.



5. airflow scheduler 실행하기(screen을 사용한 방법)

ssh 연결이 끊기더라도 계속 실행되게 하기 위해 screen을 사용하는 방법이다.

screen -dmS airflow-scheduler airflow scheduler

스크린 사용 방법은 다음과 같다.

 

# 로그 보는 방법
screen -r 세션 이름
ctrl + a, d 로 Detach

# 종료하는 방법
screen -r 세션 이름
Ctrl + c 로 종료

ps -ef | grep airflow 명령어로 동작중인 프로세스를 확인할 수 있다.



6. {your_server_ip}:7171 로 접속하면 Airflow 웹서버를 확인할 수 있으며 toggle을 파란색(Unfuse)로 두면 스케줄 동작을 시작할 수 있다.

dag unfuse

7. 작업 중지하는 방법은 DAG의 Task UI에서 Make Failed 버튼을 누르면 되고, 작업을 재시작하는 방법은 Clear 버튼을 클릭하면 된다.

airflow clear

 

8. 한 번에 실행되는 횟수가 16개인 이유는 Max Active Runs가 16이어서 그런데, DAG를 생성할 때나 airflow.cfg 파일을 수정해주면 된다.

 

9. 인터페이스는 다음과 같이 이루어져 있다.

DAGs
DAG Details
Tasks

Airflow 간단한 DAG 생성하기

$AIRFLOW_HOME/dags 경로에 py파일로 스크립트를 생성하면 airflow가 아까 설정했던 min_file_process_interval 시간에 맞춰 파일을 확인하고 업데이트한다.

 

1. 위의 경로에 sample.py파일을 생성한다.

2. 아래의 스크립트를 붙여넣는다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
        'owner': 'airflow',
        'catchup': False,
        'execution_timeout': timedelta(hours=6),
        'depends_on_past': False,
    }

dag = DAG(
    'sample',
    default_args = default_args,
    description = "sample description",
    schedule_interval = "@daily",
    start_date = days_ago(3),
    tags = ['daily'],
    max_active_runs=3,
    concurrency=1
)

sample_a = BashOperator(
    task_id='sample_a',
    bash_command='echo hello',
    dag=dag)

sample_b = BashOperator(
    task_id='sample_b',
    bash_command='echo hello',
    dag=dag)
    
sample_a << sample_b

3. 저장하고 기다린 후 airflow web을 확인해보면 새로운 DAG가 생성된 것을 볼 수 있다.

4. toggleunfuse 상태로 만들면 실행이 잘 되는 것을 확인할 수 있다.

 

반응형
Comments