diff --git a/app.py b/app.py index 140c18c..a480a79 100644 --- a/app.py +++ b/app.py @@ -1,493 +1,55 @@ -import os -import select +from __future__ import annotations + import sys -import time -import unicodedata -from collections import deque -from importlib import import_module -from pathlib import Path -from threading import Event, Lock, Thread -from typing import Callable, Deque, List, Optional, Type +from threading import Event, Thread +from typing import Optional -from tools import Tool -from agents import Agent, build_agent from agent_runtime import run_agent - -TOOL_REGISTRY = { - "look": { - "module": "tools", - "class": "LookTool", - "kwargs": {}, - "description": "Sends the 'schau' look command to refresh the room description.", - }, - "simple": { - "module": "tools", - "class": "LookTool", - "kwargs": {}, - "description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.", - }, - "move": { - "module": "movement_tool", - "class": "MovementTool", - "kwargs": {}, - "description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.", - }, - "movement": { - "module": "movement_tool", - "class": "MovementTool", - "kwargs": {}, - "description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.", - }, - "explore": { - "module": "tools", - "class": "ExploreTool", - "kwargs": {}, - "description": "Sends 'schau' once, then 'untersuche ' for each noun found in the room description.", - }, - "communication": { - "module": "tools", - "class": "CommunicationTool", - "kwargs": {}, - "description": "Responds to private tells with a friendly greeting via 'teile mit ...'.", - }, - "intelligent": { - "module": "intelligent_tool", - "class": "IntelligentCommunicationTool", - "kwargs": { - "model": os.environ.get( - "MISTLE_LLM_MODEL", "mistral/mistral-small" - ) - }, - "description": "Uses an LLM to craft a polite reply to private tells.", - }, - "intelligentcommunication": { - "module": "intelligent_tool", - "class": "IntelligentCommunicationTool", - "kwargs": { - "model": os.environ.get( - "MISTLE_LLM_MODEL", "mistral/mistral-small" - ) - }, - "description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.", - }, -} - -TOOL_DESCRIPTIONS = { - name: meta["description"] for name, meta in TOOL_REGISTRY.items() -} - -from telnetclient import TelnetClient - - -_UMLAUT_TRANSLATION = str.maketrans( - { - "ä": "ae", - "ö": "oe", - "ü": "ue", - "Ä": "Ae", - "Ö": "Oe", - "Ü": "Ue", - "ß": "ss", - } +from agents import Agent, build_agent +from mud_env import load_env_file, read_connection_settings, read_tool_settings +from mud_session import ( + SessionState, + graceful_shutdown, + interactive_session, + login, + run_tool_loop, ) +from mud_tools import TOOL_DESCRIPTIONS, build_tool +from telnetclient import TelnetClient +from tools import Tool -def sanitize_for_mud(text: str) -> str: - """Return ASCII-only text suitable for Silberland's input parser.""" - if not text: - return "" - - replaced = text.translate(_UMLAUT_TRANSLATION) - normalized = unicodedata.normalize("NFKD", replaced) - cleaned: list[str] = [] - for ch in normalized: - if unicodedata.combining(ch): - continue - code = ord(ch) - if 32 <= code <= 126: - cleaned.append(ch) - else: - cleaned.append("?") - return "".join(cleaned) +DEFAULT_SEND_INTERVAL = 1.0 +AUTO_STOP_IDLE_SECONDS = 2.0 -class SessionState: - """Share Telnet session state safely across threads.""" - - class _OutputListener: - def __init__(self) -> None: - self._queue: Deque[str] = deque() - self._lock = Lock() - self._event = Event() - - def publish(self, text: str) -> None: - with self._lock: - self._queue.append(text) - self._event.set() - - def wait(self, timeout: float) -> bool: - return self._event.wait(timeout) - - def drain(self) -> List[str]: - with self._lock: - items = list(self._queue) - self._queue.clear() - self._event.clear() - return items - - def close(self) -> None: - with self._lock: - self._queue.clear() - self._event.set() - - def __init__(self) -> None: - self._send_lock = Lock() - self._output_lock = Lock() - self._output_event = Event() - self._last_output = "" - self._last_tool_send = 0.0 - self._listeners: set[SessionState._OutputListener] = set() - self._listeners_lock = Lock() - - def send(self, client: TelnetClient, message: str) -> None: - sanitized = sanitize_for_mud(message) - with self._send_lock: - client.send(sanitized) - - def tool_send( - self, - client: TelnetClient, - message: str, - *, - min_interval: float, - stop_event: Event, - ) -> bool: - """Send on behalf of the tool while respecting a minimum cadence.""" - sanitized = sanitize_for_mud(message) - - while not stop_event.is_set(): - with self._send_lock: - now = time.time() - elapsed = now - self._last_tool_send - if elapsed >= min_interval: - client.send(sanitized) - self._last_tool_send = now - return True - wait_time = min_interval - elapsed - if wait_time <= 0: - continue - if stop_event.wait(wait_time): - break - return False - - def update_output(self, text: str) -> None: - if not text: - return - with self._output_lock: - self._last_output = text - with self._listeners_lock: - listeners = list(self._listeners) - for listener in listeners: - listener.publish(text) - self._output_event.set() - - def snapshot_output(self) -> str: - with self._output_lock: - return self._last_output - - def register_listener(self) -> "SessionState._OutputListener": - listener = SessionState._OutputListener() - with self._listeners_lock: - self._listeners.add(listener) - with self._output_lock: - last_output = self._last_output - if last_output: - listener.publish(last_output) - return listener - - def remove_listener(self, listener: "SessionState._OutputListener") -> None: - with self._listeners_lock: - existed = listener in self._listeners - if existed: - self._listeners.remove(listener) - if existed: - listener.close() - - def wait_for_output(self, timeout: float) -> bool: - return self._output_event.wait(timeout) - - def clear_output_event(self) -> None: - self._output_event.clear() - - -def run_tool_loop( +def _start_tool_thread( client: TelnetClient, state: SessionState, tool: Tool, stop_event: Event, *, - idle_delay: float = 0.5, - min_send_interval: float = 1.0, auto_stop: bool = False, - auto_stop_idle: float = 2.0, -) -> None: - """Invoke *tool* whenever new output arrives and send its response.""" - idle_started: Optional[float] = None - listener = state.register_listener() - - def maybe_send() -> None: - nonlocal idle_started - try: - command = tool.decide() - except Exception as exc: # pragma: no cover - defensive logging - print(f"[Tool] Failed: {exc}", file=sys.stderr) - return - if not command: - return - sent = state.tool_send( - client, - command, - min_interval=min_send_interval, - stop_event=stop_event, - ) - if not sent: - return - idle_started = None - - try: - while not stop_event.is_set(): - maybe_send() - if stop_event.is_set(): - break - - triggered = listener.wait(timeout=idle_delay) - if stop_event.is_set(): - break - - if not triggered: - if auto_stop: - now = time.time() - if idle_started is None: - idle_started = now - elif now - idle_started >= auto_stop_idle: - break - continue - - outputs = listener.drain() - if not outputs: - continue - - idle_started = None - for chunk in outputs: - if not chunk: - continue - try: - tool.observe(chunk) - except Exception as exc: # pragma: no cover - defensive logging - print(f"[Tool] Failed during observe: {exc}", file=sys.stderr) - - maybe_send() - finally: - state.remove_listener(listener) - -def load_env_file(path: str = ".env") -> None: - """Populate ``os.environ`` with key/value pairs from a dotenv file.""" - env_path = Path(path) - if not env_path.exists(): - return - for line in env_path.read_text().splitlines(): - stripped = line.strip() - if not stripped or stripped.startswith("#"): - continue - if "=" not in stripped: - continue - key, value = stripped.split("=", 1) - key = key.strip() - value = value.strip().strip('"').strip("'") - if key and key not in os.environ: - os.environ[key] = value + auto_stop_idle: float = AUTO_STOP_IDLE_SECONDS, +) -> Thread: + thread = Thread( + target=run_tool_loop, + args=(client, state, tool, stop_event), + kwargs={ + "min_send_interval": DEFAULT_SEND_INTERVAL, + "auto_stop": auto_stop, + "auto_stop_idle": auto_stop_idle, + }, + daemon=True, + ) + thread.start() + return thread -def require_env(key: str) -> str: - value = os.environ.get(key) - if value is None: - raise RuntimeError(f"Missing required environment variable: {key}") - return value - - -def build_tool(spec: str) -> Tool: - """Instantiate a tool based on configuration.""" - normalized = spec.strip() or "look" - key = normalized.lower() - - if key in TOOL_REGISTRY: - meta = TOOL_REGISTRY[key] - module_name = meta["module"] - class_name = meta["class"] - kwargs = meta.get("kwargs", {}) - try: - module = import_module(module_name) - tool_cls = getattr(module, class_name) - except AttributeError as exc: # pragma: no cover - optional dependency - raise RuntimeError(f"{class_name} is not available in tools module") from exc - tool = _instantiate_tool(tool_cls, normalized, kwargs) - model_name = kwargs.get("model") if kwargs else None - if model_name: - print(f"[Tool] Using LLM model: {model_name}") - return tool - - if ":" in normalized: - module_name, class_name = normalized.split(":", 1) - if not module_name or not class_name: - raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format") - module = import_module(module_name) - tool_cls = getattr(module, class_name) - return _instantiate_tool(tool_cls, normalized) - - raise RuntimeError(f"Unknown tool spec '{spec}'.") - - -def _instantiate_tool( - tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None -) -> Tool: - if not issubclass(tool_cls, Tool): - raise RuntimeError(f"{tool_spec} is not a Tool subclass") - try: - kwargs = kwargs or {} - return tool_cls(**kwargs) - except TypeError as exc: - raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc - - -def login( - client: TelnetClient, - *, - user: str, - password: str, - login_prompt: str, - banner_timeout: float = 10.0, - response_timeout: float = 2.0, - state: Optional[SessionState] = None, -) -> None: - """Handle the banner/prompt exchange and send credentials.""" - if login_prompt: - banner = client.read_until(login_prompt, timeout=banner_timeout) - else: - banner = client.receive(timeout=response_timeout) - if banner: - print(banner, end="" if banner.endswith("\n") else "\n") - if state: - state.update_output(banner) - - if user: - client.send(sanitize_for_mud(user)) - time.sleep(0.2) - if password: - client.send(sanitize_for_mud(password)) - - response = client.receive(timeout=response_timeout) - if response: - print(response, end="" if response.endswith("\n") else "\n") - if state: - state.update_output(response) - - -def interactive_session( - client: TelnetClient, - state: SessionState, - stop_event: Event, - *, - poll_interval: float = 0.2, - receive_timeout: float = 0.2, - exit_command: str, - tool_command: Optional[Callable[[str], None]] = None, - agent_command: Optional[Callable[[str], None]] = None, -) -> None: - """Keep the Telnet session running, proxying input/output until interrupted.""" - if exit_command: - print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).") - else: - print("Connected. Press Ctrl-C to exit.") - - while not stop_event.is_set(): - incoming = client.receive(timeout=receive_timeout) - if incoming: - print(incoming, end="" if incoming.endswith("\n") else "\n") - state.update_output(incoming) - - readable, _, _ = select.select([sys.stdin], [], [], poll_interval) - if sys.stdin in readable: - line = sys.stdin.readline() - if line == "": - stop_event.set() - break - line = line.rstrip("\r\n") - if not line: - continue - lowered = line.lower() - if agent_command and lowered.startswith("#agent"): - parts = line.split(maxsplit=1) - if len(parts) == 1: - print("[Agent] Usage: #agent ") - else: - agent_command(parts[1]) - continue - if tool_command and lowered.startswith("#execute"): - parts = line.split(maxsplit=1) - if len(parts) == 1: - print("[Tool] Usage: #execute ") - else: - tool_command(parts[1]) - continue - state.send(client, line) - - -def graceful_shutdown( - client: TelnetClient, - exit_command: str, - *, - state: Optional[SessionState] = None, -) -> None: - if not exit_command: - return - try: - if state: - state.send(client, exit_command) - else: - client.send(sanitize_for_mud(exit_command)) - farewell = client.receive(timeout=2.0) - if farewell: - print(farewell, end="" if farewell.endswith("\n") else "\n") - if state: - state.update_output(farewell) - except Exception as exc: # pragma: no cover - best effort logging - print(f"Failed to send exit command: {exc}", file=sys.stderr) - - -def main() -> int: - load_env_file() - - host = require_env("MISTLE_HOST") - - port_raw = require_env("MISTLE_PORT") - try: - port = int(port_raw) - except ValueError as exc: - raise RuntimeError("MISTLE_PORT must be an integer") from exc - - user = os.environ.get("MISTLE_USER", "") - password = os.environ.get("MISTLE_PASSWORD", "") - login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "") - exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "") - tool_mode_env = os.environ.get("MISTLE_TOOL_MODE", "") - tool_mode = tool_mode_env.lower() in {"1", "true", "yes", "on"} - - tool_spec = os.environ.get("MISTLE_TOOL", "") - sideload_env = os.environ.get("MISTLE_SIDELOAD_TOOL", "") - sideload_specs = [spec.strip() for spec in sideload_env.split(",") if spec.strip()] +def _build_sideload_tools(specs: list[str]) -> list[tuple[str, Tool]]: sideload_tools: list[tuple[str, Tool]] = [] seen_sideloads: set[str] = set() - for spec in sideload_specs: + for spec in specs: lowered = spec.lower() if lowered in seen_sideloads: continue @@ -498,6 +60,32 @@ def main() -> int: print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr) continue sideload_tools.append((spec, tool_instance)) + return sideload_tools + + +def _prime_agent(agent: Agent, output: str) -> None: + if not output: + return + observe = getattr(agent, "observe", None) + if callable(observe): + try: + observe(output) + except Exception as exc: # pragma: no cover - defensive + print(f"[Agent] observe failed: {exc}", file=sys.stderr) + + +def _join_threads(threads: list[Thread], *, timeout: float = 1.0) -> None: + for thread in threads: + thread.join(timeout=timeout) + + +def main() -> int: + load_env_file() + + connection = read_connection_settings() + tool_settings = read_tool_settings() + + sideload_tools = _build_sideload_tools(tool_settings.sideload_specs) state = SessionState() stop_event = Event() @@ -507,38 +95,26 @@ def main() -> int: sidecar_threads: list[Thread] = [] agent_threads: list[Thread] = [] - with TelnetClient(host=host, port=port, timeout=10.0) as client: + with TelnetClient(host=connection.host, port=connection.port, timeout=10.0) as client: login( client, - user=user, - password=password, - login_prompt=login_prompt, + user=connection.user, + password=connection.password, + login_prompt=connection.login_prompt, state=state, ) - if tool_mode: - tool = build_tool(tool_spec) - tool_thread = Thread( - target=run_tool_loop, - args=(client, state, tool, stop_event), - kwargs={"min_send_interval": 1.0}, - daemon=True, - ) - tool_thread.start() + if tool_settings.tool_mode: + tool = build_tool(tool_settings.tool_spec) + tool_thread = _start_tool_thread(client, state, tool, stop_event) if sideload_tools: for sidecar_spec, sidecar_tool in sideload_tools: - thread = Thread( - target=run_tool_loop, - args=(client, state, sidecar_tool, stop_event), - kwargs={"min_send_interval": 1.0}, - daemon=True, - ) + thread = _start_tool_thread(client, state, sidecar_tool, stop_event) sidecar_threads.append(thread) print( f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})" ) - thread.start() def run_ephemeral_tool(spec: str) -> None: spec = spec.strip() @@ -550,19 +126,16 @@ def main() -> int: except RuntimeError as exc: print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr) return - thread = Thread( - target=run_tool_loop, - args=(client, state, temp_tool, stop_event), - kwargs={ - "min_send_interval": 1.0, - "auto_stop": True, - "auto_stop_idle": 2.0, - }, - daemon=True, + thread = _start_tool_thread( + client, + state, + temp_tool, + stop_event, + auto_stop=True, + auto_stop_idle=AUTO_STOP_IDLE_SECONDS, ) ephemeral_tools.append(thread) print(f"[Tool] Executing {spec!r} once") - thread.start() def run_ephemeral_agent(spec: str) -> None: spec = spec.strip() @@ -575,14 +148,7 @@ def main() -> int: print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr) return - last_output = state.snapshot_output() - if last_output: - observe = getattr(temp_agent, "observe", None) - if callable(observe): - try: - observe(last_output) - except Exception as exc: # pragma: no cover - defensive - print(f"[Agent] observe failed: {exc}", file=sys.stderr) + _prime_agent(temp_agent, state.snapshot_output()) def run_tool_instance(tool: Tool) -> bool: run_tool_loop( @@ -590,18 +156,11 @@ def main() -> int: state, tool, stop_event, - min_send_interval=1.0, + min_send_interval=DEFAULT_SEND_INTERVAL, auto_stop=True, - auto_stop_idle=2.0, + auto_stop_idle=AUTO_STOP_IDLE_SECONDS, ) - output_after = state.snapshot_output() - if output_after: - observe = getattr(temp_agent, "observe", None) - if callable(observe): - try: - observe(output_after) - except Exception as exc: # pragma: no cover - defensive - print(f"[Agent] observe failed: {exc}", file=sys.stderr) + _prime_agent(temp_agent, state.snapshot_output()) return True thread = Thread( @@ -625,8 +184,8 @@ def main() -> int: client, state=state, stop_event=stop_event, - exit_command=exit_command, - tool_command=None if tool_mode else run_ephemeral_tool, + exit_command=connection.exit_command, + tool_command=None if tool_settings.tool_mode else run_ephemeral_tool, agent_command=run_ephemeral_agent, ) except KeyboardInterrupt: @@ -636,15 +195,12 @@ def main() -> int: stop_event.set() if tool_thread: tool_thread.join(timeout=1.0) - for thread in sidecar_threads: - thread.join(timeout=1.0) - for thread in ephemeral_tools: - thread.join(timeout=1.0) - for thread in agent_threads: - thread.join(timeout=1.0) + _join_threads(sidecar_threads) + _join_threads(ephemeral_tools) + _join_threads(agent_threads) if interrupted: - graceful_shutdown(client, exit_command, state=state) + graceful_shutdown(client, connection.exit_command, state=state) return 0 diff --git a/mud_env.py b/mud_env.py new file mode 100644 index 0000000..c7df8ee --- /dev/null +++ b/mud_env.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class ConnectionSettings: + host: str + port: int + user: str + password: str + login_prompt: str + exit_command: str + + +@dataclass(frozen=True) +class ToolSettings: + tool_mode: bool + tool_spec: str + sideload_specs: list[str] + + +def load_env_file(path: str = ".env") -> None: + """Populate ``os.environ`` with key/value pairs from a dotenv file.""" + env_path = Path(path) + if not env_path.exists(): + return + for line in env_path.read_text().splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + if "=" not in stripped: + continue + key, value = stripped.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + if key and key not in os.environ: + os.environ[key] = value + + +def require_env(key: str) -> str: + value = os.environ.get(key) + if value is None: + raise RuntimeError(f"Missing required environment variable: {key}") + return value + + +def parse_bool(raw: str) -> bool: + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def parse_port(raw: str) -> int: + try: + return int(raw) + except ValueError as exc: + raise RuntimeError("MISTLE_PORT must be an integer") from exc + + +def parse_csv(raw: str) -> list[str]: + return [value.strip() for value in raw.split(",") if value.strip()] + + +def read_connection_settings() -> ConnectionSettings: + host = require_env("MISTLE_HOST") + port = parse_port(require_env("MISTLE_PORT")) + user = os.environ.get("MISTLE_USER", "") + password = os.environ.get("MISTLE_PASSWORD", "") + login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "") + exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "") + return ConnectionSettings( + host=host, + port=port, + user=user, + password=password, + login_prompt=login_prompt, + exit_command=exit_command, + ) + + +def read_tool_settings() -> ToolSettings: + tool_mode = parse_bool(os.environ.get("MISTLE_TOOL_MODE", "")) + tool_spec = os.environ.get("MISTLE_TOOL", "") + sideload_specs = parse_csv(os.environ.get("MISTLE_SIDELOAD_TOOL", "")) + return ToolSettings( + tool_mode=tool_mode, + tool_spec=tool_spec, + sideload_specs=sideload_specs, + ) diff --git a/mud_session.py b/mud_session.py new file mode 100644 index 0000000..46cd127 --- /dev/null +++ b/mud_session.py @@ -0,0 +1,329 @@ +from __future__ import annotations + +import select +import sys +import time +import unicodedata +from collections import deque +from threading import Event, Lock +from typing import Callable, Deque, List, Optional + +from telnetclient import TelnetClient +from tools import Tool + + +_UMLAUT_TRANSLATION = str.maketrans( + { + "ä": "ae", + "ö": "oe", + "ü": "ue", + "Ä": "Ae", + "Ö": "Oe", + "Ü": "Ue", + "ß": "ss", + } +) + + +def sanitize_for_mud(text: str) -> str: + """Return ASCII-only text suitable for Silberland's input parser.""" + if not text: + return "" + + replaced = text.translate(_UMLAUT_TRANSLATION) + normalized = unicodedata.normalize("NFKD", replaced) + cleaned: list[str] = [] + for ch in normalized: + if unicodedata.combining(ch): + continue + code = ord(ch) + if 32 <= code <= 126: + cleaned.append(ch) + else: + cleaned.append("?") + return "".join(cleaned) + + +class SessionState: + """Share Telnet session state safely across threads.""" + + class _OutputListener: + def __init__(self) -> None: + self._queue: Deque[str] = deque() + self._lock = Lock() + self._event = Event() + + def publish(self, text: str) -> None: + with self._lock: + self._queue.append(text) + self._event.set() + + def wait(self, timeout: float) -> bool: + return self._event.wait(timeout) + + def drain(self) -> List[str]: + with self._lock: + items = list(self._queue) + self._queue.clear() + self._event.clear() + return items + + def close(self) -> None: + with self._lock: + self._queue.clear() + self._event.set() + + def __init__(self) -> None: + self._send_lock = Lock() + self._output_lock = Lock() + self._output_event = Event() + self._last_output = "" + self._last_tool_send = 0.0 + self._listeners: set[SessionState._OutputListener] = set() + self._listeners_lock = Lock() + + def send(self, client: TelnetClient, message: str) -> None: + sanitized = sanitize_for_mud(message) + with self._send_lock: + client.send(sanitized) + + def tool_send( + self, + client: TelnetClient, + message: str, + *, + min_interval: float, + stop_event: Event, + ) -> bool: + """Send on behalf of the tool while respecting a minimum cadence.""" + sanitized = sanitize_for_mud(message) + + while not stop_event.is_set(): + with self._send_lock: + now = time.time() + elapsed = now - self._last_tool_send + if elapsed >= min_interval: + client.send(sanitized) + self._last_tool_send = now + return True + wait_time = min_interval - elapsed + if wait_time <= 0: + continue + if stop_event.wait(wait_time): + break + return False + + def update_output(self, text: str) -> None: + if not text: + return + with self._output_lock: + self._last_output = text + with self._listeners_lock: + listeners = list(self._listeners) + for listener in listeners: + listener.publish(text) + self._output_event.set() + + def snapshot_output(self) -> str: + with self._output_lock: + return self._last_output + + def register_listener(self) -> "SessionState._OutputListener": + listener = SessionState._OutputListener() + with self._listeners_lock: + self._listeners.add(listener) + with self._output_lock: + last_output = self._last_output + if last_output: + listener.publish(last_output) + return listener + + def remove_listener(self, listener: "SessionState._OutputListener") -> None: + with self._listeners_lock: + existed = listener in self._listeners + if existed: + self._listeners.remove(listener) + if existed: + listener.close() + + def wait_for_output(self, timeout: float) -> bool: + return self._output_event.wait(timeout) + + def clear_output_event(self) -> None: + self._output_event.clear() + + +def run_tool_loop( + client: TelnetClient, + state: SessionState, + tool: Tool, + stop_event: Event, + *, + idle_delay: float = 0.5, + min_send_interval: float = 1.0, + auto_stop: bool = False, + auto_stop_idle: float = 2.0, +) -> None: + """Invoke *tool* whenever new output arrives and send its response.""" + idle_started: Optional[float] = None + listener = state.register_listener() + + def maybe_send() -> None: + nonlocal idle_started + try: + command = tool.decide() + except Exception as exc: # pragma: no cover - defensive logging + print(f"[Tool] Failed: {exc}", file=sys.stderr) + return + if not command: + return + sent = state.tool_send( + client, + command, + min_interval=min_send_interval, + stop_event=stop_event, + ) + if not sent: + return + idle_started = None + + try: + while not stop_event.is_set(): + maybe_send() + if stop_event.is_set(): + break + + triggered = listener.wait(timeout=idle_delay) + if stop_event.is_set(): + break + + if not triggered: + if auto_stop: + now = time.time() + if idle_started is None: + idle_started = now + elif now - idle_started >= auto_stop_idle: + break + continue + + outputs = listener.drain() + if not outputs: + continue + + idle_started = None + for chunk in outputs: + if not chunk: + continue + try: + tool.observe(chunk) + except Exception as exc: # pragma: no cover - defensive logging + print(f"[Tool] Failed during observe: {exc}", file=sys.stderr) + + maybe_send() + finally: + state.remove_listener(listener) + + +def login( + client: TelnetClient, + *, + user: str, + password: str, + login_prompt: str, + banner_timeout: float = 10.0, + response_timeout: float = 2.0, + state: Optional[SessionState] = None, +) -> None: + """Handle the banner/prompt exchange and send credentials.""" + if login_prompt: + banner = client.read_until(login_prompt, timeout=banner_timeout) + else: + banner = client.receive(timeout=response_timeout) + if banner: + print(banner, end="" if banner.endswith("\n") else "\n") + if state: + state.update_output(banner) + + if user: + client.send(sanitize_for_mud(user)) + time.sleep(0.2) + if password: + client.send(sanitize_for_mud(password)) + + response = client.receive(timeout=response_timeout) + if response: + print(response, end="" if response.endswith("\n") else "\n") + if state: + state.update_output(response) + + +def interactive_session( + client: TelnetClient, + state: SessionState, + stop_event: Event, + *, + poll_interval: float = 0.2, + receive_timeout: float = 0.2, + exit_command: str, + tool_command: Optional[Callable[[str], None]] = None, + agent_command: Optional[Callable[[str], None]] = None, +) -> None: + """Keep the Telnet session running, proxying input/output until interrupted.""" + if exit_command: + print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).") + else: + print("Connected. Press Ctrl-C to exit.") + + while not stop_event.is_set(): + incoming = client.receive(timeout=receive_timeout) + if incoming: + print(incoming, end="" if incoming.endswith("\n") else "\n") + state.update_output(incoming) + + readable, _, _ = select.select([sys.stdin], [], [], poll_interval) + if sys.stdin in readable: + line = sys.stdin.readline() + if line == "": + stop_event.set() + break + line = line.rstrip("\r\n") + if not line: + continue + lowered = line.lower() + if agent_command and lowered.startswith("#agent"): + parts = line.split(maxsplit=1) + if len(parts) == 1: + print("[Agent] Usage: #agent ") + else: + agent_command(parts[1]) + continue + if tool_command and lowered.startswith("#execute"): + parts = line.split(maxsplit=1) + if len(parts) == 1: + print("[Tool] Usage: #execute ") + else: + tool_command(parts[1]) + continue + state.send(client, line) + + +def graceful_shutdown( + client: TelnetClient, + exit_command: str, + *, + state: Optional[SessionState] = None, +) -> None: + if not exit_command: + return + try: + if state: + state.send(client, exit_command) + else: + client.send(sanitize_for_mud(exit_command)) + farewell = client.receive(timeout=2.0) + if farewell: + print(farewell, end="" if farewell.endswith("\n") else "\n") + if state: + state.update_output(farewell) + except Exception as exc: # pragma: no cover - best effort logging + print(f"Failed to send exit command: {exc}", file=sys.stderr) diff --git a/mud_tools.py b/mud_tools.py new file mode 100644 index 0000000..25ecef3 --- /dev/null +++ b/mud_tools.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import os +from importlib import import_module +from typing import Optional, Type + +from tools import Tool + + +TOOL_REGISTRY = { + "look": { + "module": "tools", + "class": "LookTool", + "kwargs": {}, + "description": "Sends the 'schau' look command to refresh the room description.", + }, + "simple": { + "module": "tools", + "class": "LookTool", + "kwargs": {}, + "description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.", + }, + "move": { + "module": "movement_tool", + "class": "MovementTool", + "kwargs": {}, + "description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.", + }, + "movement": { + "module": "movement_tool", + "class": "MovementTool", + "kwargs": {}, + "description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.", + }, + "explore": { + "module": "tools", + "class": "ExploreTool", + "kwargs": {}, + "description": "Sends 'schau' once, then 'untersuche ' for each noun found in the room description.", + }, + "communication": { + "module": "tools", + "class": "CommunicationTool", + "kwargs": {}, + "description": "Responds to private tells with a friendly greeting via 'teile mit ...'.", + }, + "intelligent": { + "module": "intelligent_tool", + "class": "IntelligentCommunicationTool", + "kwargs": { + "model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small") + }, + "description": "Uses an LLM to craft a polite reply to private tells.", + }, + "intelligentcommunication": { + "module": "intelligent_tool", + "class": "IntelligentCommunicationTool", + "kwargs": { + "model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small") + }, + "description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.", + }, +} + +TOOL_DESCRIPTIONS = { + name: meta["description"] for name, meta in TOOL_REGISTRY.items() +} + + +def build_tool(spec: str) -> Tool: + """Instantiate a tool based on configuration.""" + normalized = spec.strip() or "look" + key = normalized.lower() + + if key in TOOL_REGISTRY: + meta = TOOL_REGISTRY[key] + module_name = meta["module"] + class_name = meta["class"] + kwargs = meta.get("kwargs", {}) + try: + module = import_module(module_name) + tool_cls = getattr(module, class_name) + except AttributeError as exc: # pragma: no cover - optional dependency + raise RuntimeError(f"{class_name} is not available in tools module") from exc + tool = _instantiate_tool(tool_cls, normalized, kwargs) + model_name = kwargs.get("model") if kwargs else None + if model_name: + print(f"[Tool] Using LLM model: {model_name}") + return tool + + if ":" in normalized: + module_name, class_name = normalized.split(":", 1) + if not module_name or not class_name: + raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format") + module = import_module(module_name) + tool_cls = getattr(module, class_name) + return _instantiate_tool(tool_cls, normalized) + + raise RuntimeError(f"Unknown tool spec '{spec}'.") + + +def _instantiate_tool( + tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None +) -> Tool: + if not issubclass(tool_cls, Tool): + raise RuntimeError(f"{tool_spec} is not a Tool subclass") + try: + kwargs = kwargs or {} + return tool_cls(**kwargs) + except TypeError as exc: + raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc diff --git a/textual_ui.py b/textual_ui.py index 0757491..430482c 100644 --- a/textual_ui.py +++ b/textual_ui.py @@ -10,15 +10,9 @@ from textual.app import App, ComposeResult from textual.containers import Horizontal, Vertical from textual.widgets import Footer, Header, Input, Label, Log -from app import ( - TOOL_DESCRIPTIONS, - SessionState, - build_tool, - load_env_file, - login, - require_env, - run_tool_loop, -) +from mud_env import load_env_file, read_connection_settings +from mud_session import SessionState, login, run_tool_loop +from mud_tools import TOOL_DESCRIPTIONS, build_tool from agent_runtime import run_agent from agents import build_agent from telnetclient import TelnetClient @@ -83,12 +77,13 @@ class MudUI(App): self._stop_event = Event() self._ui_thread = current_thread() load_env_file() - host = require_env("MISTLE_HOST") - port = int(require_env("MISTLE_PORT")) + connection = read_connection_settings() timeout = float(os.environ.get("MISTLE_TIMEOUT", "10")) self.state = SessionState() - self.client = TelnetClient(host=host, port=port, timeout=timeout) + self.client = TelnetClient( + host=connection.host, port=connection.port, timeout=timeout + ) try: self.client.connect() except Exception as exc: # pragma: no cover - network specific @@ -99,9 +94,9 @@ class MudUI(App): with redirect_stdout(writer), redirect_stderr(writer): login( self.client, - user=os.environ.get("MISTLE_USER", ""), - password=os.environ.get("MISTLE_PASSWORD", ""), - login_prompt=os.environ.get("MISTLE_LOGIN_PROMPT", ""), + user=connection.user, + password=connection.password, + login_prompt=connection.login_prompt, state=self.state, ) writer.flush() @@ -110,7 +105,7 @@ class MudUI(App): self.reader_thread.start() self.input.focus() - self.log_mud(f"Connected to {host}:{port}") + self.log_mud(f"Connected to {connection.host}:{connection.port}") def on_input_submitted(self, event: Input.Submitted) -> None: command = event.value.strip()