# kards-env/archive/game_engine_v2.py

"""
Updated game engine.
Adapted to the new three-line battlefield system.
"""
from typing import Dict, List, Optional, Any, Union
from uuid import UUID
from ..battlefield.battlefield_v2 import Battlefield, HQ
from ..units.unit import Unit
from ..abilities.keywords import TriggerTiming, KeywordRegistry
from ..abilities.abilities import AbilityManager
from .enums import GamePhase, LineType
class GameEvent:
    """A single game event waiting to be dispatched.

    Attributes:
        event_type: String key identifying the event (e.g. "unit_dies").
        source: Unit that caused the event, if any.
        target: Unit or HQ the event is aimed at, if any.
        context: Free-form extra data passed through to effect handlers.
        timestamp: Sequence number; overwritten by EventManager.add_event.
    """

    def __init__(self, event_type: str, source: Optional[Unit] = None,
                 target: Optional[Union[Unit, HQ]] = None,
                 context: Optional[Dict[str, Any]] = None):
        self.event_type = event_type
        self.source = source
        self.target = target
        # Only create a fresh dict when no context was supplied. A plain
        # `context or {}` would silently replace a caller's (empty) dict and
        # break any aliasing the caller relies on.
        self.context = context if context is not None else {}
        self.timestamp = 0
class EventManager:
    """Queue game events and dispatch them to unit keywords and abilities."""

    def __init__(self):
        self.event_queue = []        # pending events, processed first-in first-out
        self.event_listeners = {}    # not read anywhere in this class
        self.event_counter = 0       # next timestamp to hand out

    def add_event(self, event: GameEvent) -> None:
        """Stamp *event* with the next sequence number and enqueue it."""
        event.timestamp = self.event_counter
        self.event_counter += 1
        self.event_queue.append(event)

    def process_events(self, battlefield: Battlefield) -> List[Dict[str, Any]]:
        """Drain the queue and return the non-empty per-event results.

        Events appended while the queue is being drained are processed in
        the same call, because the loop re-checks the queue each iteration.
        """
        results = []
        while self.event_queue:
            event = self.event_queue.pop(0)
            result = self._process_single_event(event, battlefield)
            if result:
                results.append(result)
        return results

    def _process_single_event(self, event: GameEvent,
                              battlefield: Battlefield) -> Optional[Dict[str, Any]]:
        """Dispatch one event to every unit currently on the battlefield.

        Returns:
            ``{"event": <event_type>, "results": [...]}`` when at least one
            ability produced a result; ``None`` when the event type has no
            trigger timing or no ability produced anything.
        """
        timing_map = {
            "unit_deployed": TriggerTiming.ON_DEPLOY,
            "unit_attacks": TriggerTiming.ON_ATTACK,
            "unit_defends": TriggerTiming.ON_DEFEND,
            "unit_takes_damage": TriggerTiming.ON_DAMAGE_TAKEN,
            "unit_dies": TriggerTiming.ON_DEATH,
            "turn_start": TriggerTiming.ON_TURN_START,
            "turn_end": TriggerTiming.ON_TURN_END,
            "before_combat": TriggerTiming.BEFORE_COMBAT,
            "after_combat": TriggerTiming.AFTER_COMBAT,
        }
        trigger_timing = timing_map.get(event.event_type)
        # Compare against None explicitly: a truthiness test would also skip
        # a mapped timing whose enum member happens to be falsy.
        if trigger_timing is None:
            return None

        # Every unit on the battlefield gets a chance to react, not just the
        # event's source/target.
        ability_manager = AbilityManager()
        results = []
        for unit in battlefield.get_all_units():
            # Keyword effects first, then abilities.
            self._trigger_keyword_effects(unit, battlefield, trigger_timing, event.context)
            unit_results = ability_manager.trigger_abilities(
                unit, battlefield, trigger_timing, event.context
            )
            results.extend(unit_results)

        if not results:
            return None
        return {
            "event": event.event_type,
            "results": results,
        }

    def _trigger_keyword_effects(self, unit: Unit, battlefield: Battlefield,
                                 timing: TriggerTiming, context: Dict[str, Any]) -> None:
        """Apply the effect of each keyword on *unit* for the given timing.

        Keywords for which the registry returns a falsy value are skipped.
        """
        for keyword_name in unit.keywords:
            keyword_effect = KeywordRegistry.create_keyword(keyword_name)
            if keyword_effect:
                keyword_effect.apply_effect(unit, battlefield, timing, context)
class CombatResolver:
    """Resolve attacks between units and against headquarters.

    Combat only queues events on the shared EventManager; it never processes
    them. The caller is expected to drain the queue afterwards.
    """

    def __init__(self, event_manager: EventManager):
        self.event_manager = event_manager

    def resolve_combat(self, attacker: Unit, target: Union[Unit, HQ],
                       battlefield: Battlefield) -> Dict[str, Any]:
        """Resolve one attack and return a result dict describing it."""
        # Pre-combat event.
        self.event_manager.add_event(GameEvent(
            "before_combat",
            source=attacker,
            target=target,
            context={"combat_type": "attack"}
        ))
        # Attacks on an HQ are handled separately from unit-vs-unit combat.
        if isinstance(target, HQ):
            return self._resolve_hq_attack(attacker, target, battlefield)
        return self._resolve_unit_combat(attacker, target, battlefield)

    def _resolve_hq_attack(self, attacker: Unit, hq: HQ,
                           battlefield: Battlefield) -> Dict[str, Any]:
        """Resolve an attack on an HQ; the HQ never strikes back."""
        base_damage = attacker.get_effective_attack()
        actual_damage = hq.take_damage(base_damage)
        result = {
            "attacker_id": attacker.id,
            "target_type": "hq",
            "target_player": hq.player_id,
            "damage_dealt": actual_damage,
            "counter_damage": 0,
            "hq_destroyed": hq.is_destroyed(),
            "combat_resolved": True
        }
        self.event_manager.add_event(GameEvent(
            "after_combat",
            source=attacker,
            target=hq,
            context={"damage_dealt": actual_damage, "counter_damage": 0}
        ))
        return result

    def _resolve_unit_combat(self, attacker: Unit, target: Unit,
                             battlefield: Battlefield) -> Dict[str, Any]:
        """Resolve combat between two units.

        Order: ambush (if the target has AMBUSH), attacker's damage, the
        target's counter-attack, then death checks.

        Returns:
            A dict that always contains ``attacker_id``, ``target_id``,
            ``damage_dealt``, ``counter_damage``, ``units_destroyed`` and
            ``combat_resolved``. When an ambush fired it also carries
            ``ambush_triggered`` and ``ambush_damage`` (plus
            ``attacker_destroyed`` when the ambush killed the attacker).
        """
        ambush_info: Dict[str, Any] = {}
        if target.has_keyword("AMBUSH"):
            ambush_result = self._resolve_ambush(target, attacker, battlefield)
            if ambush_result.get("attacker_destroyed"):
                # The attacker died before striking. Return the same shape as
                # a normal combat result so callers can read the usual keys.
                # NOTE(review): no "after_combat" event is queued on this
                # path even though "before_combat" was - confirm intended.
                return {
                    "attacker_id": attacker.id,
                    "target_id": target.id,
                    "damage_dealt": 0,
                    "counter_damage": 0,
                    "units_destroyed": [attacker.id],
                    "combat_resolved": True,
                    **ambush_result,
                }
            # Attacker survived: keep the ambush details for the final result.
            # "attacker_destroyed" is left out because the attacker may still
            # die to the counter-attack below.
            ambush_info = {
                "ambush_triggered": ambush_result["ambush_triggered"],
                "ambush_damage": ambush_result["ambush_damage"],
            }

        # Attacker's strike, reduced by the target's armor.
        base_damage = attacker.get_effective_attack()
        final_damage = self._calculate_damage(base_damage, target)
        actual_damage = target.stats.take_damage(final_damage)
        self.event_manager.add_event(GameEvent(
            "unit_takes_damage",
            source=attacker,
            target=target,
            context={"damage": actual_damage}
        ))

        # Counter-attack, only if the target survived and is allowed to.
        counter_damage = 0
        if target.stats.is_alive() and self._can_counter_attack(target, attacker):
            counter_base_damage = target.get_effective_attack()
            counter_final_damage = self._calculate_damage(counter_base_damage, attacker)
            counter_damage = attacker.stats.take_damage(counter_final_damage)
            if counter_damage > 0:
                self.event_manager.add_event(GameEvent(
                    "unit_takes_damage",
                    source=target,
                    target=attacker,
                    context={"damage": counter_damage, "is_counter": True}
                ))

        # Death checks.
        # NOTE(review): units are removed from the battlefield before the
        # queued "unit_dies" event is processed - verify that on-death
        # effects of the removed unit still fire.
        units_destroyed = []
        for unit in (target, attacker):
            if not unit.stats.is_alive():
                self.event_manager.add_event(GameEvent("unit_dies", target=unit))
                battlefield.remove_unit(unit)
                units_destroyed.append(unit.id)

        # Post-combat event.
        self.event_manager.add_event(GameEvent(
            "after_combat",
            source=attacker,
            target=target,
            context={"damage_dealt": actual_damage, "counter_damage": counter_damage}
        ))
        return {
            "attacker_id": attacker.id,
            "target_id": target.id,
            "damage_dealt": actual_damage,
            "counter_damage": counter_damage,
            "units_destroyed": units_destroyed,
            "combat_resolved": True,
            **ambush_info,
        }

    def _resolve_ambush(self, defender: Unit, attacker: Unit,
                        battlefield: Battlefield) -> Dict[str, Any]:
        """Let *defender* strike *attacker* first.

        If the attacker dies, a "unit_dies" event is queued and the attacker
        is removed from the battlefield.

        NOTE(review): ambush damage is applied without going through
        _calculate_damage, so the attacker's heavy armor is ignored here
        (unlike the regular counter-attack) - confirm against the rules.
        """
        ambush_damage = defender.get_effective_attack()
        actual_damage = attacker.stats.take_damage(ambush_damage)
        attacker_destroyed = not attacker.stats.is_alive()
        if attacker_destroyed:
            self.event_manager.add_event(GameEvent("unit_dies", target=attacker))
            battlefield.remove_unit(attacker)
        return {
            "ambush_triggered": True,
            "ambush_damage": actual_damage,
            "attacker_destroyed": attacker_destroyed
        }

    def _calculate_damage(self, base_damage: int, target: Unit) -> int:
        """Return *base_damage* reduced by the target's heavy armor.

        The first keyword of the form ``HEAVY_ARMOR_<int>`` wins; keywords
        with a non-numeric suffix are skipped. Damage never drops below 0.
        """
        for keyword in target.keywords:
            if not keyword.startswith("HEAVY_ARMOR_"):
                continue
            try:
                armor_value = int(keyword.split("_")[-1])
            except ValueError:
                continue
            return max(0, base_damage - armor_value)
        return base_damage

    def _can_counter_attack(self, defender: Unit, attacker: Unit) -> bool:
        """Return whether *defender* may strike back at *attacker*."""
        # A bomber's target cannot retaliate unless that target is a fighter.
        if attacker.unit_type.name == "BOMBER" and defender.unit_type.name != "FIGHTER":
            return False
        # Pinned units cannot retaliate.
        if defender.has_keyword("PINNED"):
            return False
        return True
class GameEngineV2:
    """Main game engine: turn flow, command points, deployment and attacks."""

    def __init__(self, player1_id: str, player2_id: str):
        self.battlefield = Battlefield(player1_id, player2_id)
        self.event_manager = EventManager()
        self.combat_resolver = CombatResolver(self.event_manager)
        self.current_turn = 1
        self.active_player = player1_id
        self.game_phase = GamePhase.PLAYER_TURN
        # Command-point resources.
        self.player1_command_points = 1  # 1 point on the first turn
        self.player2_command_points = 1
        self.max_command_points = 12     # cap
        self.turn_history = []

    def get_command_points(self, player_id: str) -> int:
        """Return the player's current command points (0 for unknown ids)."""
        if player_id == self.battlefield.player1_id:
            return self.player1_command_points
        if player_id == self.battlefield.player2_id:
            return self.player2_command_points
        return 0

    def spend_command_points(self, player_id: str, cost: int) -> bool:
        """Deduct *cost* from the player's points if they can afford it.

        Returns:
            True when the player had at least *cost* points, else False.
        """
        if self.get_command_points(player_id) < cost:
            return False
        if player_id == self.battlefield.player1_id:
            self.player1_command_points -= cost
        elif player_id == self.battlefield.player2_id:
            self.player2_command_points -= cost
        return True

    def _deploy_unit(self, unit: Unit, player_id: str, deploy: Any,
                     location: str) -> Dict[str, Any]:
        """Shared deployment flow for the support and front lines.

        *deploy* is the battlefield method that places the unit and returns
        a truthy value on success. Command points are only deducted after
        the placement succeeds, so a failed deployment costs nothing.
        """
        if player_id != self.active_player:
            return {"success": False, "reason": "Not your turn"}
        cost = unit.stats.operation_cost
        if self.get_command_points(player_id) < cost:
            return {"success": False, "reason": "Insufficient command points"}
        if not deploy(unit, player_id):
            return {"success": False, "reason": f"Failed to deploy to {location} line"}
        self.spend_command_points(player_id, cost)
        # Queue and immediately process the deployment event.
        self.event_manager.add_event(GameEvent(
            "unit_deployed",
            source=unit,
            context={"deployment_type": location}
        ))
        self.event_manager.process_events(self.battlefield)
        return {"success": True, "unit_deployed": unit.id, "location": location}

    def deploy_unit_to_support(self, unit: Unit, player_id: str) -> Dict[str, Any]:
        """Deploy *unit* to the player's support line, paying its cost."""
        return self._deploy_unit(
            unit, player_id, self.battlefield.deploy_unit_to_support, "support"
        )

    def deploy_unit_to_front(self, unit: Unit, player_id: str) -> Dict[str, Any]:
        """Deploy *unit* to the front line, paying its cost."""
        return self._deploy_unit(
            unit, player_id, self.battlefield.deploy_unit_to_front, "front"
        )

    def attack_target(self, attacker_id: UUID, target_id: UUID) -> Dict[str, Any]:
        """Attack a unit or an HQ with one of the active player's units.

        An HQ is addressed by passing a *target_id* whose string form is
        "hq1" or "hq2".

        Returns:
            On failure ``{"success": False, "reason": ...}``. On success the
            combat result merged with ``"success": True`` and the
            ``"event_results"`` produced while draining the event queue.
        """
        attacker = self.battlefield.find_unit(attacker_id)
        if not attacker:
            return {"success": False, "reason": "Attacker not found"}
        if attacker.owner != self.active_player:
            return {"success": False, "reason": "Not your unit"}

        # The target is either a unit or one of the two HQs.
        target = self.battlefield.find_unit(target_id)
        if not target:
            hq_key = str(target_id)
            if hq_key == "hq1":
                target = self.battlefield.player1_hq
            elif hq_key == "hq2":
                target = self.battlefield.player2_hq
            else:
                return {"success": False, "reason": "Target not found"}

        # Validate the attack.
        valid_targets = self.battlefield.get_valid_attack_targets_for_unit(attacker)
        if target not in valid_targets:
            return {"success": False, "reason": "Cannot attack this target"}
        # A unit that already acted may only attack again if it has FURY.
        if attacker.has_operated_this_turn and not attacker.has_keyword("FURY"):
            return {"success": False, "reason": "Unit already operated this turn"}

        self.event_manager.add_event(GameEvent(
            "unit_attacks",
            source=attacker,
            target=target
        ))
        combat_result = self.combat_resolver.resolve_combat(attacker, target, self.battlefield)
        attacker.has_operated_this_turn = True
        event_results = self.event_manager.process_events(self.battlefield)
        return {
            # Mirrors the "success" key every failure path already returns.
            "success": True,
            **combat_result,
            "event_results": event_results
        }

    def move_unit_to_front(self, unit_id: UUID) -> Dict[str, Any]:
        """Move one of the active player's units from its line to the front."""
        unit = self.battlefield.find_unit(unit_id)
        if not unit or unit.owner != self.active_player:
            return {"success": False, "reason": "Unit not found or not yours"}
        if not unit.position:
            return {"success": False, "reason": "Unit not on battlefield"}
        line_type, _ = unit.position
        if line_type == LineType.FRONT:
            return {"success": False, "reason": "Unit already on front line"}

        # Only tanks may move and attack in the same turn.
        is_tank = unit.unit_type.name == "TANK"
        if not is_tank and unit.has_operated_this_turn:
            return {"success": False, "reason": "Unit already operated this turn"}
        if self.battlefield.front_line.is_full():
            return {"success": False, "reason": "Front line is full"}

        # Take the unit off its current line, then place it on the front.
        old_line = self.battlefield.get_line_by_type(line_type)
        if old_line:
            old_line.remove_unit(unit)
        if not self.battlefield.front_line.add_unit(unit):
            # Put the unit back so a failed move does not drop it from the
            # battlefield. Assumes the old line offers the same add_unit
            # method as the front line - TODO confirm.
            if old_line:
                old_line.add_unit(unit)
            return {"success": False, "reason": "Failed to move to front"}

        # Moving uses up the action of every unit type except tanks.
        if not is_tank:
            unit.has_operated_this_turn = True
        self.battlefield._update_front_line_control()
        return {"success": True, "unit_moved": unit.id, "to": "front"}

    def end_turn(self) -> Dict[str, Any]:
        """End the active player's turn and start the other player's."""
        # Turn-end event for the outgoing player.
        self.event_manager.add_event(GameEvent(
            "turn_end",
            context={"player": self.active_player, "turn": self.current_turn}
        ))
        self.event_manager.process_events(self.battlefield)

        # Hand over to the other player.
        p1 = self.battlefield.player1_id
        p2 = self.battlefield.player2_id
        self.active_player = p2 if self.active_player == p1 else p1

        # Refill the incoming player's command points.
        # NOTE(review): current_turn advances on every end_turn call (once
        # per player turn), so the refill grows by one per call and not one
        # per full round - confirm this is the intended ramp.
        refill = min(self.max_command_points, self.current_turn + 1)
        if self.active_player == p1:
            self.player1_command_points = refill
        else:
            self.player2_command_points = refill

        # Let the incoming player's units act again.
        for unit in self.battlefield.get_all_units(self.active_player):
            unit.reset_operation_status()

        # Turn-start event for the incoming player.
        # NOTE(review): "turn" here is still the pre-increment number.
        self.event_manager.add_event(GameEvent(
            "turn_start",
            context={"player": self.active_player, "turn": self.current_turn}
        ))
        self.event_manager.process_events(self.battlefield)

        self.current_turn += 1
        self.game_phase = GamePhase.PLAYER_TURN
        game_over, winner = self.battlefield.is_game_over()
        return {
            "turn_ended": True,
            "new_active_player": self.active_player,
            "turn_number": self.current_turn,
            "game_over": game_over,
            "winner": winner,
            "front_line_controller": self.battlefield.front_line_controller
        }

    def get_game_state(self) -> Dict[str, Any]:
        """Return a snapshot of turn, HQ defense and unit counts."""
        bf = self.battlefield
        front_units = bf.front_line.get_all_units()
        return {
            "turn": self.current_turn,
            "active_player": self.active_player,
            "phase": self.game_phase.value,
            "battlefield": str(bf),
            "player1_hq": bf.player1_hq.current_defense,
            "player2_hq": bf.player2_hq.current_defense,
            "front_line_controller": bf.front_line_controller,
            "units_count": {
                "player1_total": len(bf.get_all_units(bf.player1_id)),
                "player2_total": len(bf.get_all_units(bf.player2_id)),
                "player1_support": len(bf.player1_support.get_all_units()),
                "player1_front": sum(1 for u in front_units if u.owner == bf.player1_id),
                "player2_support": len(bf.player2_support.get_all_units()),
                "player2_front": sum(1 for u in front_units if u.owner == bf.player2_id)
            }
        }