kards-env/archive/game_engine_v2.py

511 lines
19 KiB
Python
Raw Normal View History

2025-09-05 17:05:43 +08:00
"""
更新的游戏引擎
适配新的3条战线战场系统
"""
from typing import Dict, List, Optional, Any, Union
from uuid import UUID
from ..battlefield.battlefield_v2 import Battlefield, HQ
from ..units.unit import Unit
from ..abilities.keywords import TriggerTiming, KeywordRegistry
from ..abilities.abilities import AbilityManager
from .enums import GamePhase, LineType
class GameEvent:
    """A game event queued for dispatch by ``EventManager``.

    Plain data holder: it records what happened (``event_type``), the
    optional ``source`` and ``target`` involved, and a free-form ``context``
    dict that accompanies the event when it is processed.
    """

    def __init__(self, event_type: str, source: Optional[Unit] = None,
                 target: Optional[Union[Unit, HQ]] = None,
                 context: Optional[Dict[str, Any]] = None):
        """Create an event.

        Args:
            event_type: Event name, e.g. ``"unit_deployed"`` or ``"turn_end"``.
            source: Unit that caused the event, if any.
            target: Unit or HQ the event is aimed at, if any.
            context: Extra data for the event. ``None`` (or an empty dict) is
                replaced by a fresh empty dict, so two events never share a
                default context.
        """
        self.event_type = event_type
        self.source = source
        self.target = target
        self.context = context or {}
        # Placeholder; EventManager.add_event overwrites it with a sequence number.
        self.timestamp = 0

    def __repr__(self) -> str:
        """Return a debugging representation listing every field."""
        return (
            f"{type(self).__name__}(event_type={self.event_type!r}, "
            f"source={self.source!r}, target={self.target!r}, "
            f"context={self.context!r}, timestamp={self.timestamp!r})"
        )
class EventManager:
    """FIFO queue of game events plus the logic that dispatches them.

    Events are stamped with a monotonically increasing sequence number when
    queued and are processed strictly in queue order.
    """

    def __init__(self):
        self.event_queue = []
        # Not read or written by any method of this class.
        self.event_listeners = {}
        self.event_counter = 0

    def add_event(self, event: GameEvent) -> None:
        """Stamp ``event`` with the next sequence number and queue it."""
        event.timestamp = self.event_counter
        self.event_counter += 1
        self.event_queue.append(event)

    def process_events(self, battlefield: Battlefield) -> List[Dict[str, Any]]:
        """Drain the queue and return the non-empty per-event results.

        The loop re-checks the queue on every iteration, so events appended
        while an earlier event is being processed are handled in the same call.
        """
        results = []
        while self.event_queue:
            event = self.event_queue.pop(0)
            result = self._process_single_event(event, battlefield)
            if result:
                results.append(result)
        return results

    def _process_single_event(self, event: GameEvent, battlefield: Battlefield) -> Optional[Dict[str, Any]]:
        """Fire the triggers matching ``event`` on every unit on the battlefield.

        Returns:
            ``{"event": <event_type>, "results": [...]}`` when at least one
            ability produced a result, otherwise ``None`` (also for event
            types that have no trigger timing).
        """
        timing_map = {
            "unit_deployed": TriggerTiming.ON_DEPLOY,
            "unit_attacks": TriggerTiming.ON_ATTACK,
            "unit_defends": TriggerTiming.ON_DEFEND,
            "unit_takes_damage": TriggerTiming.ON_DAMAGE_TAKEN,
            "unit_dies": TriggerTiming.ON_DEATH,
            "turn_start": TriggerTiming.ON_TURN_START,
            "turn_end": TriggerTiming.ON_TURN_END,
            "before_combat": TriggerTiming.BEFORE_COMBAT,
            "after_combat": TriggerTiming.AFTER_COMBAT,
        }
        trigger_timing = timing_map.get(event.event_type)
        # Compare against None explicitly: a timing member whose value is
        # falsy (e.g. 0 on an int-based enum) is still a valid timing.
        if trigger_timing is None:
            return None

        ability_manager = AbilityManager()
        results = []
        # Snapshot the units: triggers receive the battlefield and may change
        # which units are on it while we iterate.
        # NOTE(review): triggers fire on every unit, not only the event's
        # source/target, and only event.context (not source/target) is passed
        # along — confirm this is intended.
        for unit in list(battlefield.get_all_units()):
            self._trigger_keyword_effects(unit, battlefield, trigger_timing, event.context)
            unit_results = ability_manager.trigger_abilities(unit, battlefield, trigger_timing, event.context)
            if unit_results:
                results.extend(unit_results)

        if not results:
            return None
        return {"event": event.event_type, "results": results}

    def _trigger_keyword_effects(self, unit: Unit, battlefield: Battlefield,
                                 timing: TriggerTiming, context: Dict[str, Any]) -> None:
        """Apply the effect of each of ``unit``'s keywords for this timing.

        Keywords for which the registry returns a falsy value are skipped.
        """
        for keyword_name in unit.keywords:
            keyword_effect = KeywordRegistry.create_keyword(keyword_name)
            if keyword_effect:
                keyword_effect.apply_effect(unit, battlefield, timing, context)
class CombatResolver:
    """Resolves one attack by a unit against another unit or an HQ.

    Damage is applied immediately. The related events (``before_combat``,
    ``unit_takes_damage``, ``unit_dies``, ``after_combat``) are only queued on
    the event manager here; they run when its ``process_events`` is called.
    """

    def __init__(self, event_manager: EventManager):
        self.event_manager = event_manager

    def resolve_combat(self, attacker: Unit, target: Union[Unit, HQ],
                       battlefield: Battlefield) -> Dict[str, Any]:
        """Resolve an attack and return a result dict describing the outcome.

        Args:
            attacker: The attacking unit.
            target: The unit or HQ being attacked.
            battlefield: Battlefield from which destroyed units are removed.
        """
        self.event_manager.add_event(GameEvent(
            "before_combat",
            source=attacker,
            target=target,
            context={"combat_type": "attack"},
        ))
        # An HQ is handled separately from unit-vs-unit combat.
        if isinstance(target, HQ):
            return self._resolve_hq_attack(attacker, target, battlefield)
        return self._resolve_unit_combat(attacker, target, battlefield)

    def _resolve_hq_attack(self, attacker: Unit, hq: HQ, battlefield: Battlefield) -> Dict[str, Any]:
        """Apply the attacker's effective attack to ``hq``; the HQ never counters."""
        base_damage = attacker.get_effective_attack()
        actual_damage = hq.take_damage(base_damage)
        result = {
            "attacker_id": attacker.id,
            "target_type": "hq",
            "target_player": hq.player_id,
            "damage_dealt": actual_damage,
            "counter_damage": 0,
            "hq_destroyed": hq.is_destroyed(),
            "combat_resolved": True,
        }
        self.event_manager.add_event(GameEvent(
            "after_combat",
            source=attacker,
            target=hq,
            context={"damage_dealt": actual_damage, "counter_damage": 0},
        ))
        return result

    def _resolve_unit_combat(self, attacker: Unit, target: Unit, battlefield: Battlefield) -> Dict[str, Any]:
        """Resolve unit-vs-unit combat: ambush, attack, counter-attack, deaths.

        Returns:
            A dict with ``attacker_id``, ``target_id``, ``damage_dealt``,
            ``counter_damage``, ``units_destroyed`` and ``combat_resolved``.
            When an ambush destroys the attacker before it strikes, the same
            keys are present (with zero damage dealt) together with the
            ambush keys ``ambush_triggered``, ``ambush_damage`` and
            ``attacker_destroyed``.
        """
        # Ambush: the defender strikes before the attack lands.
        if target.has_keyword("AMBUSH"):
            ambush_result = self._resolve_ambush(target, attacker, battlefield)
            if ambush_result.get("attacker_destroyed"):
                # Keep the result shape identical to a normal combat so that
                # callers can read the standard keys unconditionally.
                # NOTE(review): no "after_combat" event is queued on this
                # path although "before_combat" was — confirm intended.
                return {
                    "attacker_id": attacker.id,
                    "target_id": target.id,
                    "damage_dealt": 0,
                    "counter_damage": 0,
                    "units_destroyed": [attacker.id],
                    "combat_resolved": True,
                    **ambush_result,
                }
            # NOTE(review): a surviving attacker still receives the regular
            # counter-attack below, so an ambushing defender can deal damage
            # twice in one combat — confirm against the game rules.

        # Attack damage, reduced by the target's armor.
        base_damage = attacker.get_effective_attack()
        final_damage = self._calculate_damage(base_damage, target)
        actual_damage = target.stats.take_damage(final_damage)
        self.event_manager.add_event(GameEvent(
            "unit_takes_damage",
            source=attacker,
            target=target,
            context={"damage": actual_damage},
        ))

        # Counter-attack, only by a surviving defender that is allowed to.
        counter_damage = 0
        if target.stats.is_alive() and self._can_counter_attack(target, attacker):
            counter_base_damage = target.get_effective_attack()
            counter_final_damage = self._calculate_damage(counter_base_damage, attacker)
            counter_damage = attacker.stats.take_damage(counter_final_damage)
            if counter_damage > 0:
                self.event_manager.add_event(GameEvent(
                    "unit_takes_damage",
                    source=target,
                    target=attacker,
                    context={"damage": counter_damage, "is_counter": True},
                ))

        # Remove the dead; target first, then attacker.
        units_destroyed = []
        for unit in (target, attacker):
            if not unit.stats.is_alive():
                self.event_manager.add_event(GameEvent("unit_dies", target=unit))
                battlefield.remove_unit(unit)
                units_destroyed.append(unit.id)

        self.event_manager.add_event(GameEvent(
            "after_combat",
            source=attacker,
            target=target,
            context={"damage_dealt": actual_damage, "counter_damage": counter_damage},
        ))
        return {
            "attacker_id": attacker.id,
            "target_id": target.id,
            "damage_dealt": actual_damage,
            "counter_damage": counter_damage,
            "units_destroyed": units_destroyed,
            "combat_resolved": True,
        }

    def _resolve_ambush(self, defender: Unit, attacker: Unit, battlefield: Battlefield) -> Dict[str, Any]:
        """Let ``defender`` hit ``attacker`` first; remove the attacker if it dies.

        The ambush damage goes through ``_calculate_damage`` so the attacker's
        armor reduces it, like attack and counter-attack damage.

        Returns:
            Dict with ``ambush_triggered``, ``ambush_damage`` (damage actually
            taken) and ``attacker_destroyed``.
        """
        ambush_damage = self._calculate_damage(defender.get_effective_attack(), attacker)
        actual_damage = attacker.stats.take_damage(ambush_damage)
        attacker_destroyed = not attacker.stats.is_alive()
        if attacker_destroyed:
            self.event_manager.add_event(GameEvent("unit_dies", target=attacker))
            battlefield.remove_unit(attacker)
        return {
            "ambush_triggered": True,
            "ambush_damage": actual_damage,
            "attacker_destroyed": attacker_destroyed,
        }

    def _calculate_damage(self, base_damage: int, target: Unit) -> int:
        """Return ``base_damage`` after the target's heavy-armor reduction.

        The first keyword of the form ``HEAVY_ARMOR_<int>`` is subtracted
        from the damage (never going below 0). Keywords with that prefix but
        a non-integer suffix are skipped.
        """
        for keyword in target.keywords:
            if not keyword.startswith("HEAVY_ARMOR_"):
                continue
            try:
                armor_value = int(keyword.split("_")[-1])
            except ValueError:
                continue
            return max(0, base_damage - armor_value)
        return base_damage

    def _can_counter_attack(self, defender: Unit, attacker: Unit) -> bool:
        """Return whether ``defender`` may strike back at ``attacker``."""
        # A bomber's target cannot counter unless the target is a fighter.
        if attacker.unit_type.name == "BOMBER" and defender.unit_type.name != "FIGHTER":
            return False
        # Pinned units cannot counter.
        if defender.has_keyword("PINNED"):
            return False
        return True
class GameEngineV2:
    """Turn-based game engine built on the three-line battlefield.

    Owns the battlefield, the event manager and the combat resolver, tracks
    the active player and turn counter, and manages each player's command
    points. Action methods return a result dict; failures carry
    ``{"success": False, "reason": <message>}``.
    """

    def __init__(self, player1_id: str, player2_id: str):
        """Set up a new game; player 1 is the first active player."""
        self.battlefield = Battlefield(player1_id, player2_id)
        self.event_manager = EventManager()
        self.combat_resolver = CombatResolver(self.event_manager)
        self.current_turn = 1
        self.active_player = player1_id
        self.game_phase = GamePhase.PLAYER_TURN
        # Command-point resources: both players start with 1 point.
        self.player1_command_points = 1
        self.player2_command_points = 1
        self.max_command_points = 12  # upper bound applied on refill
        self.turn_history = []

    def _points_attr(self, player_id: str) -> Optional[str]:
        """Return the attribute name holding ``player_id``'s points, or None."""
        if player_id == self.battlefield.player1_id:
            return "player1_command_points"
        if player_id == self.battlefield.player2_id:
            return "player2_command_points"
        return None

    def get_command_points(self, player_id: str) -> int:
        """Return the player's current command points (0 for an unknown player)."""
        attr = self._points_attr(player_id)
        return getattr(self, attr) if attr else 0

    def spend_command_points(self, player_id: str, cost: int) -> bool:
        """Deduct ``cost`` points from the player if they can afford it.

        Returns:
            True when the points were deducted. False when the player is
            unknown, the cost is negative (which would otherwise *grant*
            points), or the player has fewer than ``cost`` points.
        """
        attr = self._points_attr(player_id)
        if attr is None or cost < 0:
            return False
        current_points = getattr(self, attr)
        if current_points < cost:
            return False
        setattr(self, attr, current_points - cost)
        return True

    def _deploy_unit(self, unit: Unit, player_id: str, place, location: str) -> Dict[str, Any]:
        """Shared implementation of the two deploy methods.

        ``place`` is the battlefield method that puts the unit on the line and
        returns a truthy value on success; ``location`` is ``"support"`` or
        ``"front"`` and is used in the event context and the result.
        """
        if player_id != self.active_player:
            return {"success": False, "reason": "Not your turn"}
        cost = unit.stats.operation_cost
        if not self.spend_command_points(player_id, cost):
            return {"success": False, "reason": "Insufficient command points"}
        if not place(unit, player_id):
            # The battlefield refused the unit: give the points back so a
            # failed deployment costs nothing.
            attr = self._points_attr(player_id)
            setattr(self, attr, getattr(self, attr) + cost)
            return {"success": False, "reason": f"Failed to deploy to {location} line"}
        self.event_manager.add_event(GameEvent(
            "unit_deployed",
            source=unit,
            context={"deployment_type": location},
        ))
        self.event_manager.process_events(self.battlefield)
        return {"success": True, "unit_deployed": unit.id, "location": location}

    def deploy_unit_to_support(self, unit: Unit, player_id: str) -> Dict[str, Any]:
        """Deploy ``unit`` to the player's support line, paying its operation cost.

        Command points are only kept when the battlefield accepts the unit.
        """
        return self._deploy_unit(unit, player_id, self.battlefield.deploy_unit_to_support, "support")

    def deploy_unit_to_front(self, unit: Unit, player_id: str) -> Dict[str, Any]:
        """Deploy ``unit`` to the front line, paying its operation cost.

        Command points are only kept when the battlefield accepts the unit.
        """
        return self._deploy_unit(unit, player_id, self.battlefield.deploy_unit_to_front, "front")

    def attack_target(self, attacker_id: UUID, target_id: UUID) -> Dict[str, Any]:
        """Attack a unit or an HQ with one of the active player's units.

        ``target_id`` is first looked up as a unit; if no unit matches, the
        strings ``"hq1"`` / ``"hq2"`` select player 1's / player 2's HQ.

        Returns:
            On success, the combat result plus ``"success": True`` and
            ``"event_results"``; otherwise a failure dict with a reason.
        """
        attacker = self.battlefield.find_unit(attacker_id)
        if not attacker:
            return {"success": False, "reason": "Attacker not found"}
        if attacker.owner != self.active_player:
            return {"success": False, "reason": "Not your unit"}

        target = self.battlefield.find_unit(target_id)
        if not target:
            hq_targets = {
                "hq1": self.battlefield.player1_hq,
                "hq2": self.battlefield.player2_hq,
            }
            target = hq_targets.get(str(target_id))
            if target is None:
                return {"success": False, "reason": "Target not found"}

        valid_targets = self.battlefield.get_valid_attack_targets_for_unit(attacker)
        if target not in valid_targets:
            return {"success": False, "reason": "Cannot attack this target"}

        # A unit that already operated may only attack again with FURY.
        if attacker.has_operated_this_turn and not attacker.has_keyword("FURY"):
            return {"success": False, "reason": "Unit already operated this turn"}

        self.event_manager.add_event(GameEvent(
            "unit_attacks",
            source=attacker,
            target=target,
        ))
        combat_result = self.combat_resolver.resolve_combat(attacker, target, self.battlefield)
        attacker.has_operated_this_turn = True
        event_results = self.event_manager.process_events(self.battlefield)
        # "success" mirrors the key every failure path returns.
        return {
            "success": True,
            **combat_result,
            "event_results": event_results,
        }

    def move_unit_to_front(self, unit_id: UUID) -> Dict[str, Any]:
        """Move one of the active player's units from its line to the front line.

        Non-tank units are marked as having operated after moving; tanks are
        not, so they can still act in the same turn.
        """
        unit = self.battlefield.find_unit(unit_id)
        if not unit or unit.owner != self.active_player:
            return {"success": False, "reason": "Unit not found or not yours"}
        if not unit.position:
            return {"success": False, "reason": "Unit not on battlefield"}

        line_type, _ = unit.position
        if line_type == LineType.FRONT:
            return {"success": False, "reason": "Unit already on front line"}

        is_tank = unit.unit_type.name == "TANK"
        if not is_tank and unit.has_operated_this_turn:
            return {"success": False, "reason": "Unit already operated this turn"}
        if self.battlefield.front_line.is_full():
            return {"success": False, "reason": "Front line is full"}

        old_line = self.battlefield.get_line_by_type(line_type)
        if old_line:
            old_line.remove_unit(unit)

        if not self.battlefield.front_line.add_unit(unit):
            # Put the unit back so a refused move does not delete it.
            # NOTE(review): assumes the old line exposes the same add_unit
            # API as front_line — confirm.
            if old_line:
                old_line.add_unit(unit)
            return {"success": False, "reason": "Failed to move to front"}

        if not is_tank:
            unit.has_operated_this_turn = True
        self.battlefield._update_front_line_control()
        return {"success": True, "unit_moved": unit.id, "to": "front"}

    def end_turn(self) -> Dict[str, Any]:
        """End the active player's turn and start the other player's.

        Sequence: fire ``turn_end``, switch the active player, advance the
        turn counter, refill the new player's command points, reset that
        player's units, fire ``turn_start`` (carrying the new turn number),
        then report the state including whether the game is over.
        """
        self.event_manager.add_event(GameEvent(
            "turn_end",
            context={"player": self.active_player, "turn": self.current_turn},
        ))
        self.event_manager.process_events(self.battlefield)

        player1_id = self.battlefield.player1_id
        player2_id = self.battlefield.player2_id
        self.active_player = player2_id if self.active_player == player1_id else player1_id

        # Advance the counter before "turn_start" so the event, the refill
        # and the returned turn_number all refer to the same (new) turn.
        self.current_turn += 1

        # NOTE(review): current_turn advances on every end_turn (once per
        # player turn), so each player's refill grows by 2 between their own
        # turns and player 2's first refill is 2 rather than the initial 1 —
        # confirm this is the intended economy.
        refill = min(self.max_command_points, self.current_turn)
        if self.active_player == player1_id:
            self.player1_command_points = refill
        else:
            self.player2_command_points = refill

        for unit in self.battlefield.get_all_units(self.active_player):
            unit.reset_operation_status()

        self.event_manager.add_event(GameEvent(
            "turn_start",
            context={"player": self.active_player, "turn": self.current_turn},
        ))
        self.event_manager.process_events(self.battlefield)

        self.game_phase = GamePhase.PLAYER_TURN
        game_over, winner = self.battlefield.is_game_over()
        return {
            "turn_ended": True,
            "new_active_player": self.active_player,
            "turn_number": self.current_turn,
            "game_over": game_over,
            "winner": winner,
            "front_line_controller": self.battlefield.front_line_controller,
        }

    def get_game_state(self) -> Dict[str, Any]:
        """Return a summary dict: turn, active player, phase, HQ defense, unit counts."""
        battlefield = self.battlefield
        player1_id = battlefield.player1_id
        player2_id = battlefield.player2_id
        front_units = battlefield.front_line.get_all_units()
        return {
            "turn": self.current_turn,
            "active_player": self.active_player,
            "phase": self.game_phase.value,
            "battlefield": str(battlefield),
            "player1_hq": battlefield.player1_hq.current_defense,
            "player2_hq": battlefield.player2_hq.current_defense,
            "front_line_controller": battlefield.front_line_controller,
            "units_count": {
                "player1_total": len(battlefield.get_all_units(player1_id)),
                "player2_total": len(battlefield.get_all_units(player2_id)),
                "player1_support": len(battlefield.player1_support.get_all_units()),
                "player1_front": len([u for u in front_units if u.owner == player1_id]),
                "player2_support": len(battlefield.player2_support.get_all_units()),
                "player2_front": len([u for u in front_units if u.owner == player2_id]),
            },
        }