Airflow와 DataStage 통합 운영 가이드
원 질문: “Airflow와 DataStage를 함께 운영할 때의 모범 사례는?”
답변
아키텍처: 역할 분담
Airflow와 DataStage는 DAP 데이터 파이프라인에서 명확하게 다른 역할을 담당합니다(출처: apache-airflow, ibm-datastage):
| 계층 | 담당 도구 | 책임 영역 |
|---|---|---|
| 오케스트레이션 | apache-airflow | DAG 기반 워크플로우 스케줄링·의존성 관리·장애 복구 |
| ETL 처리 | ibm-datastage | 병렬 데이터 추출·변환·적재, 고성능 파이프라인 실행 |
| 데이터 품질 | ibm-qualitystage | 표준화·중복 제거·주소 검증(선택 라이선스) |
DAP 파이프라인 흐름:
Airflow (스케줄링) → DataStage Job (ETL) → Redshift (적재)
[매일 09:00] [병렬 처리, 고속] [쿼리 대기]
Airflow는 DataStage 잡을 외부 프로세스로 실행(BashOperator 또는 DataStageOperator)하고, DataStage는 자신의 병렬 엔진에서 독립적으로 동작합니다(출처: top-10-airflow-best-practices-data-engineers).
모범 사례 6가지
1️⃣ DataStage 잡의 멱등성 설계
dag-idempotency의 원칙이 DataStage에도 적용됩니다. 재실행 안전성을 위해:
- 증분 처리: 전체 데이터 대신 실행 구간의 레코드만 처리. 소스 테이블의
last_modified_date또는 sequence ID를 기준으로 필터링(출처: dag-idempotency) - SCD(Slowly Changing Dimension) 활용: 차원 테이블 갱신을 자동화. SCD 1형(단순 덮어쓰기)과 SCD 2형(이력 보존) 중 선택(출처: datastage-parallel-job-architecture)
- 데이터 검증: Transformer 스테이지에서 데이터 유효성을 사전 확인(출처: datastage-developing-parallel-jobs)
Airflow 관점: Retry 설정 시 DataStage 잡이 멱등성을 갖추면 안전하게 재실행됩니다(출처: dag-idempotency).
-- 좋은 예: 증분 처리
SELECT *
FROM source_table
WHERE last_modified >= {{ prev_execution_date }}
AND last_modified < {{ execution_date }}2️⃣ DataStage 잡 설계 체크리스트 (장애 예방)
DataStage 잡 실패의 50% 이상은 설계 오류에서 발생합니다(출처: datastage-job-design-troubleshooting). 배포 전 다음을 반드시 확인:
| 항목 | 점검 내용 | 위험도 |
|---|---|---|
| 고아 스테이지 | Designer에서 Zoom Out → 입출력 링크 없는 스테이지 없나? | 🔴 Critical |
| 컬럼명 규칙 | 알파벳·숫자·밑줄만 사용? 특수문자(., -) 없나? | 🔴 Critical |
| 단일 문자 컬럼명 | 컬럼명이 t/T 단독으로 구성되어 있지 않나? (IIS 8.5+ 버그) | 🔴 Critical |
| 버전 업그레이드 후 | Connector Migration Tool 사용했다면 고아 스테이지 재확인 | 🟡 Suggested |
| 컴파일 오류 | Flow Designer 11.7에서 모든 오류가 한번에 하이라이트되는가? | 🟡 Suggested |
실행 순서: (1) 고아 스테이지 → (2) 컬럼명 규칙 → (3) 특수 케이스 순으로 점검(출처: datastage-troubleshooting-job-design-issues).
3️⃣ DataStage 잡의 병렬 성능 최적화
datastage-parallel-job-architecture의 핵심은 파이프라인 병렬성입니다. 최적화:
- Transformer 스테이지 변수 타입 정확화: 예상 출력 타입과 일치하면 불필요한 타입 변환 방지(출처: datastage-developing-parallel-jobs)
- Join vs Lookup 선택:
- Join: 정렬된 대용량 입력 결합 (병렬 스트림)
- Lookup: 참조 데이터 메모리 로드 후 조회(출처: datastage-parallel-job-architecture)
- 병렬 엔진 설정 파일: processing·storage·sorting 자원을 구성. 잡 재컴파일 없이 실행 환경 변경 가능(출처: datastage-developing-parallel-jobs)
4️⃣ Airflow DAG에서 DataStage 잡 실행
airflow-dag-design-patterns의 원칙을 DataStage 연동에 적용:
# ✅ 좋은 예: 태스크 원자화 + Retry
@task(retries=2, retry_delay=timedelta(minutes=5))
def run_datastage_etl():
"""DataStage 병렬 잡 실행"""
import subprocess
result = subprocess.run([
'dsx',
'run',
'-file', '/path/to/datastage_job.dsx',
'-server', 'DATASTAGE_SERVER',
'-user', Variable.get('ds_user'),
'-password', Variable.get('ds_password')
], check=True)
return result.returncode == 0
# ❌ 나쁜 예: 직접 호출, Retry 없음
os.system('dsx run -file /path/to/job.dsx')핵심:
- Tag 그룹화: TaskGroup으로 DataStage 관련 태스크를 묶기(출처: airflow-dag-design-patterns)
- Retry 설정: DataStage 잡이 멱등성을 갖추면
retries=2안전(출처: dag-idempotency) - Slack 알림: on_failure_callback으로 DataStage 실패를 즉시 감지(출처: airflow-dag-design-patterns)
from airflow.models import TaskGroup
from airflow.operators.bash import BashOperator
with TaskGroup('datastage_etl') as tg_etl:
task_extract = BashOperator(
task_id='extract',
bash_command='dsx run -file extract_job.dsx'
)
task_load = BashOperator(
task_id='load',
bash_command='dsx run -file load_job.dsx'
)
task_extract >> task_load5️⃣ 공유 변수 / 파라미터 관리
Airflow와 DataStage 간 정보 공유:
-
Airflow → DataStage:
Variable.get()값을 환경변수로 전달(출처: airflow-dag-design-patterns)ds_env = os.environ.copy() ds_env['EXECUTION_DATE'] = '{{ execution_date }}' ds_env['BATCH_ID'] = Variable.get('batch_id') subprocess.run(['dsx', 'run', ...], env=ds_env) -
DataStage → Airflow: DataStage 잡 완료 후 상태 파일 또는 로그 반환
# DataStage 잡의 마지막 단계에서 echo "ROWS_PROCESSED=$(wc -l /output/result.txt)" > /tmp/ds_result.txt -
Airflow: 상태 파일 읽어 XCom으로 저장
@task def check_datastage_result(): with open('/tmp/ds_result.txt', 'r') as f: result = f.read() return result
6️⃣ 장애 진단 및 모니터링
DataStage 실패 원인 식별:
- 설계 오류 (35%): 고아 스테이지, 컬럼명 규칙 위반(출처: datastage-job-design-troubleshooting)
- 데이터 오류 (35%): 예상치 못한 null, 타입 불일치
- 리소스 부족 (20%): 메모리 초과, 병렬 엔진 설정 불충분
- 네트워크/DB 오류 (10%): 임시 연결 실패
모니터링 설정:
- Airflow UI에서 DataStage 태스크 실행 로그 확인
- DataStage Director에서 Job Run History 검토
- Slack 알림으로 실패 즉시 감지(출처: airflow-dag-design-patterns)
DAP 운영 시나리오
일일 데이터 파이프라인 예시:
매일 09:00
↓
[Airflow DAG 시작]
├─ [TaskGroup: DataStage ETL]
│ ├─ Extract Job (GoldenGate S3 → Local)
│ ├─ Transform Job (복잡한 데이터 변환)
│ └─ Load Job (Redshift 적재)
│
├─ [QualityStage 데이터 품질 검증] (선택)
│
└─ [Slack 알림] (성공/실패)
Retry 흐름:
- DataStage 잡 일시적 실패 → Airflow가 2회 재실행
- 멱등성 확보 → 동일 execution_date의 데이터만 재처리
- 기존 데이터 중복 없음 ✅
Related Pages
- dag-idempotency — DAG/DataStage 재실행 안전성 원칙
- airflow-dag-design-patterns — Airflow 설계 패턴 (TaskGroup·Retry·알림)
- datastage-parallel-job-architecture — DataStage 병렬 엔진·스테이지 카테고리
- datastage-job-design-troubleshooting — DataStage 설계 오류 진단 (고아 스테이지·컬럼명)
- astronomer-dag-best-practices — Airflow 공식 가이드
- top-10-airflow-best-practices-data-engineers — Airflow 실무 10가지
- datastage-developing-parallel-jobs — DataStage 병렬 잡 개발 공식 문서
- datastage-troubleshooting-job-design-issues — DataStage 트러블슈팅 공식 문서
- apache-airflow — Airflow 플랫폼
- ibm-datastage — DataStage ETL 플랫폼
- ibm-qualitystage — QualityStage 데이터 품질 플랫폼