Airflow와 DataStage 통합 운영 가이드

원 질문: “Airflow와 DataStage를 함께 운영할 때의 모범 사례는?”

답변

아키텍처: 역할 분담

Airflow와 DataStage는 DAP 데이터 파이프라인에서 명확하게 다른 역할을 담당합니다(출처: apache-airflow, ibm-datastage):

계층담당 도구책임 영역
오케스트레이션apache-airflowDAG 기반 워크플로우 스케줄링·의존성 관리·장애 복구
ETL 처리ibm-datastage병렬 데이터 추출·변환·적재, 고성능 파이프라인 실행
데이터 품질ibm-qualitystage표준화·중복 제거·주소 검증(선택 라이선스)

DAP 파이프라인 흐름:

Airflow (스케줄링) → DataStage Job (ETL) → Redshift (적재)
  [매일 09:00]       [병렬 처리, 고속]     [쿼리 대기]

Airflow는 DataStage 잡을 외부 프로세스로 실행(BashOperator 또는 DataStageOperator)하고, DataStage는 자신의 병렬 엔진에서 독립적으로 동작합니다(출처: top-10-airflow-best-practices-data-engineers).


모범 사례 6가지

1️⃣ DataStage 잡의 멱등성 설계

dag-idempotency의 원칙이 DataStage에도 적용됩니다. 재실행 안전성을 위해:

  • 증분 처리: 전체 데이터 대신 실행 구간의 레코드만 처리. 소스 테이블의 last_modified_date 또는 sequence ID를 기준으로 필터링(출처: dag-idempotency)
  • SCD(Slowly Changing Dimension) 활용: 차원 테이블 갱신을 자동화. SCD 1형(단순 덮어쓰기)과 SCD 2형(이력 보존) 중 선택(출처: datastage-parallel-job-architecture)
  • 데이터 검증: Transformer 스테이지에서 데이터 유효성을 사전 확인(출처: datastage-developing-parallel-jobs)

Airflow 관점: Retry 설정 시 DataStage 잡이 멱등성을 갖추면 안전하게 재실행됩니다(출처: dag-idempotency).

-- 좋은 예: 증분 처리
SELECT *
FROM source_table
WHERE last_modified >= {{ prev_execution_date }}
  AND last_modified < {{ execution_date }}

2️⃣ DataStage 잡 설계 체크리스트 (장애 예방)

DataStage 잡 실패의 50% 이상은 설계 오류에서 발생합니다(출처: datastage-job-design-troubleshooting). 배포 전 다음을 반드시 확인:

항목점검 내용위험도
고아 스테이지Designer에서 Zoom Out → 입출력 링크 없는 스테이지 없나?🔴 Critical
컬럼명 규칙알파벳·숫자·밑줄만 사용? 특수문자(., -) 없나?🔴 Critical
단일 문자 컬럼명컬럼명이 t/T 단독으로 구성되어 있지 않나? (IIS 8.5+ 버그)🔴 Critical
버전 업그레이드 후Connector Migration Tool 사용했다면 고아 스테이지 재확인🟡 Suggested
컴파일 오류Flow Designer 11.7에서 모든 오류가 한번에 하이라이트되는가?🟡 Suggested

실행 순서: (1) 고아 스테이지 → (2) 컬럼명 규칙 → (3) 특수 케이스 순으로 점검(출처: datastage-troubleshooting-job-design-issues).

3️⃣ DataStage 잡의 병렬 성능 최적화

datastage-parallel-job-architecture의 핵심은 파이프라인 병렬성입니다. 최적화:

  • Transformer 스테이지 변수 타입 정확화: 예상 출력 타입과 일치하면 불필요한 타입 변환 방지(출처: datastage-developing-parallel-jobs)
  • Join vs Lookup 선택:
  • 병렬 엔진 설정 파일: processing·storage·sorting 자원을 구성. 잡 재컴파일 없이 실행 환경 변경 가능(출처: datastage-developing-parallel-jobs)

4️⃣ Airflow DAG에서 DataStage 잡 실행

airflow-dag-design-patterns의 원칙을 DataStage 연동에 적용:

# ✅ 좋은 예: 태스크 원자화 + Retry
@task(retries=2, retry_delay=timedelta(minutes=5))
def run_datastage_etl():
    """DataStage 병렬 잡 실행"""
    import subprocess
    result = subprocess.run([
        'dsx',
        'run',
        '-file', '/path/to/datastage_job.dsx',
        '-server', 'DATASTAGE_SERVER',
        '-user', Variable.get('ds_user'),
        '-password', Variable.get('ds_password')
    ], check=True)
    return result.returncode == 0
 
# ❌ 나쁜 예: 직접 호출, Retry 없음
os.system('dsx run -file /path/to/job.dsx')

핵심:

from airflow.models import TaskGroup
from airflow.operators.bash import BashOperator
 
with TaskGroup('datastage_etl') as tg_etl:
    task_extract = BashOperator(
        task_id='extract',
        bash_command='dsx run -file extract_job.dsx'
    )
    task_load = BashOperator(
        task_id='load',
        bash_command='dsx run -file load_job.dsx'
    )
    task_extract >> task_load

5️⃣ 공유 변수 / 파라미터 관리

Airflow와 DataStage 간 정보 공유:

  • Airflow → DataStage: Variable.get() 값을 환경변수로 전달(출처: airflow-dag-design-patterns)

    ds_env = os.environ.copy()
    ds_env['EXECUTION_DATE'] = '{{ execution_date }}'
    ds_env['BATCH_ID'] = Variable.get('batch_id')
    subprocess.run(['dsx', 'run', ...], env=ds_env)
  • DataStage → Airflow: DataStage 잡 완료 후 상태 파일 또는 로그 반환

    # DataStage 잡의 마지막 단계에서
    echo "ROWS_PROCESSED=$(wc -l /output/result.txt)" > /tmp/ds_result.txt
  • Airflow: 상태 파일 읽어 XCom으로 저장

    @task
    def check_datastage_result():
        with open('/tmp/ds_result.txt', 'r') as f:
            result = f.read()
        return result

6️⃣ 장애 진단 및 모니터링

DataStage 실패 원인 식별:

  1. 설계 오류 (35%): 고아 스테이지, 컬럼명 규칙 위반(출처: datastage-job-design-troubleshooting)
  2. 데이터 오류 (35%): 예상치 못한 null, 타입 불일치
  3. 리소스 부족 (20%): 메모리 초과, 병렬 엔진 설정 불충분
  4. 네트워크/DB 오류 (10%): 임시 연결 실패

모니터링 설정:

  • Airflow UI에서 DataStage 태스크 실행 로그 확인
  • DataStage Director에서 Job Run History 검토
  • Slack 알림으로 실패 즉시 감지(출처: airflow-dag-design-patterns)

DAP 운영 시나리오

일일 데이터 파이프라인 예시:

매일 09:00
  ↓
[Airflow DAG 시작]
  ├─ [TaskGroup: DataStage ETL]
  │  ├─ Extract Job (GoldenGate S3 → Local)
  │  ├─ Transform Job (복잡한 데이터 변환)
  │  └─ Load Job (Redshift 적재)
  │
  ├─ [QualityStage 데이터 품질 검증] (선택)
  │
  └─ [Slack 알림] (성공/실패)

Retry 흐름:

  • DataStage 잡 일시적 실패 → Airflow가 2회 재실행
  • 멱등성 확보 → 동일 execution_date의 데이터만 재처리
  • 기존 데이터 중복 없음 ✅