보통 DataHub에 소스(데이터베이스, 파일, BI 등) 연결은 잘 해두는데, 막상 Airflow 쪽은 빈칸으로 남는 경우가 많습니다. Datahub 내 Data Sources에서 Airflow와 관련된 항목이 보이지 않기 때문입니다. 그래서 Airflow 쪽에서 CLI나 GMS API를 통해 DataHub로 메타데이터를 푸시해야 하는데, 그 과정을 모르면 단순히 Source들만 연결한 채로 끝내는 경우가 많습니다.
하지만 Airflow는 조직 내 ETL/ELT 파이프라인의 핵심 중 하나입니다. 이걸 DataHub와 연동하면, Task → Dataset → Task 흐름이 살아나면서 데이터의 실제 이동 경로를 한눈에 파악할 수 있습니다.
이번 글에서는 운영 중인 DataHub에 Airflow Lineage 연결을 위해 필요한 최소 설정만 추가해서 자동 수집하는 방법을 알아보겠습니다.
전체 구조 개념
DataHub와 Airflow를 연결하는 방식은 크게 두 가지입니다.
구분 | 방식 | 주요 도구 | 특징 |
Pull | DataHub 쪽에서 Airflow 메타데이터를 “가져오는” 방식 | datahub ingest CLI / source: airflow | 정기 수집, 환경 단순, 실시간성 낮음 |
Push | Airflow가 DAG 실행 시 DataHub로 메타데이터를 보내는 방식 | acryl-datahub-airflow-plugin | 실시간 라인리지, 세밀한 Task 단위 추적 |
Pull 방식 (Ingestion)
DataHub가 Airflow 메타데이터 DB를 주기적으로 긁어옵니다.
DataHub CLI를 사용해 아래처럼 실행합니다.
datahub ingest -c ./airflow_ingestion.yml
Plain Text
복사
# airflow_ingestion.yml
source:
type: airflow
config:
# Airflow 메타데이터 DB 연결 정보
connection:
host: localhost
user: airflow
password: airflow
database: airflow
sink:
type: datahub-rest
config:
server: http://<gms-host>:8080
Plain Text
복사
이 방식은 Airflow 메타데이터 DB에 접근해 DAG / Task 정의를 긁어오기 때문에 라인리지를 주기적으로 업데이트할 수 있지만, DAG 실행 단위의 실제 데이터 입출력(세밀한 라인리지)는 반영되지 않습니다.
Push 방식 (Plugin)
Airflow 자체가 DAG 실행 중 직접 GMS로 이벤트를 전송하는 구조입니다.
•
acryl-datahub-airflow-plugin: DAG/Task 실행 이벤트를 감지
•
inlets / outlets: Task의 입력/출력 Dataset 명시
•
datahub-rest connection: GMS로 전송할 대상 주소
Push 방식은 DAG이 실행될 때마다 DataHub로 실시간 메타데이터가 푸시되어 Task 간 실제 데이터 플로우가 시각화됩니다.
1. DataHub 쪽 사전 조건
DataHub는 별도 설정이 필요 없습니다.
•
GMS(메타데이터 서버)가 외부 접근 가능 (http://<host>:8080)
•
Kafka / OpenSearch 등 백엔드 스택 정상 동작
•
인증 환경이라면 Airflow가 토큰/헤더로 접근 가능해야 함
2. Airflow 플러그인 설치
Airflow 배포 환경에 아래 두 패키지만 추가하면 됩니다.
pip install acryl-datahub-airflow-plugin>=1.3.0 acryl-datahub>=0.13.0
Plain Text
복사
Docker Compose 예시
DATAHUB_GMS_HOST는 실제 사용하는 사내 GMS 주소로 세팅하면 됩니다.
services:
webserver:
image: apache/airflow:2.9.3-python3.11
environment:
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__LINEAGE__BACKEND: "" # 구형 lineage 비활성화
_PIP_ADDITIONAL_REQUIREMENTS: "acryl-datahub-airflow-plugin>=1.3.0 acryl-datahub>=0.13.0"
scheduler:
image: apache/airflow:2.9.3-python3.11
environment:
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__LINEAGE__BACKEND: ""
_PIP_ADDITIONAL_REQUIREMENTS: "acryl-datahub-airflow-plugin>=1.3.0 acryl-datahub>=0.13.0"
airflow-init:
image: apache/airflow:2.9.3-python3.11
entrypoint: ["/bin/bash"]
command: >
-c "
set -e
python -m pip install --no-cache-dir acryl-datahub-airflow-plugin>=1.3.0 acryl-datahub>=0.13.0 &&
airflow db init &&
airflow users create --role Admin --username admin --password admin --firstname A --lastname D --email admin@example.com &&
airflow connections add datahub_rest_default --conn-type datahub-rest --conn-host \"$DATAHUB_GMS_HOST\" &&
echo 'Init done'
"
environment:
DATAHUB_GMS_HOST: "http://host.docker.internal:8080"
Plain Text
복사
3. Airflow
DataHub 연결
Airflow Connection은 한 개면 충분합니다. 플러그인은 이 커넥션을 기본으로 사용합니다. 별도 설정이 없다면 자동으로 datahub_rest_default를 참조하기 때문에 Airflow UI 접속 → Admin → Connections에서 세팅하면 됩니다.
항목 | 값 |
Conn Id | datahub_rest_default |
Conn Type | datahub-rest |
Host | http://<gms-host>:8080 |
CLI로 생성하려면 아래와 같습니다.
airflow connections add datahub_rest_default \
--conn-type datahub-rest \
--conn-host "http://<gms-host>:8080"
Plain Text
복사
4. DAG 설정 — inlets / outlets 선언
핵심은 Task의 입력(inlets) 과 출력(outlets) Dataset을 선언하는 것입니다.
이 한 줄 덕분에 DataHub가 DAG 간 라인리지를 자동으로 구성합니다.
(1) 생산 DAG — outlets
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from datahub_provider.entities import Dataset
def transform_stage_2(**context):
print("Transform stage 2 and output dataset...")
with DAG(
dag_id="fake_mongo_extract_dag",
start_date=datetime(2024, 1, 1),
schedule=None,
catchup=False,
) as dag:
t3 = PythonOperator(
task_id="transform_stage_2",
python_callable=transform_stage_2,
outlets=[Dataset(platform="mongodb", name="fake_mongo.output", env="PROD")],
)
Python
복사
(2) 소비 DAG — inlets
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from datahub_provider.entities import Dataset
def final_load(**context):
print("Load data from fake_mongo.output into downstream system")
with DAG(
dag_id="downstream_processing_dag",
start_date=datetime(2024, 1, 1),
schedule=None,
catchup=False,
) as dag:
load_task = PythonOperator(
task_id="final_load",
python_callable=final_load,
inlets=[Dataset(platform="mongodb", name="fake_mongo.output", env="PROD")],
)
Python
복사
platform, name, env 세 값이 양쪽에서 동일해야 DataHub가 Task → Dataset → Task 관계를 자동으로 연결합니다.
5. 실행 및 확인
1.
DAG을 Trigger
2.
플러그인이 실행 이벤트 + IO 메타데이터를 GMS로 전송
3.
DataHub UI에서 확인
Datahub 내 위치 | 내용 |
Platforms | Trigger된 Dag이 Airflow 내에서 확인 가능 |
Browse → Datasets → DB 내 연동된 테이블 | Upstream과 Downstream에서 연결된 Airflow 확인 가능 |
6. URN 직접 선언
DataHub 내부에서는 모든 엔티티(Dataset, Flow, Job 등)를 URN(Uniform Resource Name) 으로 식별합니다.
urn:li:dataset:(urn:li:dataPlatform:mongodb,fake_mongo.output,PROD) 이렇게 작성하면 “PROD 환경의 mongodb 플랫폼에 있는 fake_mongo.output 데이터셋”을 뜻합니다.
기본적으로 아래처럼 쓰면 DataHub가 자동으로 URN을 생성합니다.
Dataset(platform="mongodb", name="fake_mongo.output", env="PROD")
Plain Text
복사
하지만 이미 DataHub에 등록된 Dataset을 정확한 URN으로 직접 참조하고 싶을 때는 아래처럼 사용할 수 있습니다.
from datahub_provider.entities import Dataset
Dataset.from_urn("urn:li:dataset:(urn:li:dataPlatform:mongodb,fake_mongo.output,PROD)")
Python
복사
일반적인 DAG에서는 Dataset() 선언이면 충분합니다. 다만, 다른 팀 Dataset을 연결하거나 DataHub UI에서 복사한 URN을 그대로 사용해야 할 경우에만 from_urn()을 쓰면 됩니다.
마무리
Airflow를 DataHub에 연동하는 건 복잡한 구축 작업이 아닙니다. 플러그인 두 개, 커넥션 하나, DAG에 in/out 한 줄이면 충분합니다. Airflow는 대부분의 데이터 조직에서 핵심 파이프라인 역할을 합니다. 하지만 Airflow UI만으로는 데이터의 실제 흐름과 종속 관계를 한눈에 보기 어렵습니다.
DataHub에 Airflow를 간단히 연동하기만 해도, 단순한 데이터 카탈로그를 넘어 데이터가 어디서 생성되고 어디로 흘러가는지까지 누구든지 Datahub에서 확인할 수 있게 됩니다.