Search

[백엔드] 이벤트 버스

백엔드에서 시스템을 설계할 때, 모듈 간의 긴밀한 결합을 방지하고 코드의 유연성을 높이는 것은 중요한 과제입니다. 이런 맥락에서 등장한 중요한 개념 중 하나가 바로 이벤트 버스(Event Bus)입니다.
이번 글에서는 이벤트 버스의 개념을 간단히 살펴보고, 파이썬으로 이벤트 버스를 실제로 구축하고 사용하는 방법을 구체적인 코드와 함께 단계별로 살펴보겠습니다.

1. 이벤트 버스(Event Bus)

이벤트 버스는 애플리케이션 내부에서 일어나는 다양한 이벤트(Event)를 처리하는 중앙 집중형 통신 채널입니다. 조금 더 풀어서 설명하자면, 애플리케이션에서 특정 작업이 완료되거나, 중요한 상태 변화가 발생했을 때 이를 다른 컴포넌트나 서비스에 알리는 역할을 합니다.
이벤트 버스의 기본적인 구성 요소는 다음과 같습니다:
이벤트(Event): 시스템 내에서 일어난 중요한 변화나 행동을 나타내는 데이터 객체입니다. 예를 들어 사용자 회원가입 완료, 결제 처리 완료, 주문 상태 변경 등이 대표적인 이벤트입니다.
이벤트 핸들러(Event Handler): 특정 이벤트가 발생했을 때 실행되는 함수나 로직으로, 이벤트가 발생했을 때 수행해야 하는 작업을 정의합니다.
이벤트 버스(Event Bus): 이벤트 발행자와 이벤트 핸들러 사이를 연결하는 매개체로, 이벤트를 전달받아 등록된 핸들러에 전달하고 실행하는 역할을 수행합니다.
쉽게 예시를 들어보겠습니다.
예를 들어, 사용자가 웹사이트에서 회원가입을 하면 다음과 같은 일이 일어날 수 있습니다:
1.
회원가입을 처리하는 서비스에서 사용자 정보를 저장하고, 사용자 회원가입 완료라는 이벤트를 이벤트 버스에 발행합니다.
2.
이벤트 버스는 이 이벤트를 받아서, 미리 등록된 핸들러들에 전달합니다.
3.
각 핸들러는 독립적으로 자신이 해야 하는 일을 수행합니다. 예를 들어:
환영 이메일을 전송하는 핸들러
CRM 시스템에 신규 고객 정보를 업데이트하는 핸들러
마케팅 분석 데이터베이스에 가입 이벤트를 기록하는 핸들러 등
이 과정에서 중요한 것은 회원가입을 처리하는 컴포넌트가 직접 이메일 서비스나 CRM 서비스와 연동하는 것이 아니라, 이벤트 버스를 통해 간접적으로 연동된다는 것입니다.

2. 이벤트 버스의 주요 기능과 장점

이벤트 버스는 다음과 같은 중요한 장점을 제공합니다.

모듈 간 결합도 낮추기

일반적으로, 여러 서비스나 모듈이 서로 직접 호출하면 서비스 간의 종속성이 강해져 유지보수와 확장이 어려워집니다. 이벤트 버스를 통해 이벤트 기반 아키텍처를 적용하면 다음과 같은 이점을 얻습니다:
특정 서비스가 변경되어도 다른 서비스에 미치는 영향이 적어져 코드 유지보수가 쉬워집니다.
서비스 간의 명확한 경계를 유지할 수 있어 구조가 깔끔해지고, 새로운 기능을 추가하거나 기존 기능을 변경할 때 안정성이 높아집니다.
예를 들어, 환영 이메일을 보내는 서비스가 일시적으로 문제가 생겨 중단되더라도 회원가입 로직이나 CRM 서비스에는 전혀 영향을 미치지 않습니다.

비동기 처리로 시스템 성능과 확장성 증가

많은 시스템에서는 모든 작업이 즉시 완료될 필요가 없습니다. 예를 들어, 사용자가 가입을 완료한 후 환영 이메일을 보내는 작업은 즉각적이지 않아도 무방합니다. 이런 작업들을 비동기적으로 처리하면, 사용자 요청 처리 속도를 크게 향상시킬 수 있습니다.
사용자의 요청 처리가 빠르게 끝나고 무거운 작업은 백그라운드에서 처리되므로, 전반적인 성능 향상 효과가 있습니다.
시스템 부하가 클 때에도 주요 기능이 중단되지 않고 유연하게 처리됩니다.

확장성과 유연성

이벤트 기반 아키텍처는 새로운 핸들러 추가가 간단하며, 기존 이벤트에 대한 핸들러를 추가하거나 제거하는 것이 매우 쉽습니다.
새로운 비즈니스 요구사항이나 기능이 추가되었을 때, 기존 코드를 최소한으로 수정하면서 이벤트 핸들러를 추가하여 간편하게 대응할 수 있습니다.
신규 서비스가 필요하면 새로운 이벤트 핸들러만 정의하고 기존 이벤트 버스에 연결하면 됩니다.
예를 들어, 기존 회원가입 완료 이벤트를 기반으로 신규 마케팅 자동화 서비스를 연결하려면, 새 핸들러만 추가하면 간단히 통합할 수 있습니다.

3. 이벤트 버스 실제 구현하기 (Python 코드 예시)

아래에서 구체적인 예시를 통해 이벤트 버스를 파이썬 코드로 구현하는 과정을 살펴보겠습니다.

(1) 이벤트 버스 클래스 구현하기

먼저, 이벤트 버스의 핵심 기능을 담당하는 클래스(EventBus)를 정의합니다. 이벤트 핸들러를 등록하고, 이벤트를 발행하며, 이벤트 내역을리하는 등의 작업을 수행합니다.
# event_bus.py from typing import Dict, List, Type, Callable, Any import asyncio import logging import inspect logger = logging.getLogger(__name__) class EventBus: """비동기 지원 이벤트 버스 구현""" def __init__(self): # 이벤트 타입별 핸들러를 저장할 딕셔너리 self._handlers: Dict[Type, List[Callable]] = {} # 이벤트 내역 저장용 리스트 (디버깅 용도) self._event_history: List[Any] = [] self._max_history = 100 # 현재 실행 중인 비동기 태스크 추적 self._running_tasks: List[asyncio.Task] = [] def subscribe(self, event_type: Type, handler: Callable) -> None: """이벤트 핸들러를 등록합니다.""" if event_type not in self._handlers: self._handlers[event_type] = [] if handler not in self._handlers[event_type]: self._handlers[event_type].append(handler) logger.debug(f"핸들러 등록됨: {event_type.__name__} -> {handler.__name__}") async def publish(self, event: Any) -> None: """이벤트를 발행하고, 등록된 핸들러를 호출합니다.""" event_type = type(event) # 이벤트 내역 기록 self._event_history.append(event) if len(self._event_history) > self._max_history: self._event_history.pop(0) logger.debug(f"이벤트 발행됨: {event_type.__name__}") if event_type not in self._handlers: logger.debug(f"등록된 핸들러 없음: {event_type.__name__}") return # 핸들러 실행을 백그라운드에서 시작 task = asyncio.create_task(self._process_handlers(event, event_type)) self._running_tasks.append(task) task.add_done_callback(lambda t: self._running_tasks.remove(t)) async def _process_handlers(self, event: Any, event_type: Type) -> None: """핸들러를 비동기적으로 처리하는 내부 메서드""" tasks = [] for handler in self._handlers[event_type]: if inspect.iscoroutinefunction(handler): tasks.append(handler(event)) else: tasks.append(asyncio.to_thread(handler, event)) if tasks: await asyncio.gather(*tasks, return_exceptions=True) def get_running_tasks_count(self) -> int: """실행 중인 핸들러 수 반환 (디버깅용)""" return len(self._running_tasks)
Python
복사

(2) 싱글톤 이벤트 버스 인스턴스 생성 및 데코레이터 정의하기

이벤트 버스를 편리하게 사용하기 위해 싱글톤 인스턴스를 생성하고, 핸들러를 쉽게 등록할 수 있는 데코레이터를 정의합니다.
# __init__.py from typing import Type, Callable, Any from common.arch.event_bus.event_bus import EventBus # 이벤트 버스 싱글톤 인스턴스 event_bus = EventBus() def on_event(event_type: Type): """동기식 이벤트 핸들러 데코레이터""" def decorator(handler: Callable): event_bus.subscribe(event_type, handler) return handler return decorator def on_event_async(event_type: Type): """비동기식 이벤트 핸들러 데코레이터""" def decorator(handler: Callable): event_bus.subscribe(event_type, handler) return handler return decorator __all__ = ["EventBus", "event_bus", "on_event", "on_event_async"]
Python
복사

(3) 이벤트 정의하기

이벤트는 일반적으로 데이터 클래스로 정의합니다.
from dataclasses import dataclass @dataclass class UserCreatedEvent: user_id: str email: str
Python
복사

(4) 이벤트 핸들러 등록하기

정의한 데코레이터를 사용하여 간편하게 이벤트 핸들러를 등록할 수 있습니다.
동기 핸들러 예시:
from common.arch.event_bus import on_event @on_event(UserCreatedEvent) def handle_user_created(event: UserCreatedEvent): print(f"새로운 사용자 생성됨: {event.user_id}") # 동기 로직 처리
Python
복사
비동기 핸들러 예시:
from common.arch.event_bus import on_event_async @on_event_async(UserCreatedEvent) async def send_welcome_email(event: UserCreatedEvent): await email_service.send_email( to=event.email, subject="환영합니다!", content="가입해 주셔서 감사합니다!" )
Python
복사

(5) 이벤트 발행하기

이벤트를 발행하여 등록된 핸들러들이 실행되도록 합니다.
from common.arch.event_bus import event_bus async def create_user(user_data): # 사용자 생성 로직... user = {"user_id": "123", "email": "user@example.com"} # 이벤트 발행 await event_bus.publish(UserCreatedEvent(**user)) # 이벤트는 비동기적으로 처리되고 즉시 다음 코드 실행 return user
Python
복사

(6) 이벤트 버스 테스트 코드

다음 코드는 이벤트 버스의 실제 동작을 검증하고, 다양한 시나리오를 테스트하는 코드입니다.
# event_bus_test.py import asyncio import time from datetime import datetime from dataclasses import dataclass import logging # 타임스탬프에 밀리초를 포함하는 커스텀 로그 포매터 class MillisecondFormatter(logging.Formatter): def formatTime(self, record, datefmt=None): # datetime 객체 생성 dt = datetime.fromtimestamp(record.created) # 밀리초 포함하여 포맷팅 return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 로깅 설정 handler = logging.StreamHandler() formatter = MillisecondFormatter("%(asctime)s [%(levelname)s] %(message)s") handler.setFormatter(formatter) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 기존 핸들러 제거 후 새 핸들러 추가 logger.handlers = [] logger.addHandler(handler) # 상위 로거로 전파 방지 logger.propagate = False # 이벤트 버스를 직접 import (독립적인 테스트를 위해) if __name__ == "__main__": logger.info("현재 디렉토리로 import") # 현재 디렉토리의 event_bus.py 직접 import from event_bus import EventBus # 이벤트 버스 인스턴스 생성 event_bus = EventBus() # 데코레이터 함수 정의 def on_event(event_type): def decorator(handler): event_bus.subscribe(event_type, handler) return handler return decorator def on_event_async(event_type): def decorator(handler): event_bus.subscribe(event_type, handler) return handler return decorator else: # 모듈로 임포트 시에는 일반 경로 사용 logger.info("절대 경로로 import") from common.arch.event_bus import event_bus, on_event, on_event_async # 테스트용 이벤트 정의 @dataclass class TestEvent: message: str timestamp: str = None def __post_init__(self): if self.timestamp is None: # 현재 시간을 읽기 쉬운 형식으로 저장 self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 이벤트 처리 결과 저장소 (테스트 검증용) results = [] # 동기 이벤트 핸들러 등록 @on_event(TestEvent) def sync_event_handler(event: TestEvent): """동기 이벤트 핸들러""" logger.info( f"[동기 핸들러] 시작: 메시지='{event.message}', 이벤트시간={event.timestamp}" ) results.append(f"동기:{event.message}") # 의도적으로 약간의 지연 추가 (처리 시간 시뮬레이션) time.sleep(3) completion_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info( f"[동기 핸들러] 완료: 메시지='{event.message}', 완료시간={completion_time}" ) # 비동기 이벤트 핸들러 등록 @on_event_async(TestEvent) async def async_event_handler(event: TestEvent): """비동기 이벤트 핸들러""" logger.info( f"[비동기 핸들러] 시작: 메시지='{event.message}', 이벤트시간={event.timestamp}" ) results.append(f"비동기:{event.message}") # 의도적으로 약간의 지연 추가 (비동기 처리 시간 시뮬레이션) await asyncio.sleep(3) completion_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info( f"[비동기 핸들러] 완료: 메시지='{event.message}', 완료시간={completion_time}" ) # 다른 비동기 이벤트 핸들러 등록 (멀티 핸들러 테스트) @on_event_async(TestEvent) async def another_async_handler(event: TestEvent): """추가 비동기 이벤트 핸들러""" logger.info( f"[다른 비동기 핸들러] 시작: 메시지='{event.message}', 이벤트시간={event.timestamp}" ) results.append(f"다른비동기:{event.message}") await asyncio.sleep(0.2) completion_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info( f"[다른 비동기 핸들러] 완료: 메시지='{event.message}', 완료시간={completion_time}" ) async def publish_events(): """여러 이벤트를 발행하는 비동기 함수""" logger.info("===== 이벤트 버스 테스트 시작 =====") # 첫 번째 이벤트 발행 logger.info("----- 첫 번째 이벤트 발행 -----") event1 = TestEvent(message="사용자_생성_이벤트") await event_bus.publish(event1) # 비동기 처리 확인을 위한 로그 (병렬 처리 시각화) logger.info(f"[시스템] 첫 번째 이벤트 발행 후 메인 스레드는 계속 실행됨") await asyncio.sleep(1) logger.info(f"[시스템] 첫 번째 이벤트는 백그라운드에서 계속 처리 중") # 두 번째 이벤트 발행 logger.info("----- 두 번째 이벤트 발행 -----") event2 = TestEvent(message="주문_완료_이벤트") await event_bus.publish(event2) # 비동기 처리 확인을 위한 로그 (병렬 처리 시각화) logger.info(f"[시스템] 두 번째 이벤트 발행 후 메인 스레드는 계속 실행됨") await asyncio.sleep(1) logger.info(f"[시스템] 첫/두 번째 이벤트는 백그라운드에서 병렬 처리 중") # 세 번째 이벤트 발행 (연속 발행 테스트) logger.info("----- 여러 이벤트 연속 발행 -----") events = [ TestEvent(message=f"결제_처리_이벤트"), TestEvent(message=f"알림_발송_이벤트"), TestEvent(message=f"재고_업데이트_이벤트"), ] for i, event in enumerate(events, 1): event_types = ["결제_처리", "알림_발송", "재고_업데이트"] # 이벤트 발행 시작 로깅 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info( f"[시스템] {event_types[i-1]} 이벤트 발행 시작: 시간={current_time}" ) # 이벤트 발행 await event_bus.publish(event) # 이벤트 발행 완료 로깅 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info( f"[시스템] {event_types[i-1]} 이벤트 발행 완료: 시간={current_time}" ) # 약간의 간격을 두고 발행 (로그 확인용) await asyncio.sleep(0.1) # 모든 이벤트 처리 완료 대기 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info( f"[시스템] 모든 이벤트 발행 완료, 백그라운드 처리 확인중: 시간={current_time}" ) await asyncio.sleep(2) # 실행 중인 모든 이벤트 핸들러가 완료될 때까지 대기 wait_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info(f"[시스템] 모든 이벤트 핸들러 완료 대기 시작: 시간={wait_start_time}") # 실행 중인 태스크 수 확인 (최대 30초 대기) timeout = 30 # 최대 대기 시간(초) start_time = time.time() while event_bus.get_running_tasks_count() > 0: # 현재 실행 중인 태스크 수 로깅 if event_bus.get_running_tasks_count() > 0: logger.info( f"[시스템] 실행 중인 이벤트 핸들러: {event_bus.get_running_tasks_count()}개" ) # 제한 시간 확인 if time.time() - start_time > timeout: logger.warning( f"[시스템] 최대 대기 시간({timeout}초)이 초과되었습니다. 일부 핸들러는 여전히 실행 중입니다." ) break # 잠시 대기 await asyncio.sleep(1) wait_end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] logger.info(f"[시스템] 모든 이벤트 핸들러 처리 완료: 시간={wait_end_time}") # 결과 출력 logger.info("===== 테스트 결과 =====") logger.info(f"수집된 결과: {results}") logger.info(f"발행된 이벤트 수: {len(event_bus.get_event_history())}") logger.info( f"이벤트 히스토리: {[e.message for e in event_bus.get_event_history()]}" ) logger.info("===== 테스트 완료 =====") # 메인 실행 함수 async def main(): try: await publish_events() except Exception as e: logger.error(f"테스트 중 오류 발생: {e}", exc_info=True) # 직접 실행 시 테스트 수행 if __name__ == "__main__": logger.info("이벤트 버스 테스트 시작...") asyncio.run(main())
Python
복사

마무리

이번 글에서는 백엔드에서 유용하게 사용할 수 있는 이벤트 버스의 개념과 실제 구현 방법을 단계별로 구체적으로 살펴보았습니다.
이벤트 버스는 복잡한 애플리케이션에서 모듈 간의 독립성과 유지 보수성을 높여주는 필수적인 아키텍처 요소입니다. 위에서 제시한 코드와 설명을 참고하여 실제 프로젝트에 적용하면, 코드 구조의 명확성과 관리 효율성을 크게 향상시킬 수 있t습니다.
다음 글에서는 LLM 에이전트 시스템 설계에 대해서 간단히 이야기해보려고 합니다.