Airflow DAG 설계 패턴

apache-airflow DAG를 효율적·안전하게 작성하기 위한 설계 패턴 모음. 멱등성 확보, 모듈화, 파라미터화, 보안 자격증명 관리, 리소스 최적화가 핵심 축이다.

설명

Airflow DAG는 Python 코드이므로 언어 자유도가 높지만, 잘못 작성하면 Scheduler 부하, 데이터 중복, 보안 사고로 이어진다. 아래 패턴들은 DAP 운영 환경에서 실전 적용 가능한 설계 기준이다.

Top-level 코드 회피

Airflow Scheduler는 dags_folder의 모든 파일을 기본 30초마다 파싱한다. 파싱 시점에 실행되는 top-level 코드(task 외부의 외부 시스템 호출)는 이 주기마다 반복 실행되어 DB 부하와 예측 불가 동작을 유발한다.

# 나쁜 예 — 파싱 시마다 DB 쿼리 실행
hook = PostgresHook("database_conn")
results = hook.get_records("SELECT * FROM grocery_list;")
 
# 좋은 예 — Task 내부로 이동
@task
def get_list_of_results():
    hook = PostgresHook("database_conn")
    return hook.get_records("SELECT * FROM grocery_list;")

모듈화: TaskGroup 활용

TaskGroup으로 관련 태스크를 논리적으로 묶으면 DAG 구조가 명확해지고 재사용이 쉬워진다.

with TaskGroup('extract_group') as extract_group:
    extract_task = PythonOperator(task_id='extract', python_callable=extract)
with TaskGroup('transform_group') as transform_group:
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
extract_group >> transform_group >> load_task

파라미터화: Variable.get()

환경별 설정(dev/staging/prod)을 코드에 하드코딩하지 않고 Airflow Variables에서 가져온다.

from airflow.models import Variable
db_connection = Variable.get("db_connection")

YAML/JSON 외부 파일로 설정을 관리하면 코드 변경 없이 환경 전환이 가능하다.

오류 처리와 알림

default_args = {
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'retries': 1,
}

Slack·PagerDuty를 on_failure_callback에 연결하면 장애 발생 즉시 담당자에게 알린다. DAP 운영에서는 Slack 알림이 표준이다.

보안 자격증명 관리

코드나 설정 파일에 크레덴셜을 직접 기입하지 않는다.

from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('my_conn_id')

프로덕션에서는 HashiCorp Vault 또는 AWS Secrets Manager를 Airflow secrets backend로 연결한다.

Executor 선택과 리소스 튜닝

Executor적합 상황
LocalExecutor소규모, 단일 서버
CeleryExecutor중규모, 수평 확장 필요
KubernetesExecutor태스크별 리소스 격리, 클라우드 환경

주요 설정:

[core]
executor = CeleryExecutor
parallelism = 32
 
[celery]
worker_concurrency = 16

max_active_tasks_per_dag를 DAG 단위로 제한하면 특정 DAG가 전체 워커를 독점하는 것을 방지한다.

태스크 의존성 표기 일관성

한 가지 방법을 프로젝트 전체에서 일관 사용한다.

# 권장: >> 연산자로 통일
task_1 >> task_2 >> [task_3, task_4]

관련 개념

소스