300===Dev Framework/Apache

Apache Airflow - 워크플로우 자동화의 마법사 🧙‍♂️

블로글러 2025. 3. 24. 21:46

안녕하세요! 오늘은 데이터 엔지니어링 세계에서 아주 중요한 도구인 Apache Airflow에 대해 알아보려고 합니다. 데이터 작업을 자동화하고 정확한 시간에, 정확한 방법으로, 정확한 순서대로 실행해야 하는 경험이 있으신가요? 🤔

여러분이 식당 주방에서 요리사라고 생각해보세요.

  • 매일 같은 메뉴를 정확한 순서와 타이밍으로 준비해야 합니다
  • 재료 준비, 조리, 플레이팅까지 모든 단계가 순서대로 이루어져야 합니다
  • 어느 하나라도 잘못되면 전체 요리가 망가집니다

Apache Airflow는 이런 "요리 과정"을 자동화하는 훌륭한 셰프라고 할 수 있습니다. 데이터를 추출하고, 변환하고, 적재하는 과정(ETL)을 자동화하며, 문제가 생기면 알려주는 똑똑한 도우미입니다! 🍳

왜 필요한가? 🤷‍♂️

Apache Airflow가 해결하는 문제들은 다음과 같습니다:

  1. 복잡한 워크플로우 관리: 수십, 수백 개의 작업들을 순서대로 실행하고 모니터링하는 것은 수동으로 하기 어렵습니다.
  2. 의존성 처리: 한 작업이 끝나야 다음 작업이 시작되는 의존관계를 쉽게 정의하고 관리할 수 있습니다.
  3. 실패 처리와 재시도: 작업이 실패했을 때 자동으로 재시도하거나 대체 작업을 실행할 수 있습니다.
  4. 스케줄링: 매일, 매주, 매월 등 원하는 시간에 자동으로 작업을 실행할 수 있습니다.
  5. 모니터링과 알림: 작업 상태를 시각적으로 모니터링하고 문제 발생 시 알림을 받을 수 있습니다.

기본 원리 ⚙️

Apache Airflow의 핵심 원리를 알아볼까요?

DAG(Directed Acyclic Graph) 📊

DAG는 Airflow의 핵심 개념으로, 작업들과 그 의존성을 정의하는 방향성 비순환 그래프입니다. 쉽게 말해 "이 작업 다음에 저 작업" 형태로 작업 흐름을 정의합니다.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# DAG 정의
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'my_first_dag',
    default_args=default_args,
    description='간단한 튜토리얼 DAG',
    schedule_interval=timedelta(days=1),
)

Operators(연산자)와 Tasks(작업) 🔧

Operators는 "무엇을 할 것인가"를 정의합니다. Airflow에는 다양한 종류의 Operators가 있습니다:

# Python 함수를 실행하는 작업 정의
def print_hello():
    return 'Hello World!'

hello_task = PythonOperator(
    task_id='print_hello',
    python_callable=print_hello,
    dag=dag,
)

# Bash 명령어를 실행하는 작업 정의
from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

# 작업 간의 의존성 정의 (print_hello 다음에 print_date 실행)
hello_task >> bash_task

Airflow 구성 요소 🏗️

Airflow는 다음과 같은 주요 구성 요소로 이루어져 있습니다:

  1. Scheduler: DAG를 분석하고 작업을 실행 큐에 넣는 역할을 합니다. 정해진 일정에 따라 작업을 시작하도록 관리합니다.

  2. Web Server: 사용자 인터페이스(UI)를 제공하여 워크플로우를 시각적으로 모니터링하고 관리할 수 있게 합니다.

  3. Executor: 작업을 실제로 실행하는 방법을 결정합니다. 로컬 실행, 병렬 실행, 분산 실행 등의 방식을 지원합니다.

  4. Worker: 실제 작업을 실행하는 프로세스입니다. Executor 설정에 따라 여러 Worker가 병렬로 작업을 처리할 수 있습니다.

  5. Metastore: 메타데이터(DAG 정보, 작업 상태, 실행 이력 등)를 저장하는 데이터베이스입니다.

실제 예제 🚀

실제 비즈니스 환경에서는 Airflow를 어떻게 활용할까요? 간단한 데이터 파이프라인 예제를 살펴보겠습니다.

기본 사용법

다음은 데이터를 추출(Extract), 변환(Transform), 적재(Load)하는 간단한 ETL 파이프라인입니다:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract_data():
    # 데이터 추출 로직
    return {"data": [1, 2, 3, 4, 5]}

def transform_data(**context):
    # 추출된 데이터 가져오기
    extracted_data = context['ti'].xcom_pull(task_ids='extract')
    # 데이터 변환 로직
    transformed_data = [x * 10 for x in extracted_data["data"]]
    return {"transformed_data": transformed_data}

def load_data(**context):
    # 변환된 데이터 가져오기
    transformed_data = context['ti'].xcom_pull(task_ids='transform')
    # 데이터 적재 로직
    print(f"데이터베이스에 적재: {transformed_data}")

with DAG('etl_pipeline',
         start_date=datetime(2023, 1, 1),
         schedule_interval='@daily') as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        provide_context=True,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
        provide_context=True,
    )

    # 작업 순서 정의
    extract >> transform >> load

다음은 Airflow의 주요 구성 요소를 표로 정리한 내용입니다:

구성 요소 역할
Scheduler DAG를 분석하고 작업을 실행 큐에 넣는 역할
Web Server 사용자 인터페이스를 제공하여 워크플로우를 시각화하고 관리
Executor 작업을 실제로 실행하는 방법을 결정
Worker 실제 작업을 실행하는 프로세스
Metastore 메타데이터를 저장하는 데이터베이스

주의사항 및 팁 💡

⚠️ 이것만은 주의하세요!

  1. DAG의 ID는 고유해야 합니다

    • 동일한 ID를 가진 DAG가 있으면 충돌이 발생합니다
    • 명확하고 의미 있는 ID를 사용하세요
  2. 의존성 순환을 만들지 마세요

    • A → B → C → A와 같은 순환 의존성은 허용되지 않습니다
    • 항상 방향성이 있고 순환이 없는 구조(DAG)를 유지하세요
  3. 무거운 처리 작업은 피하세요

    • Airflow 작업에서 직접 큰 데이터를 처리하지 마세요
    • 대신 외부 처리 시스템(Spark, Hadoop 등)을 호출하는 방식으로 구현하세요
  4. Top-Level 코드는 피하세요

    • DAG 정의 바깥에서 데이터베이스 연결이나 무거운 처리를 하지 마세요
    • 이는 DAG가 실행되지 않더라도 자원을 소모할 수 있습니다
  5. 환경 변수 대신 Airflow 변수 사용하기

    • 하드코딩된 값이나 환경 변수 대신 Airflow의 Variable을 사용하세요
    • 이는 UI를 통해 쉽게 관리하고 변경할 수 있습니다

💡 꿀팁

  • 날짜 관련 작업에는 Airflow의 내장 매크로 변수를 활용하세요 (예: {{ ds }}, {{ execution_date }})
  • 작업 실패 시 알림을 설정하여 빠르게 대응하세요
  • 코드 반복을 줄이기 위해 커스텀 연산자나 훅을 만들어 재사용하세요
  • 실행하기 전에 airflow test 명령으로 개별 작업을 테스트하세요
  • 민감한 정보는 Airflow의 Variables나 Connections 기능을 사용하여 안전하게 관리하세요
  • 실무에서는 테스트 환경과 프로덕션 환경을 분리하여 운영하세요
  • 복잡한 DAG는 SubDAG를 활용하여 모듈화하세요
  • Airflow UI의 Graph 뷰를 활용하여 워크플로우를 시각적으로 확인하세요

마치며 🎁

지금까지 Apache Airflow에 대해 알아보았습니다. 데이터 파이프라인 자동화를 위한 강력한 도구로, Python을 통해 워크플로우를 정의하고 관리할 수 있습니다. 처음에는 학습 곡선이 있을 수 있지만, 한번 익숙해지면 복잡한 데이터 처리 작업도 효율적으로 관리할 수 있습니다.

Airflow는 많은 기업에서 데이터 파이프라인을 관리하는 데 사용되고 있으며, 특히 데이터 엔지니어링 영역에서 필수적인 도구로 자리 잡았습니다. 클라우드 환경에서는 AWS의 MWAA(Managed Workflows for Apache Airflow), GCP의 Cloud Composer 등의 관리형 서비스로도 제공되어 더욱 쉽게 도입할 수 있습니다.

혹시 궁금한 점이 있으시거나, 더 알고 싶은 내용이 있으시면 댓글로 남겨주세요!

참고 자료 🔖


#ApacheAirflow #워크플로우 #데이터파이프라인 #ETL #데이터엔지니어링

728x90