Search

[Apache Flink] 5. Confluent Cloud for Apache Flink Lab 2 따라하기

[Apache Flink] 1. 데이터 파이프라인의 진화: 배치에서 스트리밍으로
Data Engineering
Apache Flink
2025/06/22
[Apache Flink] 1. 데이터 파이프라인의 진화: 배치에서 스트리밍으로
Data Engineering
Apache Flink
2025/06/22
지난 글에서는 Confluent Cloud for Apache Flink의 기본 기능과 중복 데이터 제거를 통한 실시간 데이터 정제를 살펴보았습니다.
이번 글에서는 Lab 2를 통해 한 단계 더 발전된 데이터 enrichment 패턴고급 조인 기법을 자세히 다뤄보겠습니다. 특히 실제 운영 환경에서 자주 접하는 temporal joinstatement sets를 활용한 복합 데이터 파이프라인 구축 방법을 구체적으로 살펴보겠습니다.

실시간 데이터 Enrichment의 핵심 과제

실제 비즈니스 환경에서는 주문 데이터가 단독으로 존재하는 경우는 거의 없습니다. 고객 정보, 상품 카탈로그, 결제 기록 등 다양한 데이터 소스와 결합되어야 의미 있는 비즈니스 인사이트를 제공할 수 있습니다. 하지만 이러한 데이터 enrichment 과정에서 다음과 같은 현실적인 문제들이 발생합니다.
문제 유형
구체적 이슈
실무 영향
시간 불일치
주문 시점과 고객 정보 변경 시점의 차이
잘못된 고객 정보로 주문 처리
상태 폭증
무한 스트림과 차원 테이블 조인 시 메모리 사용량 급증
시스템 성능 저하 및 장애
데이터 일관성
여러 데이터 소스 간 동기화 문제
비즈니스 로직 오류
Lab 2에서는 이러한 문제들을 체계적으로 해결하는 방법을 제시합니다.

고급 조인 패턴: Regular Join vs Temporal Join

Regular Join의 한계점

먼저 일반적인 내부 조인의 문제점을 살펴보겠습니다.
-- 고객 ID가 3001인 주문에 대해 고객 이메일 정보를 조인 SELECT order_id, unique_orders.`$rowtime`, -- 주문 발생 시각 (이벤트 시간) email -- 고객 이메일 정보 FROM unique_orders INNER JOIN customers -- 일반 조인 (Regular Join) ON unique_orders.customer_id = customers.customer_id WHERE unique_orders.customer_id = 3001;
SQL
복사
Regular Join의 문제점
1.
무한 상태 증가: 주문 데이터가 무한히 증가하면 조인 상태도 무한히 커짐
2.
부적절한 업데이트: 고객 정보 변경 시 과거 주문까지 모두 업데이트됨
3.
메모리 부족: State TTL 없이는 메모리 사용량이 계속 증가

Temporal Join의 우월성

Temporal Join은 이러한 문제를 근본적으로 해결합니다.
-- 고객 ID가 3001인 주문에 대해 주문 시점의 고객 이메일 정보를 조회 SELECT order_id, unique_orders.`$rowtime`, -- 주문 발생 시각 (이벤트 시간) email -- 주문 당시의 고객 이메일 (변경 불가) FROM unique_orders -- Temporal Join을 사용하여 주문 발생 시점 기준으로 고객 정보 조회 INNER JOIN customers FOR SYSTEM_TIME AS OF unique_orders.`$rowtime` ON unique_orders.customer_id = customers.customer_id WHERE unique_orders.customer_id = 3001;
SQL
복사
Temporal Join의 핵심 메커니즘
구성 요소
기능
실무 의미
FOR SYSTEM_TIME AS OF
시점 기준 조인
주문 발생 시점의 고객 정보만 조회
unique_orders.$rowtime
이벤트 시간 참조
실제 주문 발생 시간 기준으로 버전 선택
Versioned Table
버전 관리 테이블
고객 정보 변경 이력을 시간순으로 관리
추가 설명 - Temporal Join의 내부 동작
Temporal Join은 "Time Travel" 개념을 구현합니다. 각 주문에 대해 다음과 같은 과정을 거칩니다.
1.
시점 결정: 주문의 $rowtime을 기준 시점으로 설정
2.
버전 검색: 해당 시점에서 유효했던 고객 정보 버전을 찾음
3.
스냅샷 조인: 찾은 고객 정보 버전과 주문을 조인
4.
불변성 보장: 이후 고객 정보가 변경되어도 과거 주문은 영향받지 않음
이를 통해 고객이 이메일을 king.okuneva@yahoo.com에서 johnny.kling@gmail.com으로 변경하더라도, 이전 주문들은 변경 전 이메일을 유지합니다.

Interval Join: 시간 기반 연관성 검증

결제가 없는 주문은 유효하지 않습니다. Interval Join을 사용하여 주문과 결제를 시간 조건으로 연결할 수 있습니다.

Valid Orders 테이블 생성

-- 상태 관리 설정 SET 'client.statement-name' = 'valid-orders-materializer'; -- 유효 주문 테이블 생성 -- 주문과 결제를 시간 조건으로 조인하여 유효한 주문 추출 -- 결제 시각 기준으로 10분 이내에 발생한 주문만 매칭 CREATE TABLE valid_orders ( order_id STRING, customer_id INT, product_id STRING, order_time TIMESTAMP_LTZ(3), payment_time TIMESTAMP_LTZ(3), amount DECIMAL, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND -- 이벤트 시간 기준 워터마크 설정 ) AS SELECT unique_orders.order_id, customer_id, product_id, unique_orders.`$rowtime` AS order_time, -- 주문 시각 payment_time, -- 결제 시각 amount -- 결제 금액 FROM unique_orders INNER JOIN payments ON unique_orders.order_id = payments.order_id -- 주문 시각이 결제 시각보다 10분 전 ~ 결제 시각 사이인 경우만 유지 WHERE unique_orders.`$rowtime` BETWEEN payment_time - INTERVAL '10' MINUTES AND payment_time;
SQL
복사
Interval Join의 핵심 로직
BETWEEN payment_time - INTERVAL '10' MINUTES AND payment_time: 결제 시간 기준 10분 전까지의 주문과 매칭
실무 활용: 결제 지연을 고려한 유연한 매칭 정책
성능 최적화: 시간 제약으로 조인 범위 제한

복합 데이터 Enrichment: Multi-Table Join

종합 데이터 프로덕트 생성

실제 비즈니스에서는 여러 데이터 소스를 동시에 결합해야 합니다.
-- 복합 데이터 프로덕트 설정 SET 'client.statement-name' = 'valid-orders-customer-product-materializer'; -- 주문 + 고객 + 상품 정보를 통합한 데이터 프로덕트 생성 CREATE TABLE order_customer_product ( order_id STRING, customer_id INT, name STRING, email STRING, brand STRING, product STRING, sale_price DOUBLE ) WITH ('changelog.mode' = 'retract') -- 변경 사항 추적을 위한 CDC 설정 AS SELECT valid_orders.order_id, valid_orders.customer_id, customers.name, customers.email, products.brand, products.name AS product, valid_orders.amount AS sale_price FROM valid_orders -- 주문 시점 기준 고객 정보 조인 (Temporal Join) INNER JOIN customers FOR SYSTEM_TIME AS OF valid_orders.order_time ON valid_orders.customer_id = customers.customer_id -- 주문 시점 기준 상품 정보 조인 (Temporal Join) INNER JOIN products FOR SYSTEM_TIME AS OF valid_orders.order_time ON valid_orders.product_id = products.product_id;
SQL
복사

다중 Temporal Join 패턴 분석

조인 대상
시간 기준
활용 목적
customers
valid_orders.order_time
주문 당시 고객 정보 확보
products
valid_orders.order_time
주문 당시 상품 정보 확보
changelog.mode = 'retract'
-
업데이트/삭제 이벤트 처리
추가 설명 - Changelog Mode
changelog.mode = 'retract' 설정은 변경 데이터 캡처(CDC) 패턴을 지원합니다.
INSERT: 새로운 레코드 추가 시
UPDATE_BEFORE: 기존 레코드 삭제 (retraction)
UPDATE_AFTER: 새로운 레코드 삽입
DELETE: 레코드 삭제 시
이를 통해 downstream 시스템에서 정확한 상태 변경을 추적할 수 있습니다.

Statement Sets: 효율적인 다중 파이프라인 관리

프로모션 계산 로직

고급 비즈니스 로직을 구현해보겠습니다.
-- 전자제품 프로모션 대상 고객 식별 -- 특정 브랜드 제품을 많이 구매한 고객을 프로모션 대상으로 추출 SELECT customer_id, COLLECT(brand) AS products, -- 구매한 브랜드 목록 수집 'bundle_offer' AS promotion_name -- 고정 프로모션 이름 FROM order_customer_product WHERE brand IN ('Samsung', 'Sony', 'LG') -- 대상 브랜드 필터 GROUP BY customer_id HAVING COUNT(DISTINCT brand) >= 2 -- 서로 다른 브랜드 2개 이상 AND COUNT(brand) > 5; -- 총 5개 이상 구매
SQL
복사

프로모션 결과 저장

-- 프로모션 테이블 생성 CREATE TABLE electronics_promotions ( customer_id INT, promotion_name STRING, PRIMARY KEY (customer_id) NOT ENFORCED ); -- 프로모션 데이터 삽입 SET 'client.statement-name' = 'electronics-promotions-materializer'; INSERT INTO electronics_promotions SELECT customer_id, 'bundle_offer' AS promotion_name FROM order_customer_product WHERE brand IN ('Samsung', 'Sony', 'LG') GROUP BY customer_id HAVING COUNT(DISTINCT brand) >= 2 AND COUNT(brand) > 5;
SQL
복사

고급 윈도우 연산: 고객 충성도 계산

시간 기반 집계와 윈도우 함수

-- 고객별 시간대별 주문 통계 -- 고객별 최근 1시간 내 주문 수와 총 결제 금액 집계 SELECT customer_id, order_id, order_time, COUNT(*) OVER w AS num_orders, -- 최근 1시간 주문 수 SUM(amount) OVER w AS total_price -- 최근 1시간 총 결제 금액 FROM `valid_orders` WINDOW w AS ( PARTITION BY customer_id -- 고객별로 집계 분리 ORDER BY order_time -- 주문 시각 기준 정렬 RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW -- 최근 1시간 범위 );
SQL
복사
윈도우 함수의 핵심 구성 요소
구성 요소
기능
실무 활용
PARTITION BY customer_id
고객별 윈도우 분리
고객별 독립적 계산
ORDER BY order_time
시간순 정렬
순차적 누적 계산
RANGE BETWEEN ... AND CURRENT ROW
시간 범위 지정
최근 1시간 데이터만 포함

CTE를 활용한 충성도 등급 계산

-- CTE를 활용한 고객 등급 분류 CREATE TABLE reward_levels ( customer_id BIGINT, total_price DOUBLE, rewards_level STRING, updated_at TIMESTAMP_LTZ(3), PRIMARY KEY (customer_id) NOT ENFORCED ) AS WITH total_price_per_customer_1h AS ( -- 고객별 최근 1시간 누적 결제 금액 SELECT customer_id, SUM(amount) OVER w AS total_price, order_time FROM `valid_orders` WINDOW w AS ( PARTITION BY customer_id ORDER BY order_time RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW ) ) -- 금액 구간별 등급 부여 SELECT COALESCE(customer_id, 0) AS customer_id, -- null 방지 total_price, CASE WHEN total_price > 30000 THEN 'GOLD' WHEN total_price > 20000 THEN 'SILVER' WHEN total_price > 10000 THEN 'BRONZE' ELSE 'NONE' END AS rewards_level, order_time AS updated_at FROM total_price_per_customer_1h;
SQL
복사
추가 설명 - CTE(Common Table Expression)의 실무 활용
CTE는 복잡한 쿼리의 가독성 향상중간 결과 재사용을 위해 사용됩니다.
1.
임시 결과 집합: total_price_per_customer_1h는 쿼리 실행 동안만 존재
2.
로직 분리: 윈도우 계산과 등급 분류 로직을 명확히 구분
3.
메모리 효율성: 물리적 테이블 생성 없이 논리적 분할 수행
COALESCE(customer_id, 0) 함수는 null 값 처리를 담당합니다.
NULL인 customer_id를 0으로 대체
데이터 품질 보장 및 downstream 오류 방지

실시간 데이터 파이프라인 관찰성

Stream Lineage를 통한 데이터 플로우 추적

Confluent Cloud의 Stream Lineage 기능을 통해 구축한 데이터 파이프라인을 시각화할 수 있습니다.
1.
데이터 프로덕트 식별: order_customer_product, electronics_promotions
2.
의존성 추적: 각 테이블 간 연관관계 파악
3.
영향도 분석: 변경 사항의 downstream 영향 범위 확인
이는 데이터 거버넌스운영 투명성 확보에 필수적입니다.

Lab 2에서 배운 핵심 패턴 정리

조인 패턴 선택 가이드

패턴
적용 시나리오
장점
주의사항
Temporal Join
Event enrichment
시점 정확성, 메모리 효율성
Primary key 필수
Interval Join
시간 관련성 검증
유연한 시간 조건
워터마크 설정 중요
Regular Join
정적 테이블 조인
간단한 구현
State TTL 필수

성능 최적화 기법

기법
목적
구현 방법
State TTL
메모리 사용량 제한
SET 'sql.state-ttl' = '1 hour'
Changelog Mode
효율적 변경 추적
'changelog.mode' = 'retract'
Statement Sets
여러 파이프라인 최적화
EXECUTE STATEMENT SET

마무리

이번 글에서는 Confluent Cloud for Apache Flink Lab 2를 통해 고급 데이터 처리 패턴을 살펴보았습니다. 특히 temporal join을 활용한 정확한 시점 기반 enrichment와 statement sets를 통한 효율적인 다중 파이프라인 관리 방법을 구체적으로 학습했습니다.
Kafka가 이벤트를 안전하게 수집·전달하는 데 중점을 두었다면, Flink는 그 위에서 시간 기준의 정제, 상태 기반의 조인, 그리고 복합 로직 실행까지 가능하게 해주는 실시간 처리 플랫폼입니다. 앞으로 실무에서 Flink를 사용할 때 이 글들이 조금이라도 도움이 되었으면 좋겠습니다.
이 시리즈가 Flink 기반 실시간 파이프라인을 설계할 때 실질적인 참고가 되었기를 바랍니다.