mistle/movement_agent.py

199 lines
6.3 KiB
Python

from __future__ import annotations
import re
import random
from dataclasses import dataclass, field
from typing import Dict, Optional, Pattern, Sequence, Set, Tuple
from agent import Agent
@dataclass
class MovementAgent(Agent):
    """Agent that chooses one movement command based on a room description.

    Life cycle, as driven by alternating ``decide()`` / ``observe()`` calls:

    1. The first ``decide()`` returns ``look_command`` to request a room
       description.
    2. ``observe()`` parses the next non-empty output for exits and stores
       the chosen one in ``selected_direction``.
    3. The next ``decide()`` returns the formatted movement command and sets
       ``executed``; from then on the agent ignores output and returns
       ``None``.

    Exits are kept as ``(raw, canonical)`` pairs: ``raw`` is the text found
    in the output (and is what gets sent as the command), ``canonical`` is
    the short compass code from ``_direction_aliases`` or ``None`` when the
    word is not a known direction.
    """

    # Command sent once to request the room description.
    look_command: str = "schau"
    # Template for the movement command; ``{direction}`` receives the raw exit.
    command_format: str = "{direction}"
    # Canonical directions in the order they are preferred when ranking exits.
    preferred_order: Tuple[str, ...] = (
        "n",
        "ne",
        "e",
        "se",
        "s",
        "sw",
        "w",
        "nw",
        "u",
        "d",
    )
    # Most recent non-empty output seen before the move was executed.
    last_output: str = field(default="", init=False)
    # Whitespace-normalised text of the room the selected exit belongs to.
    current_room_key: Optional[str] = field(default=None, init=False)
    # Room key -> raw directions already taken from that room.
    visited_directions: Dict[str, Set[str]] = field(default_factory=dict, init=False)
    # True once the movement command has been returned by ``decide()``.
    executed: bool = field(default=False, init=False)
    # True until ``decide()`` has issued ``look_command``.
    needs_look: bool = field(default=True, init=False)
    # Raw exit chosen by ``observe()`` and waiting to be sent by ``decide()``.
    selected_direction: Optional[str] = field(default=None, init=False)
    # When True, pick randomly among candidate exits instead of the first one.
    random_choice: bool = True
    # Lower-case direction word (German/English) -> canonical compass code.
    _direction_aliases: Dict[str, str] = field(
        default_factory=lambda: {
            "n": "n",
            "north": "n",
            "norden": "n",
            "nord": "n",
            "no": "ne",
            "ne": "ne",
            "nordost": "ne",
            "nordosten": "ne",
            "e": "e",
            "east": "e",
            "osten": "e",
            "ost": "e",
            "so": "se",
            "se": "se",
            "südost": "se",
            "südosten": "se",
            "s": "s",
            "south": "s",
            "süden": "s",
            "süd": "s",
            "sw": "sw",
            "südwest": "sw",
            "südwesten": "sw",
            "w": "w",
            "west": "w",
            "westen": "w",
            "nw": "nw",
            "nordwest": "nw",
            "nordwesten": "nw",
            "u": "u",
            "oben": "u",
            "up": "u",
            "rauf": "u",
            "d": "d",
            "down": "d",
            "hinunter": "d",
            "unten": "d",
        },
        init=False,
    )
    # Matches an "Ausgang/Ausgänge/exit(s)" label and captures the rest of the line.
    _exit_line_pattern: Pattern[str] = field(
        default_factory=lambda: re.compile(
            r"(?:ausg(?:ä|a)nge?|exits?)[:\s]+(.+)", re.IGNORECASE
        ),
        init=False,
    )

    def observe(self, output: str) -> None:
        """Record ``output`` and, if possible, select an exit from it.

        Empty output and anything arriving after the move was executed is
        ignored. Output seen before the look command has been issued is
        stored in ``last_output`` but not parsed.
        """
        if not output or self.executed:
            return
        self.last_output = output
        if self.needs_look:
            return
        if self.selected_direction is not None:
            # A direction is already pending; keep it until decide() sends it.
            return
        room_key = self._room_key(output)
        direction = self._select_direction(output, room_key)
        if direction:
            self.current_room_key = room_key
            self.selected_direction = direction

    def decide(self) -> Optional[str]:
        """Return the next command to send, or ``None`` if there is none.

        The first call returns ``look_command``. Once ``observe()`` has
        selected an exit, the formatted movement command is returned, the
        exit is recorded in ``visited_directions`` and ``executed`` is set.
        """
        if self.executed:
            return None
        if self.needs_look:
            self.needs_look = False
            print("[Agent] Requesting room description via look command")
            return self.look_command
        direction = self.selected_direction
        if direction is None:
            return None
        if self.current_room_key is not None:
            self.visited_directions.setdefault(self.current_room_key, set()).add(
                direction
            )
        command = self.command_format.format(direction=direction)
        print(f"[Agent] Moving via {direction}")
        self.executed = True
        self.selected_direction = None
        return command

    def _room_key(self, text: str) -> str:
        """Return ``text`` stripped, with whitespace runs collapsed to one space."""
        return re.sub(r"\s+", " ", text.strip())

    def _select_direction(self, text: str, room_key: str) -> Optional[str]:
        """Pick a raw exit from ``text``, preferring ones not yet taken.

        Exit-label lines are parsed first; if none yield exits, the whole
        text is scanned for known direction words. Returns ``None`` when no
        exit can be found.
        """
        exits = self._parse_exit_lines(text) or self._scan_for_directions(text)
        if not exits:
            return None
        ordered = self._prioritized_exits(exits, room_key)
        if not ordered:
            return None
        visited = self.visited_directions.setdefault(room_key, set())
        unvisited = [direction for direction in ordered if direction not in visited]
        # Fall back to every exit once all of them have been tried.
        return self._choose_direction(unvisited or ordered)

    def _prioritized_exits(
        self, exits: Sequence[Tuple[str, Optional[str]]], room_key: str
    ) -> list[str]:
        """Return the unique raw exits, ranked by ``preferred_order``.

        Exits whose canonical direction appears in ``preferred_order`` come
        first, in that order; all remaining exits follow in input order.
        ``room_key`` is accepted for interface compatibility and is unused.
        """
        ordered: list[str] = []
        seen: Set[str] = set()

        def _add(raw: str) -> None:
            if raw not in seen:
                seen.add(raw)
                ordered.append(raw)

        for preferred in self.preferred_order:
            for raw, canonical in exits:
                if canonical == preferred:
                    _add(raw)
        for raw, _ in exits:
            _add(raw)
        return ordered

    def _choose_direction(self, pool: Sequence[str]) -> str:
        """Return one entry of ``pool``: random if enabled, else the first.

        Raises:
            ValueError: if ``pool`` is empty.
        """
        if not pool:
            raise ValueError("No exits available to choose from")
        if self.random_choice and len(pool) > 1:
            return random.choice(pool)
        return pool[0]

    def _parse_exit_lines(self, text: str) -> list[Tuple[str, Optional[str]]]:
        """Extract ``(raw, canonical)`` exits from exit-label lines in ``text``.

        Every line matching ``_exit_line_pattern`` contributes the words that
        follow the label, split on ``,``, ``;``, ``/``, the conjunctions
        und/oder/and/or, and whitespace.
        """
        # The original pattern ended in r"\\s", which matches a literal
        # backslash + "s" rather than whitespace, so space-separated exits
        # were never split. Conjunctions are listed before the bare
        # whitespace alternative so they are consumed as separators instead
        # of surfacing as exit candidates.
        separator = re.compile(
            r"\s*[,;/]\s*|\s+(?:und|oder|and|or)\s+|\s+", re.IGNORECASE
        )
        exits: list[Tuple[str, Optional[str]]] = []
        for line in text.splitlines():
            match = self._exit_line_pattern.search(line)
            if not match:
                continue
            for candidate in separator.split(match.group(1)):
                normalized = self._normalize_raw(candidate)
                if normalized:
                    exits.append(normalized)
        return exits

    def _scan_for_directions(self, text: str) -> list[Tuple[str, Optional[str]]]:
        """Return an exit for every known direction word found in ``text``.

        Words are matched case-insensitively on word boundaries; results
        follow the iteration order of ``_direction_aliases``.
        """
        exits: list[Tuple[str, Optional[str]]] = []
        lowered = text.lower()
        for word, direction in self._direction_aliases.items():
            if not re.search(rf"\b{re.escape(word)}\b", lowered):
                continue
            normalized = self._normalize_raw(word)
            if normalized:
                exits.append((normalized[0], direction))
        return exits

    def _map_direction(self, raw: str) -> Optional[str]:
        """Return the canonical direction for ``raw``, or ``None`` if unknown.

        ``raw`` is lower-cased and stripped of everything except a-z and
        German umlauts/ß before the alias lookup.
        """
        cleaned = re.sub(r"[^a-zäöüß]", "", raw.lower())
        if not cleaned:
            return None
        return self._direction_aliases.get(cleaned)

    def _normalize_raw(self, raw: str) -> Optional[Tuple[str, Optional[str]]]:
        """Return ``(cleaned, canonical)`` for ``raw``, or ``None`` if empty.

        Surrounding whitespace and trailing punctuation are removed from
        ``raw``; ``canonical`` is ``None`` for words that are not known
        directions.
        """
        cleaned = re.sub(r"[\s\.,;:!?-]+$", "", raw.strip())
        if not cleaned:
            return None
        return cleaned, self._map_direction(cleaned)