이벤트 리스너
Last update - 2025. 9. 5.
개요
이벤트 리스너는 특정 이벤트를 수신하고 처리하는 핵심 컴포넌트입니다. 다양한 패턴과 고급 기능을 활용하여 효율적인 이벤트 처리 시스템을 구축할 수 있습니다.
리스너 등록과 관리
기본 리스너 등록
use orbital::event::{EventListener, EventCallback, EventEmitter};
use std::sync::Arc;
let emitter = Arc::new(EventEmitter::new());
// 단순 콜백 리스너
let simple_callback: EventCallback = Arc::new(|event_data| {
println!("이벤트 수신: {} - {}", event_data.name(), event_data.payload());
Ok(())
});
let listener = EventListener::new("user:login", simple_callback);
let listener_id = emitter.add_listener(listener)?;
println!("리스너 등록됨, ID: {}", listener_id);
여러 리스너 관리
use orbital::event::{EventListener, EventCallback, EventEmitter};
use std::sync::Arc;
use std::collections::HashMap;
struct ListenerManager {
emitter: Arc<EventEmitter>,
listeners: HashMap<String, String>, // name -> listener_id
}
impl ListenerManager {
fn new() -> Self {
Self {
emitter: Arc::new(EventEmitter::new()),
listeners: HashMap::new(),
}
}
fn register_listener(&mut self, name: &str, event_name: &str, callback: EventCallback) -> Result<(), Box<dyn std::error::Error>> {
let listener = EventListener::new(event_name, callback);
let listener_id = self.emitter.add_listener(listener)?;
self.listeners.insert(name.to_string(), listener_id);
println!("리스너 '{}' 등록: {} -> {}", name, event_name, listener_id);
Ok(())
}
fn unregister_listener(&mut self, name: &str) -> Result<(), Box<dyn std::error::Error>> {
if let Some(listener_id) = self.listeners.remove(name) {
self.emitter.remove_listener(&listener_id)?;
println!("리스너 '{}' 제거됨", name);
}
Ok(())
}
fn list_listeners(&self) {
println!("등록된 리스너들:");
for (name, id) in &self.listeners {
println!(" {} -> {}", name, id);
}
}
}
// 사용 예시
let mut manager = ListenerManager::new();
manager.register_listener(
"user_logger",
"user:*",
Arc::new(|event_data| {
println!("사용자 이벤트: {}", event_data.name());
Ok(())
})
)?;
manager.register_listener(
"error_handler",
"error:*",
Arc::new(|event_data| {
eprintln!("에러 이벤트: {}", event_data.payload());
Ok(())
})
)?;
manager.list_listeners();
고급 리스너 패턴
조건부 리스너
use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
// 조건부 처리 리스너
let conditional_callback: EventCallback = Arc::new(|event_data| {
match event_data.name() {
"user:login" => {
if let Some(user_type) = event_data.payload().get("user_type") {
if user_type.as_str() == Some("premium") {
println!("🌟 프리미엄 사용자 로그인: {}", event_data.payload());
// 프리미엄 사용자 전용 처리
} else {
println!("👤 일반 사용자 로그인");
}
}
},
"user:logout" => {
println!("👋 사용자 로그아웃");
},
_ => {
println!("🔍 기타 사용자 이벤트: {}", event_data.name());
}
}
Ok(())
});
let conditional_listener = EventListener::new("user:*", conditional_callback);
상태 기반 리스너
use orbital::event::{EventListener, EventCallback};
use std::sync::{Arc, Mutex};
// 상태를 유지하는 리스너
struct StatefulListener {
counter: Arc<Mutex<u32>>,
threshold: u32,
}
impl StatefulListener {
fn new(threshold: u32) -> Self {
Self {
counter: Arc::new(Mutex::new(0)),
threshold,
}
}
fn create_callback(&self) -> EventCallback {
let counter = Arc::clone(&self.counter);
let threshold = self.threshold;
Arc::new(move |event_data| {
let mut count = counter.lock().unwrap();
*count += 1;
if *count % threshold == 0 {
println!("🎯 임계값 도달! {} 번째 이벤트: {}", count, event_data.name());
} else {
println!("📊 이벤트 카운트: {} - {}", count, event_data.name());
}
Ok(())
})
}
}
// 사용 예시
let stateful_listener = StatefulListener::new(10);
let callback = stateful_listener.create_callback();
let listener = EventListener::new("*", callback);
배치 처리 리스너
use orbital::event::{EventListener, EventCallback, EventData};
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use tokio::time::{interval, Duration};
struct BatchListener {
batch: Arc<Mutex<VecDeque<EventData>>>,
batch_size: usize,
}
impl BatchListener {
fn new(batch_size: usize) -> Self {
let batch_listener = Self {
batch: Arc::new(Mutex::new(VecDeque::new())),
batch_size,
};
// 주기적으로 배치 처리
let batch_clone = Arc::clone(&batch_listener.batch);
let size = batch_size;
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
let mut batch = batch_clone.lock().unwrap();
if !batch.is_empty() {
println!("🔄 배치 처리 시작 ({} 개 이벤트)", batch.len());
// 배치 처리 로직
while let Some(event) = batch.pop_front() {
println!(" 처리 중: {}", event.name());
}
println!("✅ 배치 처리 완료");
}
}
});
batch_listener
}
fn create_callback(&self) -> EventCallback {
let batch = Arc::clone(&self.batch);
let batch_size = self.batch_size;
Arc::new(move |event_data| {
let mut batch_queue = batch.lock().unwrap();
batch_queue.push_back(event_data.clone());
// 배치 크기에 도달하면 즉시 처리
if batch_queue.len() >= batch_size {
println!("🚀 배치 크기 도달, 즉시 처리 시작");
while let Some(event) = batch_queue.pop_front() {
println!(" 즉시 처리: {}", event.name());
}
}
Ok(())
})
}
}
// 사용 예시
let batch_listener = BatchListener::new(5);
let callback = batch_listener.create_callback();
let listener = EventListener::new("batch:*", callback);
리스너 체인
순차 처리 체인
use orbital::event::{EventListener, EventCallback, EventData};
use std::sync::Arc;
struct ListenerChain {
handlers: Vec<EventCallback>,
}
impl ListenerChain {
fn new() -> Self {
Self {
handlers: Vec::new(),
}
}
fn add_handler(mut self, handler: EventCallback) -> Self {
self.handlers.push(handler);
self
}
fn create_callback(self) -> EventCallback {
Arc::new(move |event_data| {
for (index, handler) in self.handlers.iter().enumerate() {
println!("🔗 체인 단계 {} 실행", index + 1);
if let Err(e) = handler(event_data.clone()) {
eprintln!("❌ 체인 단계 {} 실패: {}", index + 1, e);
return Err(e);
}
}
println!("✅ 체인 처리 완료");
Ok(())
})
}
}
// 체인 구성
let chain_callback = ListenerChain::new()
.add_handler(Arc::new(|event_data| {
println!("1️⃣ 검증 단계: {}", event_data.name());
// 검증 로직
Ok(())
}))
.add_handler(Arc::new(|event_data| {
println!("2️⃣ 변환 단계: {}", event_data.name());
// 데이터 변환 로직
Ok(())
}))
.add_handler(Arc::new(|event_data| {
println!("3️⃣ 저장 단계: {}", event_data.name());
// 데이터 저장 로직
Ok(())
}))
.create_callback();
let chain_listener = EventListener::new("data:process", chain_callback);
병렬 처리 체인
use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
use tokio::task::JoinSet;
struct ParallelListenerChain {
handlers: Vec<EventCallback>,
}
impl ParallelListenerChain {
fn new() -> Self {
Self {
handlers: Vec::new(),
}
}
fn add_handler(mut self, handler: EventCallback) -> Self {
self.handlers.push(handler);
self
}
fn create_callback(self) -> EventCallback {
Arc::new(move |event_data| {
let mut join_set = JoinSet::new();
// 모든 핸들러를 병렬로 실행
for (index, handler) in self.handlers.iter().enumerate() {
let handler_clone = Arc::clone(handler);
let data_clone = event_data.clone();
join_set.spawn(async move {
println!("🚀 병렬 처리 {} 시작", index + 1);
let result = handler_clone(data_clone);
println!("✅ 병렬 처리 {} 완료", index + 1);
result
});
}
// 모든 핸들러 완료 대기
tokio::spawn(async move {
let mut success_count = 0;
let mut error_count = 0;
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(())) => success_count += 1,
Ok(Err(e)) => {
error_count += 1;
eprintln!("❌ 병렬 처리 실패: {}", e);
},
Err(e) => {
error_count += 1;
eprintln!("❌ 태스크 실패: {}", e);
}
}
}
println!("📊 병렬 처리 결과: 성공 {}, 실패 {}", success_count, error_count);
});
Ok(())
})
}
}
// 병렬 체인 구성
let parallel_callback = ParallelListenerChain::new()
.add_handler(Arc::new(|event_data| {
println!("📧 이메일 발송 처리: {}", event_data.name());
std::thread::sleep(std::time::Duration::from_millis(100));
Ok(())
}))
.add_handler(Arc::new(|event_data| {
println!("📱 푸시 알림 처리: {}", event_data.name());
std::thread::sleep(std::time::Duration::from_millis(80));
Ok(())
}))
.add_handler(Arc::new(|event_data| {
println!("📊 분석 데이터 전송: {}", event_data.name());
std::thread::sleep(std::time::Duration::from_millis(50));
Ok(())
}))
.create_callback();
let parallel_listener = EventListener::new("notification:send", parallel_callback);
리스너 성능 최적화
비동기 리스너
use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
// 비동기 처리 리스너
let async_callback: EventCallback = Arc::new(|event_data| {
let data = event_data.clone();
// 백그라운드에서 비동기 처리
tokio::spawn(async move {
println!("🔄 비동기 처리 시작: {}", data.name());
// 외부 API 호출 시뮬레이션
sleep(Duration::from_millis(200)).await;
// 데이터베이스 저장 시뮬레이션
sleep(Duration::from_millis(100)).await;
println!("✅ 비동기 처리 완료: {}", data.name());
});
// 즉시 반환하여 블로킹 방지
Ok(())
});
let async_listener = EventListener::new("async:*", async_callback);
우선순위 기반 리스너
use orbital::event::{EventListener, EventCallback, EventData};
use std::sync::{Arc, Mutex};
use std::collections::BinaryHeap;
use std::cmp::Ordering;
#[derive(Clone)]
struct PriorityEvent {
event: EventData,
priority: u8, // 높을수록 우선순위 높음
}
impl PartialEq for PriorityEvent {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
impl Eq for PriorityEvent {}
impl PartialOrd for PriorityEvent {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PriorityEvent {
fn cmp(&self, other: &Self) -> Ordering {
self.priority.cmp(&other.priority)
}
}
struct PriorityListener {
queue: Arc<Mutex<BinaryHeap<PriorityEvent>>>,
}
impl PriorityListener {
fn new() -> Self {
let listener = Self {
queue: Arc::new(Mutex::new(BinaryHeap::new())),
};
// 백그라운드 처리기
let queue_clone = Arc::clone(&listener.queue);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
let mut queue = queue_clone.lock().unwrap();
if let Some(priority_event) = queue.pop() {
drop(queue); // 락 해제
println!("🎯 우선순위 {} 이벤트 처리: {}",
priority_event.priority,
priority_event.event.name()
);
// 우선순위에 따른 처리
match priority_event.priority {
9..=10 => {
println!("🚨 긴급 처리");
// 즉시 처리
},
6..=8 => {
println!("⚡ 높은 우선순위 처리");
// 빠른 처리
},
3..=5 => {
println!("📋 일반 처리");
// 일반 처리
},
_ => {
println!("🐌 낮은 우선순위 처리");
// 지연 처리
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
}
}
});
listener
}
fn create_callback(&self) -> EventCallback {
let queue = Arc::clone(&self.queue);
Arc::new(move |event_data| {
// 이벤트 타입에 따른 우선순위 결정
let priority = match event_data.name() {
name if name.contains("critical") => 10,
name if name.contains("error") => 8,
name if name.contains("warning") => 6,
name if name.contains("user") => 4,
_ => 2,
};
let priority_event = PriorityEvent {
event: event_data.clone(),
priority,
};
let mut queue_guard = queue.lock().unwrap();
queue_guard.push(priority_event);
println!("📥 이벤트 큐에 추가 (우선순위: {}): {}", priority, event_data.name());
Ok(())
})
}
}
// 사용 예시
let priority_listener = PriorityListener::new();
let callback = priority_listener.create_callback();
let listener = EventListener::new("*", callback);
리스너 디버깅
디버그 리스너
use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
use std::time::Instant;
struct DebugListener {
name: String,
}
impl DebugListener {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
fn create_callback(&self) -> EventCallback {
let name = self.name.clone();
Arc::new(move |event_data| {
let start_time = Instant::now();
println!("🔍 [{}] 이벤트 처리 시작: {}", name, event_data.name());
println!("📄 [{}] 페이로드 크기: {} bytes",
name,
event_data.payload().to_string().len()
);
// 실제 처리 로직 (시뮬레이션)
std::thread::sleep(std::time::Duration::from_millis(10));
let elapsed = start_time.elapsed();
println!("⏱️ [{}] 처리 시간: {:?}", name, elapsed);
// 메모리 사용량 체크 (간단한 예시)
if elapsed.as_millis() > 100 {
println!("⚠️ [{}] 처리 시간이 긴 이벤트 감지", name);
}
Ok(())
})
}
}
// 디버그 리스너 사용
let debug_listener = DebugListener::new("UserEventDebugger");
let debug_callback = debug_listener.create_callback();
let listener = EventListener::new("user:*", debug_callback);
다음 단계
이벤트 리스너에 대해 알아보았다면, 다음 문서들을 확인해보세요:
- 커스텀 이벤트 - 커스텀 이벤트 생성과 처리
- 이벤트 패턴 - 이벤트 체이닝과 필터링
- 성능 최적화 - 이벤트 시스템 최적화
- Event System 시작하기 - 기본 사용법