이벤트 리스너

Last update - 2025. 9. 5.

개요

이벤트 리스너는 특정 이벤트를 수신하고 처리하는 핵심 컴포넌트입니다. 다양한 패턴과 고급 기능을 활용하여 효율적인 이벤트 처리 시스템을 구축할 수 있습니다.

리스너 등록과 관리

기본 리스너 등록

use orbital::event::{EventListener, EventCallback, EventEmitter};
use std::sync::Arc;

let emitter = Arc::new(EventEmitter::new());

// 단순 콜백 리스너
let simple_callback: EventCallback = Arc::new(|event_data| {
    println!("이벤트 수신: {} - {}", event_data.name(), event_data.payload());
    Ok(())
});

let listener = EventListener::new("user:login", simple_callback);
let listener_id = emitter.add_listener(listener)?;

println!("리스너 등록됨, ID: {}", listener_id);

여러 리스너 관리

use orbital::event::{EventListener, EventCallback, EventEmitter};
use std::sync::Arc;
use std::collections::HashMap;

struct ListenerManager {
    emitter: Arc<EventEmitter>,
    listeners: HashMap<String, String>, // name -> listener_id
}

impl ListenerManager {
    fn new() -> Self {
        Self {
            emitter: Arc::new(EventEmitter::new()),
            listeners: HashMap::new(),
        }
    }

    fn register_listener(&mut self, name: &str, event_name: &str, callback: EventCallback) -> Result<(), Box<dyn std::error::Error>> {
        let listener = EventListener::new(event_name, callback);
        let listener_id = self.emitter.add_listener(listener)?;
        self.listeners.insert(name.to_string(), listener_id);

        println!("리스너 '{}' 등록: {} -> {}", name, event_name, listener_id);
        Ok(())
    }

    fn unregister_listener(&mut self, name: &str) -> Result<(), Box<dyn std::error::Error>> {
        if let Some(listener_id) = self.listeners.remove(name) {
            self.emitter.remove_listener(&listener_id)?;
            println!("리스너 '{}' 제거됨", name);
        }
        Ok(())
    }

    fn list_listeners(&self) {
        println!("등록된 리스너들:");
        for (name, id) in &self.listeners {
            println!("  {} -> {}", name, id);
        }
    }
}

// 사용 예시
let mut manager = ListenerManager::new();

manager.register_listener(
    "user_logger",
    "user:*",
    Arc::new(|event_data| {
        println!("사용자 이벤트: {}", event_data.name());
        Ok(())
    })
)?;

manager.register_listener(
    "error_handler",
    "error:*",
    Arc::new(|event_data| {
        eprintln!("에러 이벤트: {}", event_data.payload());
        Ok(())
    })
)?;

manager.list_listeners();

고급 리스너 패턴

조건부 리스너

use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;

// 조건부 처리 리스너
let conditional_callback: EventCallback = Arc::new(|event_data| {
    match event_data.name() {
        "user:login" => {
            if let Some(user_type) = event_data.payload().get("user_type") {
                if user_type.as_str() == Some("premium") {
                    println!("🌟 프리미엄 사용자 로그인: {}", event_data.payload());
                    // 프리미엄 사용자 전용 처리
                } else {
                    println!("👤 일반 사용자 로그인");
                }
            }
        },
        "user:logout" => {
            println!("👋 사용자 로그아웃");
        },
        _ => {
            println!("🔍 기타 사용자 이벤트: {}", event_data.name());
        }
    }
    Ok(())
});

let conditional_listener = EventListener::new("user:*", conditional_callback);

상태 기반 리스너

use orbital::event::{EventListener, EventCallback};
use std::sync::{Arc, Mutex};

// 상태를 유지하는 리스너
struct StatefulListener {
    counter: Arc<Mutex<u32>>,
    threshold: u32,
}

impl StatefulListener {
    fn new(threshold: u32) -> Self {
        Self {
            counter: Arc::new(Mutex::new(0)),
            threshold,
        }
    }

    fn create_callback(&self) -> EventCallback {
        let counter = Arc::clone(&self.counter);
        let threshold = self.threshold;

        Arc::new(move |event_data| {
            let mut count = counter.lock().unwrap();
            *count += 1;

            if *count % threshold == 0 {
                println!("🎯 임계값 도달! {} 번째 이벤트: {}", count, event_data.name());
            } else {
                println!("📊 이벤트 카운트: {} - {}", count, event_data.name());
            }

            Ok(())
        })
    }
}

// 사용 예시
let stateful_listener = StatefulListener::new(10);
let callback = stateful_listener.create_callback();
let listener = EventListener::new("*", callback);

배치 처리 리스너

use orbital::event::{EventListener, EventCallback, EventData};
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use tokio::time::{interval, Duration};

struct BatchListener {
    batch: Arc<Mutex<VecDeque<EventData>>>,
    batch_size: usize,
}

impl BatchListener {
    fn new(batch_size: usize) -> Self {
        let batch_listener = Self {
            batch: Arc::new(Mutex::new(VecDeque::new())),
            batch_size,
        };

        // 주기적으로 배치 처리
        let batch_clone = Arc::clone(&batch_listener.batch);
        let size = batch_size;
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(5));

            loop {
                interval.tick().await;
                let mut batch = batch_clone.lock().unwrap();

                if !batch.is_empty() {
                    println!("🔄 배치 처리 시작 ({} 개 이벤트)", batch.len());

                    // 배치 처리 로직
                    while let Some(event) = batch.pop_front() {
                        println!("  처리 중: {}", event.name());
                    }

                    println!("✅ 배치 처리 완료");
                }
            }
        });

        batch_listener
    }

    fn create_callback(&self) -> EventCallback {
        let batch = Arc::clone(&self.batch);
        let batch_size = self.batch_size;

        Arc::new(move |event_data| {
            let mut batch_queue = batch.lock().unwrap();
            batch_queue.push_back(event_data.clone());

            // 배치 크기에 도달하면 즉시 처리
            if batch_queue.len() >= batch_size {
                println!("🚀 배치 크기 도달, 즉시 처리 시작");

                while let Some(event) = batch_queue.pop_front() {
                    println!("  즉시 처리: {}", event.name());
                }
            }

            Ok(())
        })
    }
}

// 사용 예시
let batch_listener = BatchListener::new(5);
let callback = batch_listener.create_callback();
let listener = EventListener::new("batch:*", callback);

리스너 체인

순차 처리 체인

use orbital::event::{EventListener, EventCallback, EventData};
use std::sync::Arc;

struct ListenerChain {
    handlers: Vec<EventCallback>,
}

impl ListenerChain {
    fn new() -> Self {
        Self {
            handlers: Vec::new(),
        }
    }

    fn add_handler(mut self, handler: EventCallback) -> Self {
        self.handlers.push(handler);
        self
    }

    fn create_callback(self) -> EventCallback {
        Arc::new(move |event_data| {
            for (index, handler) in self.handlers.iter().enumerate() {
                println!("🔗 체인 단계 {} 실행", index + 1);

                if let Err(e) = handler(event_data.clone()) {
                    eprintln!("❌ 체인 단계 {} 실패: {}", index + 1, e);
                    return Err(e);
                }
            }

            println!("✅ 체인 처리 완료");
            Ok(())
        })
    }
}

// 체인 구성
let chain_callback = ListenerChain::new()
    .add_handler(Arc::new(|event_data| {
        println!("1️⃣ 검증 단계: {}", event_data.name());
        // 검증 로직
        Ok(())
    }))
    .add_handler(Arc::new(|event_data| {
        println!("2️⃣ 변환 단계: {}", event_data.name());
        // 데이터 변환 로직
        Ok(())
    }))
    .add_handler(Arc::new(|event_data| {
        println!("3️⃣ 저장 단계: {}", event_data.name());
        // 데이터 저장 로직
        Ok(())
    }))
    .create_callback();

let chain_listener = EventListener::new("data:process", chain_callback);

병렬 처리 체인

use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
use tokio::task::JoinSet;

struct ParallelListenerChain {
    handlers: Vec<EventCallback>,
}

impl ParallelListenerChain {
    fn new() -> Self {
        Self {
            handlers: Vec::new(),
        }
    }

    fn add_handler(mut self, handler: EventCallback) -> Self {
        self.handlers.push(handler);
        self
    }

    fn create_callback(self) -> EventCallback {
        Arc::new(move |event_data| {
            let mut join_set = JoinSet::new();

            // 모든 핸들러를 병렬로 실행
            for (index, handler) in self.handlers.iter().enumerate() {
                let handler_clone = Arc::clone(handler);
                let data_clone = event_data.clone();

                join_set.spawn(async move {
                    println!("🚀 병렬 처리 {} 시작", index + 1);
                    let result = handler_clone(data_clone);
                    println!("✅ 병렬 처리 {} 완료", index + 1);
                    result
                });
            }

            // 모든 핸들러 완료 대기
            tokio::spawn(async move {
                let mut success_count = 0;
                let mut error_count = 0;

                while let Some(result) = join_set.join_next().await {
                    match result {
                        Ok(Ok(())) => success_count += 1,
                        Ok(Err(e)) => {
                            error_count += 1;
                            eprintln!("❌ 병렬 처리 실패: {}", e);
                        },
                        Err(e) => {
                            error_count += 1;
                            eprintln!("❌ 태스크 실패: {}", e);
                        }
                    }
                }

                println!("📊 병렬 처리 결과: 성공 {}, 실패 {}", success_count, error_count);
            });

            Ok(())
        })
    }
}

// 병렬 체인 구성
let parallel_callback = ParallelListenerChain::new()
    .add_handler(Arc::new(|event_data| {
        println!("📧 이메일 발송 처리: {}", event_data.name());
        std::thread::sleep(std::time::Duration::from_millis(100));
        Ok(())
    }))
    .add_handler(Arc::new(|event_data| {
        println!("📱 푸시 알림 처리: {}", event_data.name());
        std::thread::sleep(std::time::Duration::from_millis(80));
        Ok(())
    }))
    .add_handler(Arc::new(|event_data| {
        println!("📊 분석 데이터 전송: {}", event_data.name());
        std::thread::sleep(std::time::Duration::from_millis(50));
        Ok(())
    }))
    .create_callback();

let parallel_listener = EventListener::new("notification:send", parallel_callback);

리스너 성능 최적화

비동기 리스너

use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
use tokio::time::{sleep, Duration};

// 비동기 처리 리스너
let async_callback: EventCallback = Arc::new(|event_data| {
    let data = event_data.clone();

    // 백그라운드에서 비동기 처리
    tokio::spawn(async move {
        println!("🔄 비동기 처리 시작: {}", data.name());

        // 외부 API 호출 시뮬레이션
        sleep(Duration::from_millis(200)).await;

        // 데이터베이스 저장 시뮬레이션
        sleep(Duration::from_millis(100)).await;

        println!("✅ 비동기 처리 완료: {}", data.name());
    });

    // 즉시 반환하여 블로킹 방지
    Ok(())
});

let async_listener = EventListener::new("async:*", async_callback);

우선순위 기반 리스너

use orbital::event::{EventListener, EventCallback, EventData};
use std::sync::{Arc, Mutex};
use std::collections::BinaryHeap;
use std::cmp::Ordering;

#[derive(Clone)]
struct PriorityEvent {
    event: EventData,
    priority: u8, // 높을수록 우선순위 높음
}

impl PartialEq for PriorityEvent {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl Eq for PriorityEvent {}

impl PartialOrd for PriorityEvent {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for PriorityEvent {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority.cmp(&other.priority)
    }
}

struct PriorityListener {
    queue: Arc<Mutex<BinaryHeap<PriorityEvent>>>,
}

impl PriorityListener {
    fn new() -> Self {
        let listener = Self {
            queue: Arc::new(Mutex::new(BinaryHeap::new())),
        };

        // 백그라운드 처리기
        let queue_clone = Arc::clone(&listener.queue);
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(100)).await;

                let mut queue = queue_clone.lock().unwrap();
                if let Some(priority_event) = queue.pop() {
                    drop(queue); // 락 해제

                    println!("🎯 우선순위 {} 이벤트 처리: {}",
                        priority_event.priority,
                        priority_event.event.name()
                    );

                    // 우선순위에 따른 처리
                    match priority_event.priority {
                        9..=10 => {
                            println!("🚨 긴급 처리");
                            // 즉시 처리
                        },
                        6..=8 => {
                            println!("⚡ 높은 우선순위 처리");
                            // 빠른 처리
                        },
                        3..=5 => {
                            println!("📋 일반 처리");
                            // 일반 처리
                        },
                        _ => {
                            println!("🐌 낮은 우선순위 처리");
                            // 지연 처리
                            tokio::time::sleep(Duration::from_millis(50)).await;
                        }
                    }
                }
            }
        });

        listener
    }

    fn create_callback(&self) -> EventCallback {
        let queue = Arc::clone(&self.queue);

        Arc::new(move |event_data| {
            // 이벤트 타입에 따른 우선순위 결정
            let priority = match event_data.name() {
                name if name.contains("critical") => 10,
                name if name.contains("error") => 8,
                name if name.contains("warning") => 6,
                name if name.contains("user") => 4,
                _ => 2,
            };

            let priority_event = PriorityEvent {
                event: event_data.clone(),
                priority,
            };

            let mut queue_guard = queue.lock().unwrap();
            queue_guard.push(priority_event);

            println!("📥 이벤트 큐에 추가 (우선순위: {}): {}", priority, event_data.name());

            Ok(())
        })
    }
}

// 사용 예시
let priority_listener = PriorityListener::new();
let callback = priority_listener.create_callback();
let listener = EventListener::new("*", callback);

리스너 디버깅

디버그 리스너

use orbital::event::{EventListener, EventCallback};
use std::sync::Arc;
use std::time::Instant;

struct DebugListener {
    name: String,
}

impl DebugListener {
    fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
        }
    }

    fn create_callback(&self) -> EventCallback {
        let name = self.name.clone();

        Arc::new(move |event_data| {
            let start_time = Instant::now();

            println!("🔍 [{}] 이벤트 처리 시작: {}", name, event_data.name());
            println!("📄 [{}] 페이로드 크기: {} bytes",
                name,
                event_data.payload().to_string().len()
            );

            // 실제 처리 로직 (시뮬레이션)
            std::thread::sleep(std::time::Duration::from_millis(10));

            let elapsed = start_time.elapsed();
            println!("⏱️ [{}] 처리 시간: {:?}", name, elapsed);

            // 메모리 사용량 체크 (간단한 예시)
            if elapsed.as_millis() > 100 {
                println!("⚠️ [{}] 처리 시간이 긴 이벤트 감지", name);
            }

            Ok(())
        })
    }
}

// 디버그 리스너 사용
let debug_listener = DebugListener::new("UserEventDebugger");
let debug_callback = debug_listener.create_callback();
let listener = EventListener::new("user:*", debug_callback);

다음 단계

이벤트 리스너에 대해 알아보았다면, 다음 문서들을 확인해보세요: