Airflow DAG 멱등성 보장 실전 가이드

원 질문: “Airflow DAG 설계에서 멱등성을 보장하는 실전 방법은?”

답변

멱등성이 중요한 이유

멱등성(Idempotency)은 동일한 입력으로 DAG Run을 몇 번 반복 실행해도 항상 동일한 결과가 나오는 성질입니다(출처: dag-idempotency). 이것이 중요한 이유는:

  • 안전한 Retry: 멱등성 없이 Retry하면 데이터 중복·손실 발생(출처: dag-idempotency)
  • 장애 복구: 특정 구간만 선택적으로 재실행(backfill) 가능(출처: dag-idempotency)
  • 분산 환경 안정성: 네트워크 오류 후에도 데이터 무결성 보장(출처: astronomer-dag-best-practices)

DAP 운영에서는 상품추천 모델과 Redshift 데이터 적재 등이 Airflow DAG로 운영되므로, 멱등성 확보는 선택이 아닌 필수입니다(출처: apache-airflow).


실전 방법 5가지

1️⃣ 태스크 원자화 (Task Atomicity)

방법: ETL 파이프라인을 Extract → Transform → Load 3개의 독립 태스크로 분리합니다(출처: dag-idempotency).

효과:

  • Transform 태스크 실패 시, Extract를 다시 실행할 필요 없음
  • 부분 실패가 전체 DAG에 영향을 주지 않음
  • 각 태스크를 독립적으로 재시도 가능

Python 예시:

@task
def extract_task():
    # Extract 로직만
    return data
 
@task
def transform_task(data):
    # Transform 로직만
    return processed_data
 
@task
def load_task(processed_data):
    # Load 로직만
    return None
 
extract >> transform >> load

2️⃣ Jinja 템플릿 사용 (시점 고정)

문제: datetime.today() 같은 현재 시각을 기반으로 코드를 작성하면, 같은 DAG Run을 재실행할 때마다 다른 결과가 나옵니다(출처: dag-idempotency).

해결: Airflow 내장 매크로 {{ prev_start_date_success }} 등을 사용해 execution_date를 기준으로 고정합니다(출처: astronomer-dag-best-practices).

비교:

# ❌ 나쁜 예 — 재실행마다 다른 결과
yesterday = datetime.today() - timedelta(1)
 
# ✅ 좋은 예 — execution_date 고정
yesterday = {{ prev_start_date_success }}

효과: 시점이 고정되므로 동일 DAG Run의 재실행이 안전합니다(출처: dag-idempotency).


3️⃣ 증분 처리 (Incremental Filtering)

개념: 전체 데이터셋을 매번 재처리하지 말고, DAG 실행 구간의 레코드만 처리합니다(출처: dag-idempotency).

두 가지 패턴:

패턴사용 조건구현
last_modified_date 기반소스 레코드에 수정 시각 컬럼 존재(권장)WHERE last_modified >= {{ prev_start_date_success }}
sequence_id 기반레코드가 append-only, 수정 시각 없음WHERE id > (SELECT MAX(id) FROM last_successful_run)

효과: 특정 구간 실패 시 해당 구간만 재실행하면 됩니다(출처: astronomer-dag-best-practices).


4️⃣ Top-level 코드 회피

문제: apache-airflow Scheduler는 dags_folder를 기본 30초마다 파싱합니다. 파싱 시점에 실행되는 외부 시스템 호출(top-level 코드)은:

해결: 모든 외부 시스템 호출을 Task 내부로 이동합니다(출처: airflow-dag-design-patterns).

비교:

# ❌ 나쁜 예 — 파싱 시마다 실행
hook = PostgresHook("database_conn")
results = hook.get_records("SELECT * FROM config;")
 
# ✅ 좋은 예 — Task 내부에서만 실행
@task
def get_config_from_db():
    hook = PostgresHook("database_conn")
    return hook.get_records("SELECT * FROM config;")

5️⃣ Retry 설정 (적절한 재시도 정책)

기본 설정: retries=2가 분산 환경 대부분의 일시적 오류를 커버합니다(출처: astronomer-dag-best-practices).

우선순위(높음→낮음):

  1. Task 개별 retries 파라미터
  2. DAG default_argsretries
  3. 환경변수 AIRFLOW__CORE__DEFAULT_TASK_RETRIES(출처: dag-idempotency)

구현:

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
}
 
dag = DAG('my_dag', default_args=default_args)

주의: Retry는 멱등성이 확보된 태스크에서만 안전합니다. 멱등성 없이 Retry하면 데이터 중복 발생(출처: dag-idempotency).


실행 체크리스트

멱등성 확보를 위한 단계별 점검 목록:

  • 태스크 분리: ETL 파이프라인을 Extract/Transform/Load로 분리했나?
  • 시점 고정: 현재 시각(datetime.today()) 대신 Jinja 매크로 사용했나?
  • 증분 처리: 전체 데이터 대신 실행 구간 데이터만 처리하도록 구성했나?
  • Top-level 회피: DAG 파싱 시 외부 시스템 호출이 없나? (모두 Task 내부)
  • Retry 설정: default_argsretries=2 이상 설정했나?
  • 모듈화: TaskGroup으로 관련 태스크를 묶어 재사용성 확보했나?(출처: airflow-dag-design-patterns)