Airflow DAG 멱등성 보장 실전 가이드
원 질문: “Airflow DAG 설계에서 멱등성을 보장하는 실전 방법은?”
답변
멱등성이 중요한 이유
멱등성(Idempotency)은 동일한 입력으로 DAG Run을 몇 번 반복 실행해도 항상 동일한 결과가 나오는 성질입니다(출처: dag-idempotency). 이것이 중요한 이유는:
- 안전한 Retry: 멱등성 없이 Retry하면 데이터 중복·손실 발생(출처: dag-idempotency)
- 장애 복구: 특정 구간만 선택적으로 재실행(backfill) 가능(출처: dag-idempotency)
- 분산 환경 안정성: 네트워크 오류 후에도 데이터 무결성 보장(출처: astronomer-dag-best-practices)
DAP 운영에서는 상품추천 모델과 Redshift 데이터 적재 등이 Airflow DAG로 운영되므로, 멱등성 확보는 선택이 아닌 필수입니다(출처: apache-airflow).
실전 방법 5가지
1️⃣ 태스크 원자화 (Task Atomicity)
방법: ETL 파이프라인을 Extract → Transform → Load 3개의 독립 태스크로 분리합니다(출처: dag-idempotency).
효과:
- Transform 태스크 실패 시, Extract를 다시 실행할 필요 없음
- 부분 실패가 전체 DAG에 영향을 주지 않음
- 각 태스크를 독립적으로 재시도 가능
Python 예시:
@task
def extract_task():
# Extract 로직만
return data
@task
def transform_task(data):
# Transform 로직만
return processed_data
@task
def load_task(processed_data):
# Load 로직만
return None
extract >> transform >> load2️⃣ Jinja 템플릿 사용 (시점 고정)
문제: datetime.today() 같은 현재 시각을 기반으로 코드를 작성하면, 같은 DAG Run을 재실행할 때마다 다른 결과가 나옵니다(출처: dag-idempotency).
해결: Airflow 내장 매크로 {{ prev_start_date_success }} 등을 사용해 execution_date를 기준으로 고정합니다(출처: astronomer-dag-best-practices).
비교:
# ❌ 나쁜 예 — 재실행마다 다른 결과
yesterday = datetime.today() - timedelta(1)
# ✅ 좋은 예 — execution_date 고정
yesterday = {{ prev_start_date_success }}효과: 시점이 고정되므로 동일 DAG Run의 재실행이 안전합니다(출처: dag-idempotency).
3️⃣ 증분 처리 (Incremental Filtering)
개념: 전체 데이터셋을 매번 재처리하지 말고, DAG 실행 구간의 레코드만 처리합니다(출처: dag-idempotency).
두 가지 패턴:
| 패턴 | 사용 조건 | 구현 |
|---|---|---|
| last_modified_date 기반 | 소스 레코드에 수정 시각 컬럼 존재(권장) | WHERE last_modified >= {{ prev_start_date_success }} |
| sequence_id 기반 | 레코드가 append-only, 수정 시각 없음 | WHERE id > (SELECT MAX(id) FROM last_successful_run) |
효과: 특정 구간 실패 시 해당 구간만 재실행하면 됩니다(출처: astronomer-dag-best-practices).
4️⃣ Top-level 코드 회피
문제: apache-airflow Scheduler는 dags_folder를 기본 30초마다 파싱합니다. 파싱 시점에 실행되는 외부 시스템 호출(top-level 코드)은:
- Scheduler 부하 증가
- 예측 불가능한 동작
- 멱등성 파괴(출처: airflow-dag-design-patterns)
해결: 모든 외부 시스템 호출을 Task 내부로 이동합니다(출처: airflow-dag-design-patterns).
비교:
# ❌ 나쁜 예 — 파싱 시마다 실행
hook = PostgresHook("database_conn")
results = hook.get_records("SELECT * FROM config;")
# ✅ 좋은 예 — Task 내부에서만 실행
@task
def get_config_from_db():
hook = PostgresHook("database_conn")
return hook.get_records("SELECT * FROM config;")5️⃣ Retry 설정 (적절한 재시도 정책)
기본 설정: retries=2가 분산 환경 대부분의 일시적 오류를 커버합니다(출처: astronomer-dag-best-practices).
우선순위(높음→낮음):
- Task 개별
retries파라미터 - DAG
default_args의retries - 환경변수
AIRFLOW__CORE__DEFAULT_TASK_RETRIES(출처: dag-idempotency)
구현:
default_args = {
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
}
dag = DAG('my_dag', default_args=default_args)주의: Retry는 멱등성이 확보된 태스크에서만 안전합니다. 멱등성 없이 Retry하면 데이터 중복 발생(출처: dag-idempotency).
실행 체크리스트
멱등성 확보를 위한 단계별 점검 목록:
- 태스크 분리: ETL 파이프라인을 Extract/Transform/Load로 분리했나?
- 시점 고정: 현재 시각(
datetime.today()) 대신 Jinja 매크로 사용했나? - 증분 처리: 전체 데이터 대신 실행 구간 데이터만 처리하도록 구성했나?
- Top-level 회피: DAG 파싱 시 외부 시스템 호출이 없나? (모두 Task 내부)
- Retry 설정:
default_args에retries=2이상 설정했나? - 모듈화: TaskGroup으로 관련 태스크를 묶어 재사용성 확보했나?(출처: airflow-dag-design-patterns)
Related Pages
- dag-idempotency — 멱등성 개념과 기본 방법론
- airflow-dag-design-patterns — DAG 설계 패턴 전체 (모듈화·파라미터화·보안 등)
- astronomer-dag-best-practices — 공식 문서 기반 상세 가이드
- top-10-airflow-best-practices-data-engineers — 엔지니어 관점 실무 경험
- apache-airflow — Airflow 플랫폼 개요 (DAP 운영 맥락)