안녕하세요! 오늘은 데이터 엔지니어링 세계에서 아주 중요한 도구인 Apache Airflow에 대해 알아보려고 합니다. 데이터 작업을 자동화하고 정확한 시간에, 정확한 방법으로, 정확한 순서대로 실행해야 하는 경험이 있으신가요? 🤔
여러분이 식당 주방에서 요리사라고 생각해보세요.
- 매일 같은 메뉴를 정확한 순서와 타이밍으로 준비해야 합니다
- 재료 준비, 조리, 플레이팅까지 모든 단계가 순서대로 이루어져야 합니다
- 어느 하나라도 잘못되면 전체 요리가 망가집니다
Apache Airflow는 이런 "요리 과정"을 자동화하는 훌륭한 셰프라고 할 수 있습니다. 데이터를 추출하고, 변환하고, 적재하는 과정(ETL)을 자동화하며, 문제가 생기면 알려주는 똑똑한 도우미입니다! 🍳
왜 필요한가? 🤷♂️
Apache Airflow가 해결하는 문제들은 다음과 같습니다:
- 복잡한 워크플로우 관리: 수십, 수백 개의 작업들을 순서대로 실행하고 모니터링하는 것은 수동으로 하기 어렵습니다.
- 의존성 처리: 한 작업이 끝나야 다음 작업이 시작되는 의존관계를 쉽게 정의하고 관리할 수 있습니다.
- 실패 처리와 재시도: 작업이 실패했을 때 자동으로 재시도하거나 대체 작업을 실행할 수 있습니다.
- 스케줄링: 매일, 매주, 매월 등 원하는 시간에 자동으로 작업을 실행할 수 있습니다.
- 모니터링과 알림: 작업 상태를 시각적으로 모니터링하고 문제 발생 시 알림을 받을 수 있습니다.
기본 원리 ⚙️
Apache Airflow의 핵심 원리를 알아볼까요?
DAG(Directed Acyclic Graph) 📊
DAG는 Airflow의 핵심 개념으로, 작업들과 그 의존성을 정의하는 방향성 비순환 그래프입니다. 쉽게 말해 "이 작업 다음에 저 작업" 형태로 작업 흐름을 정의합니다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# DAG 정의
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_first_dag',
default_args=default_args,
description='간단한 튜토리얼 DAG',
schedule_interval=timedelta(days=1),
)
Operators(연산자)와 Tasks(작업) 🔧
Operators는 "무엇을 할 것인가"를 정의합니다. Airflow에는 다양한 종류의 Operators가 있습니다:
# Python 함수를 실행하는 작업 정의
def print_hello():
return 'Hello World!'
hello_task = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
# Bash 명령어를 실행하는 작업 정의
from airflow.operators.bash_operator import BashOperator
bash_task = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
# 작업 간의 의존성 정의 (print_hello 다음에 print_date 실행)
hello_task >> bash_task
Airflow 구성 요소 🏗️
Airflow는 다음과 같은 주요 구성 요소로 이루어져 있습니다:
Scheduler: DAG를 분석하고 작업을 실행 큐에 넣는 역할을 합니다. 정해진 일정에 따라 작업을 시작하도록 관리합니다.
Web Server: 사용자 인터페이스(UI)를 제공하여 워크플로우를 시각적으로 모니터링하고 관리할 수 있게 합니다.
Executor: 작업을 실제로 실행하는 방법을 결정합니다. 로컬 실행, 병렬 실행, 분산 실행 등의 방식을 지원합니다.
Worker: 실제 작업을 실행하는 프로세스입니다. Executor 설정에 따라 여러 Worker가 병렬로 작업을 처리할 수 있습니다.
Metastore: 메타데이터(DAG 정보, 작업 상태, 실행 이력 등)를 저장하는 데이터베이스입니다.
실제 예제 🚀
실제 비즈니스 환경에서는 Airflow를 어떻게 활용할까요? 간단한 데이터 파이프라인 예제를 살펴보겠습니다.
기본 사용법
다음은 데이터를 추출(Extract), 변환(Transform), 적재(Load)하는 간단한 ETL 파이프라인입니다:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract_data():
# 데이터 추출 로직
return {"data": [1, 2, 3, 4, 5]}
def transform_data(**context):
# 추출된 데이터 가져오기
extracted_data = context['ti'].xcom_pull(task_ids='extract')
# 데이터 변환 로직
transformed_data = [x * 10 for x in extracted_data["data"]]
return {"transformed_data": transformed_data}
def load_data(**context):
# 변환된 데이터 가져오기
transformed_data = context['ti'].xcom_pull(task_ids='transform')
# 데이터 적재 로직
print(f"데이터베이스에 적재: {transformed_data}")
with DAG('etl_pipeline',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily') as dag:
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
provide_context=True,
)
load = PythonOperator(
task_id='load',
python_callable=load_data,
provide_context=True,
)
# 작업 순서 정의
extract >> transform >> load
다음은 Airflow의 주요 구성 요소를 표로 정리한 내용입니다:
구성 요소 | 역할 |
---|---|
Scheduler | DAG를 분석하고 작업을 실행 큐에 넣는 역할 |
Web Server | 사용자 인터페이스를 제공하여 워크플로우를 시각화하고 관리 |
Executor | 작업을 실제로 실행하는 방법을 결정 |
Worker | 실제 작업을 실행하는 프로세스 |
Metastore | 메타데이터를 저장하는 데이터베이스 |
주의사항 및 팁 💡
⚠️ 이것만은 주의하세요!
DAG의 ID는 고유해야 합니다
- 동일한 ID를 가진 DAG가 있으면 충돌이 발생합니다
- 명확하고 의미 있는 ID를 사용하세요
의존성 순환을 만들지 마세요
- A → B → C → A와 같은 순환 의존성은 허용되지 않습니다
- 항상 방향성이 있고 순환이 없는 구조(DAG)를 유지하세요
무거운 처리 작업은 피하세요
- Airflow 작업에서 직접 큰 데이터를 처리하지 마세요
- 대신 외부 처리 시스템(Spark, Hadoop 등)을 호출하는 방식으로 구현하세요
Top-Level 코드는 피하세요
- DAG 정의 바깥에서 데이터베이스 연결이나 무거운 처리를 하지 마세요
- 이는 DAG가 실행되지 않더라도 자원을 소모할 수 있습니다
환경 변수 대신 Airflow 변수 사용하기
- 하드코딩된 값이나 환경 변수 대신 Airflow의 Variable을 사용하세요
- 이는 UI를 통해 쉽게 관리하고 변경할 수 있습니다
💡 꿀팁
- 날짜 관련 작업에는 Airflow의 내장 매크로 변수를 활용하세요 (예:
{{ ds }}
,{{ execution_date }}
) - 작업 실패 시 알림을 설정하여 빠르게 대응하세요
- 코드 반복을 줄이기 위해 커스텀 연산자나 훅을 만들어 재사용하세요
- 실행하기 전에
airflow test
명령으로 개별 작업을 테스트하세요 - 민감한 정보는 Airflow의 Variables나 Connections 기능을 사용하여 안전하게 관리하세요
- 실무에서는 테스트 환경과 프로덕션 환경을 분리하여 운영하세요
- 복잡한 DAG는 SubDAG를 활용하여 모듈화하세요
- Airflow UI의 Graph 뷰를 활용하여 워크플로우를 시각적으로 확인하세요
마치며 🎁
지금까지 Apache Airflow에 대해 알아보았습니다. 데이터 파이프라인 자동화를 위한 강력한 도구로, Python을 통해 워크플로우를 정의하고 관리할 수 있습니다. 처음에는 학습 곡선이 있을 수 있지만, 한번 익숙해지면 복잡한 데이터 처리 작업도 효율적으로 관리할 수 있습니다.
Airflow는 많은 기업에서 데이터 파이프라인을 관리하는 데 사용되고 있으며, 특히 데이터 엔지니어링 영역에서 필수적인 도구로 자리 잡았습니다. 클라우드 환경에서는 AWS의 MWAA(Managed Workflows for Apache Airflow), GCP의 Cloud Composer 등의 관리형 서비스로도 제공되어 더욱 쉽게 도입할 수 있습니다.
혹시 궁금한 점이 있으시거나, 더 알고 싶은 내용이 있으시면 댓글로 남겨주세요!
참고 자료 🔖
- Apache Airflow 공식 문서: https://airflow.apache.org/docs/
- GitHub 저장소: https://github.com/apache/airflow
- Airflow 튜토리얼: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
#ApacheAirflow #워크플로우 #데이터파이프라인 #ETL #데이터엔지니어링
'300===Dev Framework > Apache' 카테고리의 다른 글
Apache Kafka - 실시간 데이터의 택배기사 📦🛵 (0) | 2025.03.29 |
---|---|
Apache Kafka - 실시간 데이터의 택배기사 📦🛵 (0) | 2025.03.29 |