kards-env/kards_battle/events/event_system.py

203 lines
6.7 KiB
Python
Raw Permalink Normal View History

2025-09-05 17:05:43 +08:00
"""
KARDS 事件系统核心
实现事件驱动架构解耦引擎逻辑与关键词效果
"""
from enum import Enum, auto
from typing import Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass, field
from collections import defaultdict
from abc import ABC, abstractmethod
import logging
from uuid import UUID
# 避免循环导入
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..units.unit import Unit
from ..battlefield.battlefield import HQ
class EventType(Enum):
"""事件类型枚举"""
# 单位生命周期事件
UNIT_DEPLOYED = auto() # 单位部署
UNIT_MOVED = auto() # 单位移动
UNIT_DESTROYED = auto() # 单位被摧毁
# 位置和状态变化
UNIT_POSITION_CHANGED = auto() # 位置改变
KEYWORD_ADDED = auto() # 获得关键词
KEYWORD_REMOVED = auto() # 失去关键词
# 战斗事件
BEFORE_ATTACK_CHECK = auto() # 攻击前检查(可取消)
UNIT_ATTACKED = auto() # 单位攻击
UNIT_TAKES_DAMAGE = auto() # 单位受到伤害
UNIT_COUNTER_ATTACKS = auto() # 反击
# 回合事件
TURN_START = auto() # 回合开始
TURN_END = auto() # 回合结束
# 游戏事件
GAME_START = auto() # 游戏开始
GAME_END = auto() # 游戏结束
@dataclass
class GameEvent:
"""游戏事件数据结构"""
event_type: EventType
source: Optional['Unit'] = None # 事件源单位
target: Optional[Union['Unit', 'HQ']] = None # 事件目标
data: Dict[str, Any] = field(default_factory=dict) # 事件数据
# 可取消事件支持
cancellable: bool = False
cancelled: bool = False
cancel_reason: str = ""
# 事件优先级(数字越小优先级越高)
priority: int = 100
def cancel(self, reason: str = ""):
"""取消事件"""
if self.cancellable:
self.cancelled = True
self.cancel_reason = reason
else:
raise RuntimeError(f"Event {self.event_type} is not cancellable")
class EventSubscriber(ABC):
"""事件订阅者抽象基类"""
@abstractmethod
def get_subscribed_events(self) -> List[EventType]:
"""返回此订阅者关心的事件类型列表"""
pass
@abstractmethod
def handle_event(self, event: GameEvent) -> None:
"""处理事件"""
pass
def get_priority(self, event_type: EventType) -> int:
"""返回处理特定事件类型的优先级(数字越小优先级越高)"""
return 100
def can_handle_event(self, event: GameEvent) -> bool:
"""判断是否应该处理此事件(可被子类重写)"""
return event.event_type in self.get_subscribed_events()
class EventBus:
"""事件总线 - 管理事件发布和订阅"""
def __init__(self):
self._subscribers: Dict[EventType, List[EventSubscriber]] = defaultdict(list)
self._event_queue: List[GameEvent] = []
self._processing = False
self._logger = logging.getLogger(__name__)
def subscribe(self, subscriber: EventSubscriber) -> None:
"""订阅事件"""
for event_type in subscriber.get_subscribed_events():
if subscriber not in self._subscribers[event_type]:
self._subscribers[event_type].append(subscriber)
# 按优先级排序
self._subscribers[event_type].sort(
key=lambda s: s.get_priority(event_type)
)
self._logger.debug(f"Subscribed {subscriber.__class__.__name__} to events")
def unsubscribe(self, subscriber: EventSubscriber) -> None:
"""取消订阅"""
for event_type in subscriber.get_subscribed_events():
if subscriber in self._subscribers[event_type]:
self._subscribers[event_type].remove(subscriber)
self._logger.debug(f"Unsubscribed {subscriber.__class__.__name__}")
def publish(self, event: GameEvent) -> GameEvent:
"""发布事件并返回处理后的事件"""
self._event_queue.append(event)
if not self._processing:
self._process_events()
return event
def publish_immediate(self, event: GameEvent) -> GameEvent:
"""立即处理事件,不加入队列"""
return self._handle_event(event)
def _process_events(self) -> None:
"""处理事件队列"""
self._processing = True
try:
while self._event_queue:
event = self._event_queue.pop(0)
self._handle_event(event)
finally:
self._processing = False
def _handle_event(self, event: GameEvent) -> GameEvent:
"""处理单个事件"""
self._logger.debug(f"Processing event: {event.event_type}")
# 获取订阅此事件的所有订阅者
subscribers = self._subscribers[event.event_type]
for subscriber in subscribers:
if not subscriber.can_handle_event(event):
continue
try:
subscriber.handle_event(event)
# 如果事件被取消,停止处理
if event.cancelled:
self._logger.debug(f"Event cancelled by {subscriber.__class__.__name__}: {event.cancel_reason}")
break
except Exception as e:
self._logger.error(f"Error in {subscriber.__class__.__name__} handling {event.event_type}: {e}")
# 继续处理其他订阅者,不让一个错误影响整个系统
return event
def get_subscriber_count(self, event_type: EventType) -> int:
"""获取特定事件类型的订阅者数量"""
return len(self._subscribers[event_type])
def clear_all_subscribers(self) -> None:
"""清空所有订阅者(主要用于测试)"""
self._subscribers.clear()
self._event_queue.clear()
# 全局事件总线实例
_global_event_bus = EventBus()
def get_event_bus() -> EventBus:
"""获取全局事件总线实例"""
return _global_event_bus
def publish_event(event: GameEvent) -> GameEvent:
"""发布事件到全局事件总线"""
return _global_event_bus.publish(event)
def subscribe_to_events(subscriber: EventSubscriber) -> None:
"""订阅全局事件总线"""
_global_event_bus.subscribe(subscriber)
def unsubscribe_from_events(subscriber: EventSubscriber) -> None:
"""从全局事件总线取消订阅"""
_global_event_bus.unsubscribe(subscriber)