refactor: more readability

This commit is contained in:
Daniel Eder 2026-02-09 09:35:50 +01:00
parent f5a2192a25
commit d49b31a3e4
No known key found for this signature in database
GPG key ID: CE7446DFCE599F32
5 changed files with 626 additions and 545 deletions

614
app.py
View file

@ -1,493 +1,55 @@
import os from __future__ import annotations
import select
import sys import sys
import time from threading import Event, Thread
import unicodedata from typing import Optional
from collections import deque
from importlib import import_module
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Deque, List, Optional, Type
from tools import Tool
from agents import Agent, build_agent
from agent_runtime import run_agent from agent_runtime import run_agent
from agents import Agent, build_agent
TOOL_REGISTRY = { from mud_env import load_env_file, read_connection_settings, read_tool_settings
"look": { from mud_session import (
"module": "tools", SessionState,
"class": "LookTool", graceful_shutdown,
"kwargs": {}, interactive_session,
"description": "Sends the 'schau' look command to refresh the room description.", login,
}, run_tool_loop,
"simple": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.",
},
"move": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"movement": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"explore": {
"module": "tools",
"class": "ExploreTool",
"kwargs": {},
"description": "Sends 'schau' once, then 'untersuche <noun>' for each noun found in the room description.",
},
"communication": {
"module": "tools",
"class": "CommunicationTool",
"kwargs": {},
"description": "Responds to private tells with a friendly greeting via 'teile <player> mit ...'.",
},
"intelligent": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get(
"MISTLE_LLM_MODEL", "mistral/mistral-small"
)
},
"description": "Uses an LLM to craft a polite reply to private tells.",
},
"intelligentcommunication": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get(
"MISTLE_LLM_MODEL", "mistral/mistral-small"
)
},
"description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.",
},
}
TOOL_DESCRIPTIONS = {
name: meta["description"] for name, meta in TOOL_REGISTRY.items()
}
from telnetclient import TelnetClient
_UMLAUT_TRANSLATION = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"Ä": "Ae",
"Ö": "Oe",
"Ü": "Ue",
"ß": "ss",
}
) )
from mud_tools import TOOL_DESCRIPTIONS, build_tool
from telnetclient import TelnetClient
from tools import Tool
def sanitize_for_mud(text: str) -> str: DEFAULT_SEND_INTERVAL = 1.0
"""Return ASCII-only text suitable for Silberland's input parser.""" AUTO_STOP_IDLE_SECONDS = 2.0
if not text:
return ""
replaced = text.translate(_UMLAUT_TRANSLATION)
normalized = unicodedata.normalize("NFKD", replaced)
cleaned: list[str] = []
for ch in normalized:
if unicodedata.combining(ch):
continue
code = ord(ch)
if 32 <= code <= 126:
cleaned.append(ch)
else:
cleaned.append("?")
return "".join(cleaned)
class SessionState: def _start_tool_thread(
"""Share Telnet session state safely across threads."""
class _OutputListener:
def __init__(self) -> None:
self._queue: Deque[str] = deque()
self._lock = Lock()
self._event = Event()
def publish(self, text: str) -> None:
with self._lock:
self._queue.append(text)
self._event.set()
def wait(self, timeout: float) -> bool:
return self._event.wait(timeout)
def drain(self) -> List[str]:
with self._lock:
items = list(self._queue)
self._queue.clear()
self._event.clear()
return items
def close(self) -> None:
with self._lock:
self._queue.clear()
self._event.set()
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_tool_send = 0.0
self._listeners: set[SessionState._OutputListener] = set()
self._listeners_lock = Lock()
def send(self, client: TelnetClient, message: str) -> None:
sanitized = sanitize_for_mud(message)
with self._send_lock:
client.send(sanitized)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the tool while respecting a minimum cadence."""
sanitized = sanitize_for_mud(message)
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_tool_send
if elapsed >= min_interval:
client.send(sanitized)
self._last_tool_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
with self._listeners_lock:
listeners = list(self._listeners)
for listener in listeners:
listener.publish(text)
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def register_listener(self) -> "SessionState._OutputListener":
listener = SessionState._OutputListener()
with self._listeners_lock:
self._listeners.add(listener)
with self._output_lock:
last_output = self._last_output
if last_output:
listener.publish(last_output)
return listener
def remove_listener(self, listener: "SessionState._OutputListener") -> None:
with self._listeners_lock:
existed = listener in self._listeners
if existed:
self._listeners.remove(listener)
if existed:
listener.close()
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_tool_loop(
client: TelnetClient, client: TelnetClient,
state: SessionState, state: SessionState,
tool: Tool, tool: Tool,
stop_event: Event, stop_event: Event,
*, *,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False, auto_stop: bool = False,
auto_stop_idle: float = 2.0, auto_stop_idle: float = AUTO_STOP_IDLE_SECONDS,
) -> None: ) -> Thread:
"""Invoke *tool* whenever new output arrives and send its response.""" thread = Thread(
idle_started: Optional[float] = None target=run_tool_loop,
listener = state.register_listener() args=(client, state, tool, stop_event),
kwargs={
def maybe_send() -> None: "min_send_interval": DEFAULT_SEND_INTERVAL,
nonlocal idle_started "auto_stop": auto_stop,
try: "auto_stop_idle": auto_stop_idle,
command = tool.decide() },
except Exception as exc: # pragma: no cover - defensive logging daemon=True,
print(f"[Tool] Failed: {exc}", file=sys.stderr) )
return thread.start()
if not command: return thread
return
sent = state.tool_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
if not sent:
return
idle_started = None
try:
while not stop_event.is_set():
maybe_send()
if stop_event.is_set():
break
triggered = listener.wait(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
outputs = listener.drain()
if not outputs:
continue
idle_started = None
for chunk in outputs:
if not chunk:
continue
try:
tool.observe(chunk)
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
maybe_send()
finally:
state.remove_listener(listener)
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str: def _build_sideload_tools(specs: list[str]) -> list[tuple[str, Tool]]:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def build_tool(spec: str) -> Tool:
"""Instantiate a tool based on configuration."""
normalized = spec.strip() or "look"
key = normalized.lower()
if key in TOOL_REGISTRY:
meta = TOOL_REGISTRY[key]
module_name = meta["module"]
class_name = meta["class"]
kwargs = meta.get("kwargs", {})
try:
module = import_module(module_name)
tool_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in tools module") from exc
tool = _instantiate_tool(tool_cls, normalized, kwargs)
model_name = kwargs.get("model") if kwargs else None
if model_name:
print(f"[Tool] Using LLM model: {model_name}")
return tool
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
module = import_module(module_name)
tool_cls = getattr(module, class_name)
return _instantiate_tool(tool_cls, normalized)
raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
if not issubclass(tool_cls, Tool):
raise RuntimeError(f"{tool_spec} is not a Tool subclass")
try:
kwargs = kwargs or {}
return tool_cls(**kwargs)
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(sanitize_for_mud(user))
time.sleep(0.2)
if password:
client.send(sanitize_for_mud(password))
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
tool_command: Optional[Callable[[str], None]] = None,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
lowered = line.lower()
if agent_command and lowered.startswith("#agent"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #agent <agent_spec>")
else:
agent_command(parts[1])
continue
if tool_command and lowered.startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Tool] Usage: #execute <tool_spec>")
else:
tool_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(sanitize_for_mud(exit_command))
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)
def main() -> int:
load_env_file()
host = require_env("MISTLE_HOST")
port_raw = require_env("MISTLE_PORT")
try:
port = int(port_raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
tool_mode_env = os.environ.get("MISTLE_TOOL_MODE", "")
tool_mode = tool_mode_env.lower() in {"1", "true", "yes", "on"}
tool_spec = os.environ.get("MISTLE_TOOL", "")
sideload_env = os.environ.get("MISTLE_SIDELOAD_TOOL", "")
sideload_specs = [spec.strip() for spec in sideload_env.split(",") if spec.strip()]
sideload_tools: list[tuple[str, Tool]] = [] sideload_tools: list[tuple[str, Tool]] = []
seen_sideloads: set[str] = set() seen_sideloads: set[str] = set()
for spec in sideload_specs: for spec in specs:
lowered = spec.lower() lowered = spec.lower()
if lowered in seen_sideloads: if lowered in seen_sideloads:
continue continue
@ -498,6 +60,32 @@ def main() -> int:
print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr) print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr)
continue continue
sideload_tools.append((spec, tool_instance)) sideload_tools.append((spec, tool_instance))
return sideload_tools
def _prime_agent(agent: Agent, output: str) -> None:
if not output:
return
observe = getattr(agent, "observe", None)
if callable(observe):
try:
observe(output)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
def _join_threads(threads: list[Thread], *, timeout: float = 1.0) -> None:
for thread in threads:
thread.join(timeout=timeout)
def main() -> int:
load_env_file()
connection = read_connection_settings()
tool_settings = read_tool_settings()
sideload_tools = _build_sideload_tools(tool_settings.sideload_specs)
state = SessionState() state = SessionState()
stop_event = Event() stop_event = Event()
@ -507,38 +95,26 @@ def main() -> int:
sidecar_threads: list[Thread] = [] sidecar_threads: list[Thread] = []
agent_threads: list[Thread] = [] agent_threads: list[Thread] = []
with TelnetClient(host=host, port=port, timeout=10.0) as client: with TelnetClient(host=connection.host, port=connection.port, timeout=10.0) as client:
login( login(
client, client,
user=user, user=connection.user,
password=password, password=connection.password,
login_prompt=login_prompt, login_prompt=connection.login_prompt,
state=state, state=state,
) )
if tool_mode: if tool_settings.tool_mode:
tool = build_tool(tool_spec) tool = build_tool(tool_settings.tool_spec)
tool_thread = Thread( tool_thread = _start_tool_thread(client, state, tool, stop_event)
target=run_tool_loop,
args=(client, state, tool, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
tool_thread.start()
if sideload_tools: if sideload_tools:
for sidecar_spec, sidecar_tool in sideload_tools: for sidecar_spec, sidecar_tool in sideload_tools:
thread = Thread( thread = _start_tool_thread(client, state, sidecar_tool, stop_event)
target=run_tool_loop,
args=(client, state, sidecar_tool, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
sidecar_threads.append(thread) sidecar_threads.append(thread)
print( print(
f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})" f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})"
) )
thread.start()
def run_ephemeral_tool(spec: str) -> None: def run_ephemeral_tool(spec: str) -> None:
spec = spec.strip() spec = spec.strip()
@ -550,19 +126,16 @@ def main() -> int:
except RuntimeError as exc: except RuntimeError as exc:
print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr) print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr)
return return
thread = Thread( thread = _start_tool_thread(
target=run_tool_loop, client,
args=(client, state, temp_tool, stop_event), state,
kwargs={ temp_tool,
"min_send_interval": 1.0, stop_event,
"auto_stop": True, auto_stop=True,
"auto_stop_idle": 2.0, auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
},
daemon=True,
) )
ephemeral_tools.append(thread) ephemeral_tools.append(thread)
print(f"[Tool] Executing {spec!r} once") print(f"[Tool] Executing {spec!r} once")
thread.start()
def run_ephemeral_agent(spec: str) -> None: def run_ephemeral_agent(spec: str) -> None:
spec = spec.strip() spec = spec.strip()
@ -575,14 +148,7 @@ def main() -> int:
print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr) print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr)
return return
last_output = state.snapshot_output() _prime_agent(temp_agent, state.snapshot_output())
if last_output:
observe = getattr(temp_agent, "observe", None)
if callable(observe):
try:
observe(last_output)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
def run_tool_instance(tool: Tool) -> bool: def run_tool_instance(tool: Tool) -> bool:
run_tool_loop( run_tool_loop(
@ -590,18 +156,11 @@ def main() -> int:
state, state,
tool, tool,
stop_event, stop_event,
min_send_interval=1.0, min_send_interval=DEFAULT_SEND_INTERVAL,
auto_stop=True, auto_stop=True,
auto_stop_idle=2.0, auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
) )
output_after = state.snapshot_output() _prime_agent(temp_agent, state.snapshot_output())
if output_after:
observe = getattr(temp_agent, "observe", None)
if callable(observe):
try:
observe(output_after)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
return True return True
thread = Thread( thread = Thread(
@ -625,8 +184,8 @@ def main() -> int:
client, client,
state=state, state=state,
stop_event=stop_event, stop_event=stop_event,
exit_command=exit_command, exit_command=connection.exit_command,
tool_command=None if tool_mode else run_ephemeral_tool, tool_command=None if tool_settings.tool_mode else run_ephemeral_tool,
agent_command=run_ephemeral_agent, agent_command=run_ephemeral_agent,
) )
except KeyboardInterrupt: except KeyboardInterrupt:
@ -636,15 +195,12 @@ def main() -> int:
stop_event.set() stop_event.set()
if tool_thread: if tool_thread:
tool_thread.join(timeout=1.0) tool_thread.join(timeout=1.0)
for thread in sidecar_threads: _join_threads(sidecar_threads)
thread.join(timeout=1.0) _join_threads(ephemeral_tools)
for thread in ephemeral_tools: _join_threads(agent_threads)
thread.join(timeout=1.0)
for thread in agent_threads:
thread.join(timeout=1.0)
if interrupted: if interrupted:
graceful_shutdown(client, exit_command, state=state) graceful_shutdown(client, connection.exit_command, state=state)
return 0 return 0

90
mud_env.py Normal file
View file

@ -0,0 +1,90 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class ConnectionSettings:
host: str
port: int
user: str
password: str
login_prompt: str
exit_command: str
@dataclass(frozen=True)
class ToolSettings:
tool_mode: bool
tool_spec: str
sideload_specs: list[str]
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def parse_bool(raw: str) -> bool:
return raw.strip().lower() in {"1", "true", "yes", "on"}
def parse_port(raw: str) -> int:
try:
return int(raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
def parse_csv(raw: str) -> list[str]:
return [value.strip() for value in raw.split(",") if value.strip()]
def read_connection_settings() -> ConnectionSettings:
host = require_env("MISTLE_HOST")
port = parse_port(require_env("MISTLE_PORT"))
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
return ConnectionSettings(
host=host,
port=port,
user=user,
password=password,
login_prompt=login_prompt,
exit_command=exit_command,
)
def read_tool_settings() -> ToolSettings:
tool_mode = parse_bool(os.environ.get("MISTLE_TOOL_MODE", ""))
tool_spec = os.environ.get("MISTLE_TOOL", "")
sideload_specs = parse_csv(os.environ.get("MISTLE_SIDELOAD_TOOL", ""))
return ToolSettings(
tool_mode=tool_mode,
tool_spec=tool_spec,
sideload_specs=sideload_specs,
)

329
mud_session.py Normal file
View file

@ -0,0 +1,329 @@
from __future__ import annotations
import select
import sys
import time
import unicodedata
from collections import deque
from threading import Event, Lock
from typing import Callable, Deque, List, Optional
from telnetclient import TelnetClient
from tools import Tool
_UMLAUT_TRANSLATION = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"Ä": "Ae",
"Ö": "Oe",
"Ü": "Ue",
"ß": "ss",
}
)
def sanitize_for_mud(text: str) -> str:
"""Return ASCII-only text suitable for Silberland's input parser."""
if not text:
return ""
replaced = text.translate(_UMLAUT_TRANSLATION)
normalized = unicodedata.normalize("NFKD", replaced)
cleaned: list[str] = []
for ch in normalized:
if unicodedata.combining(ch):
continue
code = ord(ch)
if 32 <= code <= 126:
cleaned.append(ch)
else:
cleaned.append("?")
return "".join(cleaned)
class SessionState:
"""Share Telnet session state safely across threads."""
class _OutputListener:
def __init__(self) -> None:
self._queue: Deque[str] = deque()
self._lock = Lock()
self._event = Event()
def publish(self, text: str) -> None:
with self._lock:
self._queue.append(text)
self._event.set()
def wait(self, timeout: float) -> bool:
return self._event.wait(timeout)
def drain(self) -> List[str]:
with self._lock:
items = list(self._queue)
self._queue.clear()
self._event.clear()
return items
def close(self) -> None:
with self._lock:
self._queue.clear()
self._event.set()
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_tool_send = 0.0
self._listeners: set[SessionState._OutputListener] = set()
self._listeners_lock = Lock()
def send(self, client: TelnetClient, message: str) -> None:
sanitized = sanitize_for_mud(message)
with self._send_lock:
client.send(sanitized)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the tool while respecting a minimum cadence."""
sanitized = sanitize_for_mud(message)
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_tool_send
if elapsed >= min_interval:
client.send(sanitized)
self._last_tool_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
with self._listeners_lock:
listeners = list(self._listeners)
for listener in listeners:
listener.publish(text)
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def register_listener(self) -> "SessionState._OutputListener":
listener = SessionState._OutputListener()
with self._listeners_lock:
self._listeners.add(listener)
with self._output_lock:
last_output = self._last_output
if last_output:
listener.publish(last_output)
return listener
def remove_listener(self, listener: "SessionState._OutputListener") -> None:
with self._listeners_lock:
existed = listener in self._listeners
if existed:
self._listeners.remove(listener)
if existed:
listener.close()
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_tool_loop(
client: TelnetClient,
state: SessionState,
tool: Tool,
stop_event: Event,
*,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False,
auto_stop_idle: float = 2.0,
) -> None:
"""Invoke *tool* whenever new output arrives and send its response."""
idle_started: Optional[float] = None
listener = state.register_listener()
def maybe_send() -> None:
nonlocal idle_started
try:
command = tool.decide()
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed: {exc}", file=sys.stderr)
return
if not command:
return
sent = state.tool_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
if not sent:
return
idle_started = None
try:
while not stop_event.is_set():
maybe_send()
if stop_event.is_set():
break
triggered = listener.wait(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
outputs = listener.drain()
if not outputs:
continue
idle_started = None
for chunk in outputs:
if not chunk:
continue
try:
tool.observe(chunk)
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
maybe_send()
finally:
state.remove_listener(listener)
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(sanitize_for_mud(user))
time.sleep(0.2)
if password:
client.send(sanitize_for_mud(password))
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
tool_command: Optional[Callable[[str], None]] = None,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
lowered = line.lower()
if agent_command and lowered.startswith("#agent"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #agent <agent_spec>")
else:
agent_command(parts[1])
continue
if tool_command and lowered.startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Tool] Usage: #execute <tool_spec>")
else:
tool_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(sanitize_for_mud(exit_command))
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)

111
mud_tools.py Normal file
View file

@ -0,0 +1,111 @@
from __future__ import annotations
import os
from importlib import import_module
from typing import Optional, Type
from tools import Tool
TOOL_REGISTRY = {
"look": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Sends the 'schau' look command to refresh the room description.",
},
"simple": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.",
},
"move": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"movement": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"explore": {
"module": "tools",
"class": "ExploreTool",
"kwargs": {},
"description": "Sends 'schau' once, then 'untersuche <noun>' for each noun found in the room description.",
},
"communication": {
"module": "tools",
"class": "CommunicationTool",
"kwargs": {},
"description": "Responds to private tells with a friendly greeting via 'teile <player> mit ...'.",
},
"intelligent": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small")
},
"description": "Uses an LLM to craft a polite reply to private tells.",
},
"intelligentcommunication": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small")
},
"description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.",
},
}
TOOL_DESCRIPTIONS = {
name: meta["description"] for name, meta in TOOL_REGISTRY.items()
}
def build_tool(spec: str) -> Tool:
"""Instantiate a tool based on configuration."""
normalized = spec.strip() or "look"
key = normalized.lower()
if key in TOOL_REGISTRY:
meta = TOOL_REGISTRY[key]
module_name = meta["module"]
class_name = meta["class"]
kwargs = meta.get("kwargs", {})
try:
module = import_module(module_name)
tool_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in tools module") from exc
tool = _instantiate_tool(tool_cls, normalized, kwargs)
model_name = kwargs.get("model") if kwargs else None
if model_name:
print(f"[Tool] Using LLM model: {model_name}")
return tool
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
module = import_module(module_name)
tool_cls = getattr(module, class_name)
return _instantiate_tool(tool_cls, normalized)
raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
if not issubclass(tool_cls, Tool):
raise RuntimeError(f"{tool_spec} is not a Tool subclass")
try:
kwargs = kwargs or {}
return tool_cls(**kwargs)
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc

View file

@ -10,15 +10,9 @@ from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical from textual.containers import Horizontal, Vertical
from textual.widgets import Footer, Header, Input, Label, Log from textual.widgets import Footer, Header, Input, Label, Log
from app import ( from mud_env import load_env_file, read_connection_settings
TOOL_DESCRIPTIONS, from mud_session import SessionState, login, run_tool_loop
SessionState, from mud_tools import TOOL_DESCRIPTIONS, build_tool
build_tool,
load_env_file,
login,
require_env,
run_tool_loop,
)
from agent_runtime import run_agent from agent_runtime import run_agent
from agents import build_agent from agents import build_agent
from telnetclient import TelnetClient from telnetclient import TelnetClient
@ -83,12 +77,13 @@ class MudUI(App):
self._stop_event = Event() self._stop_event = Event()
self._ui_thread = current_thread() self._ui_thread = current_thread()
load_env_file() load_env_file()
host = require_env("MISTLE_HOST") connection = read_connection_settings()
port = int(require_env("MISTLE_PORT"))
timeout = float(os.environ.get("MISTLE_TIMEOUT", "10")) timeout = float(os.environ.get("MISTLE_TIMEOUT", "10"))
self.state = SessionState() self.state = SessionState()
self.client = TelnetClient(host=host, port=port, timeout=timeout) self.client = TelnetClient(
host=connection.host, port=connection.port, timeout=timeout
)
try: try:
self.client.connect() self.client.connect()
except Exception as exc: # pragma: no cover - network specific except Exception as exc: # pragma: no cover - network specific
@ -99,9 +94,9 @@ class MudUI(App):
with redirect_stdout(writer), redirect_stderr(writer): with redirect_stdout(writer), redirect_stderr(writer):
login( login(
self.client, self.client,
user=os.environ.get("MISTLE_USER", ""), user=connection.user,
password=os.environ.get("MISTLE_PASSWORD", ""), password=connection.password,
login_prompt=os.environ.get("MISTLE_LOGIN_PROMPT", ""), login_prompt=connection.login_prompt,
state=self.state, state=self.state,
) )
writer.flush() writer.flush()
@ -110,7 +105,7 @@ class MudUI(App):
self.reader_thread.start() self.reader_thread.start()
self.input.focus() self.input.focus()
self.log_mud(f"Connected to {host}:{port}") self.log_mud(f"Connected to {connection.host}:{connection.port}")
def on_input_submitted(self, event: Input.Submitted) -> None: def on_input_submitted(self, event: Input.Submitted) -> None:
command = event.value.strip() command = event.value.strip()