Search

[트러블슈팅] Airflow Broken DAG

문제 배경

동료 개발자가 다른 프로젝트에서 사용하던 라이브러리를 그대로 가져와, Airflow의 dags/ 내부에 별도의 프로젝트 폴더를 구성했습니다. 이 폴더는 DAG 실행용 코드가 아니라 단순히 모듈과 유틸을 담은 Python 패키지 형태였고, 해당 코드를 포함한 이미지를 빌드하여 EKS Operator로 정상적으로 실행 중이었습니다.
즉, 실제 DAG Task는 문제없이 실행되었고, ImportError도 EKS 파이프라인 실행 중에는 전혀 발생하지 않았습니다. 하지만 이상하게도 Airflow UI에서는 계속해서 해당 프로젝트에서 사용한 라이브러리에 대해서 import 오류로 인한 Broken DAG 경고가 나타났습니다.
더욱 이상한 점은 아래와 같았습니다.
1.
해당 에러가 발생하여 .airflowignore에 해당 폴더를 포함시켰지만 계속 에러가 발생함
2.
.airflowignore에 해당 폴더를 포함시킨 후 운영(Prod) 환경에 배포하면 해당 에러가 발생하지 않
3.
Webserver 재시작 후에도 UI의 Broken 표시가 사라지지 않음
즉, “DAG에서 쓰지도 않는 프로젝트 폴더가 왜 임포트되고, 왜 UI에서만 에러가 남아있는가”가 문제의 핵심이었습니다.

1. 첫 번째 원인 — .airflowignore미래 스캔만 막음

.airflowignore앞으로의 파일 스캔 범위를 제한할 뿐, 과거에 기록된 ImportError를 지우지 않습니다. Airflow는 임포트 실패를 메[타데이터 DB의 import_error 테이블에 저장하고, UI는 이 테이블을 기반으로 Broken DAG를 표시합니다.
-- 과거 오류 기록 확인 SELECT * FROM import_error WHERE filename LIKE '%{에러난 폴더 이름}%';
SQL
복사
과거 기록이 남아 있으면 .airflowignore를 추가하더라도 UI 경고는 계속 표시됩니다. 정리는 CLI로 진행합니다.
# 특정 시점 이전의 import_error 정리 airflow db clean \ --table import_error \ --clean-before-timestamp '2025-10-20 05:00:00' \ --yes
Bash
복사
정리 직후 UI의 Broken DAG 표시는 사라졌습니다.
airflowignore는 예방책입니다. 이미 기록된 과거는 DB에서 별도로 청소해야 합니다.

2. 두 번째 원인 — DAG_DISCOVERY_SAFE_MODE

Airflow의 기본 설정 DAG_DISCOVERY_SAFE_MODE=True에서는 파일 확장자만 보지 않고, 파일 내용까지 간단한 휴리스틱으로 검사합니다. 핵심 로직은 아래와 같습니다.
def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool: """ Heuristic that guesses whether a Python file contains an Airflow DAG definition. :param file_path: Path to the file to be checked. :param zip_file: if passed, checks the archive. Otherwise, check local filesystem. :return: True, if file might contain DAGs. """ if zip_file: with zip_file.open(file_path) as current_file: content = current_file.read() else: if zipfile.is_zipfile(file_path): return True with open(file_path, "rb") as dag_file: content = dag_file.read() content = content.lower() if b"airflow" not in content: return False return any(s in content for s in (b"dag", b"asset"))
Python
복사
즉, 파일 내용 안에 airflowdag(또는 asset) 문자열이 있으면 DAG 후보로 분류합니다. 문제 디렉토리 일부 파일의 주석에 “Airflow DAG”라는 문구가 포함되어 있었고, 이 때문에 Safe Mode 단계에서 이미 “후보”로 올라갔습니다. 이후 단계에서 .airflowignore가 적용되더라도, 환경/구조에 따라 후보 선정과 무시 로직의 타이밍이 어긋나 Import 시도가 발생할 수 있습니다.

3. Airflow의 DAG 파싱 순서

[1] 파일 스캔 시작 └─ .airflowignore 적용 (탐색 범위 축소) [2] Safe Mode 휴리스틱 검사 (DAG_DISCOVERY_SAFE_MODE=True) └─ 파일 내용에 'airflow' AND ('dag' OR 'asset')가 있으면 "후보" [3] 후보 파일 임포트 시도 ├─ 성공: DAG 등록 └─ 실패: import_error 테이블 기록 → UI에 Broken 표시
Plain Text
복사

부록: 소스 코드로 확인한 Airflow 파싱/오류 처리 메커니즘

역할
주요 경로
코드 위치
내용
오케스트레이션
airflow/dag_processing/manager.py
DAG 파일 스캔·할당·감시 총괄
파일 탐색/필터
airflow/utils/file.py
.airflowignore 적용, 후보 파일 산출
개별 파일 파싱
airflow/dag_processing/processor.py
격리 프로세스에서 import 실행
오류 영속화
airflow/models/errors.py
import_error 테이블 스키마

마무리

우리는 보통 DAG에서 직접 호출하는 라이브러리에만 신경을 씁니다. 그러나 Airflow 자체의 DAG 파싱 순서를 이해해 두면, dags/ 아래 프로젝트 폴더 구조와 주석/키워드 관리, 그리고 import_error 정리 절차까지 더 체계적으로 운영할 수 있습니다.
오랜만에 Airflow GitHub 소스를 직접 확인하며 파이프라인 내부 동작을 확인했습니다. 문제는 작았지만, “왜 그런가”를 코드 레벨에서 추적해 보는 과정이 재밌었습니다. 앞으로도 운영 이슈가 생기면 동작 순서와 상태 기록 지점을 먼저 의심하고, 필요한 경우 소스 코드를 한번 더 확인해보면 좋을 것 같습니다.