Compare commits

..

No commits in common. "9af879eadc9d65f4f3e11c035ea10aa431c715f9" and "f5a2192a258a0059ba13ea6f631306e2a3050aee" have entirely different histories.

10 changed files with 661 additions and 969 deletions

View file

@ -67,10 +67,7 @@ Python-based Telnet helper for connecting to MUD servers, handling login flows,
Add guidance text after the type and optional modifiers separated by `|`, e.g. `#agent intelligent explore carefully|model=mistral/mistral-large-2407|delay=2`. The agent only calls built-in tools (`look`, `move`, `movement`, `explore`, `communication`, `intelligentcommunication`) and refuses unknown names.
9. Prefer a TUI? Run `python textual_ui.py` to open a two-pane interface with a bottom input field:
- Left pane: live MUD stream (server output plus outgoing actions from you, tools, and agents).
- Right pane: LLM reasoning plus tool/agent diagnostic output.
It accepts the same commands as the CLI (`#execute …`, `#agent …`, or raw MUD commands).
9. Prefer a TUI? Run `python textual_ui.py` to open a two-pane interface (MUD output on the left, agent output on the right) with an input box at the bottom. It accepts the same commands as the CLI (`#execute …`, `#agent …`, or raw MUD commands).
## Environment Variables
@ -90,12 +87,6 @@ All variables can be placed in the `.env` file (one `KEY=value` per line) or pro
| `MISTLE_LLM_MODEL` | ❌ | Override the `litellm` model used by the intelligent tool (defaults to `mistral/mistral-small-2407`). |
| `MISTRAL_API_KEY` | ❌ | API key used by `IntelligentCommunicationTool` (via `litellm`) when calling the `mistral/mistral-small-2407` model. |
## System Prompt File
- LLM-backed components now load their system prompt from `SYSTEM.md` in the project root.
- This applies to both the intelligent agent and the intelligent communication tool.
- If `SYSTEM.md` is missing or empty, the app falls back to built-in defaults and logs a warning.
## Tool Development
- Implement new tools by subclassing `tools.Tool` and overriding `observe()` and `decide()`.

View file

@ -1,18 +0,0 @@
You are Mistle, a autonomous Agent playing the German-speaking MUD silberland.
Your goal is to level up as much as possible and explore the world.
## Behavior
- Be concise, friendly, and informal.
- When replying to players, answer in German.
- Keep answers practical and in-character for a MUD helper bot.
- Use emoticons only when appropriate: :) ;) 8) :( ;( :P X) XD :D
## Tool Usage
Use tools only when appropriate. Think how you can solve problems without
using a tool. Do not use the "exploration" tool multiple times in a row.
## Mud Commands
The following MUD commands may be helpful to you
schau - get a description of the current environment
info - examine your own stats
hilfe - get additional help in general and about specific commands

614
app.py
View file

@ -1,55 +1,493 @@
from __future__ import annotations
import os
import select
import sys
from threading import Event, Thread
from typing import Optional
import time
import unicodedata
from collections import deque
from importlib import import_module
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Deque, List, Optional, Type
from agent_runtime import run_agent
from agents import Agent, build_agent
from mud_env import load_env_file, read_connection_settings, read_tool_settings
from mud_session import (
SessionState,
graceful_shutdown,
interactive_session,
login,
run_tool_loop,
)
from mud_tools import TOOL_DESCRIPTIONS, build_tool
from telnetclient import TelnetClient
from tools import Tool
from agents import Agent, build_agent
from agent_runtime import run_agent
TOOL_REGISTRY = {
"look": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Sends the 'schau' look command to refresh the room description.",
},
"simple": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.",
},
"move": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"movement": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"explore": {
"module": "tools",
"class": "ExploreTool",
"kwargs": {},
"description": "Sends 'schau' once, then 'untersuche <noun>' for each noun found in the room description.",
},
"communication": {
"module": "tools",
"class": "CommunicationTool",
"kwargs": {},
"description": "Responds to private tells with a friendly greeting via 'teile <player> mit ...'.",
},
"intelligent": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get(
"MISTLE_LLM_MODEL", "mistral/mistral-small"
)
},
"description": "Uses an LLM to craft a polite reply to private tells.",
},
"intelligentcommunication": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get(
"MISTLE_LLM_MODEL", "mistral/mistral-small"
)
},
"description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.",
},
}
TOOL_DESCRIPTIONS = {
name: meta["description"] for name, meta in TOOL_REGISTRY.items()
}
from telnetclient import TelnetClient
DEFAULT_SEND_INTERVAL = 1.0
AUTO_STOP_IDLE_SECONDS = 2.0
_UMLAUT_TRANSLATION = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"Ä": "Ae",
"Ö": "Oe",
"Ü": "Ue",
"ß": "ss",
}
)
def _start_tool_thread(
def sanitize_for_mud(text: str) -> str:
"""Return ASCII-only text suitable for Silberland's input parser."""
if not text:
return ""
replaced = text.translate(_UMLAUT_TRANSLATION)
normalized = unicodedata.normalize("NFKD", replaced)
cleaned: list[str] = []
for ch in normalized:
if unicodedata.combining(ch):
continue
code = ord(ch)
if 32 <= code <= 126:
cleaned.append(ch)
else:
cleaned.append("?")
return "".join(cleaned)
class SessionState:
"""Share Telnet session state safely across threads."""
class _OutputListener:
def __init__(self) -> None:
self._queue: Deque[str] = deque()
self._lock = Lock()
self._event = Event()
def publish(self, text: str) -> None:
with self._lock:
self._queue.append(text)
self._event.set()
def wait(self, timeout: float) -> bool:
return self._event.wait(timeout)
def drain(self) -> List[str]:
with self._lock:
items = list(self._queue)
self._queue.clear()
self._event.clear()
return items
def close(self) -> None:
with self._lock:
self._queue.clear()
self._event.set()
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_tool_send = 0.0
self._listeners: set[SessionState._OutputListener] = set()
self._listeners_lock = Lock()
def send(self, client: TelnetClient, message: str) -> None:
sanitized = sanitize_for_mud(message)
with self._send_lock:
client.send(sanitized)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the tool while respecting a minimum cadence."""
sanitized = sanitize_for_mud(message)
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_tool_send
if elapsed >= min_interval:
client.send(sanitized)
self._last_tool_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
with self._listeners_lock:
listeners = list(self._listeners)
for listener in listeners:
listener.publish(text)
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def register_listener(self) -> "SessionState._OutputListener":
listener = SessionState._OutputListener()
with self._listeners_lock:
self._listeners.add(listener)
with self._output_lock:
last_output = self._last_output
if last_output:
listener.publish(last_output)
return listener
def remove_listener(self, listener: "SessionState._OutputListener") -> None:
with self._listeners_lock:
existed = listener in self._listeners
if existed:
self._listeners.remove(listener)
if existed:
listener.close()
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_tool_loop(
client: TelnetClient,
state: SessionState,
tool: Tool,
stop_event: Event,
*,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False,
auto_stop_idle: float = AUTO_STOP_IDLE_SECONDS,
) -> Thread:
thread = Thread(
target=run_tool_loop,
args=(client, state, tool, stop_event),
kwargs={
"min_send_interval": DEFAULT_SEND_INTERVAL,
"auto_stop": auto_stop,
"auto_stop_idle": auto_stop_idle,
},
daemon=True,
auto_stop_idle: float = 2.0,
) -> None:
"""Invoke *tool* whenever new output arrives and send its response."""
idle_started: Optional[float] = None
listener = state.register_listener()
def maybe_send() -> None:
nonlocal idle_started
try:
command = tool.decide()
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed: {exc}", file=sys.stderr)
return
if not command:
return
sent = state.tool_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
thread.start()
return thread
if not sent:
return
idle_started = None
try:
while not stop_event.is_set():
maybe_send()
if stop_event.is_set():
break
triggered = listener.wait(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
outputs = listener.drain()
if not outputs:
continue
idle_started = None
for chunk in outputs:
if not chunk:
continue
try:
tool.observe(chunk)
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
maybe_send()
finally:
state.remove_listener(listener)
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def _build_sideload_tools(specs: list[str]) -> list[tuple[str, Tool]]:
def require_env(key: str) -> str:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def build_tool(spec: str) -> Tool:
"""Instantiate a tool based on configuration."""
normalized = spec.strip() or "look"
key = normalized.lower()
if key in TOOL_REGISTRY:
meta = TOOL_REGISTRY[key]
module_name = meta["module"]
class_name = meta["class"]
kwargs = meta.get("kwargs", {})
try:
module = import_module(module_name)
tool_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in tools module") from exc
tool = _instantiate_tool(tool_cls, normalized, kwargs)
model_name = kwargs.get("model") if kwargs else None
if model_name:
print(f"[Tool] Using LLM model: {model_name}")
return tool
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
module = import_module(module_name)
tool_cls = getattr(module, class_name)
return _instantiate_tool(tool_cls, normalized)
raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
if not issubclass(tool_cls, Tool):
raise RuntimeError(f"{tool_spec} is not a Tool subclass")
try:
kwargs = kwargs or {}
return tool_cls(**kwargs)
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(sanitize_for_mud(user))
time.sleep(0.2)
if password:
client.send(sanitize_for_mud(password))
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
tool_command: Optional[Callable[[str], None]] = None,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
lowered = line.lower()
if agent_command and lowered.startswith("#agent"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #agent <agent_spec>")
else:
agent_command(parts[1])
continue
if tool_command and lowered.startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Tool] Usage: #execute <tool_spec>")
else:
tool_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(sanitize_for_mud(exit_command))
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)
def main() -> int:
load_env_file()
host = require_env("MISTLE_HOST")
port_raw = require_env("MISTLE_PORT")
try:
port = int(port_raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
tool_mode_env = os.environ.get("MISTLE_TOOL_MODE", "")
tool_mode = tool_mode_env.lower() in {"1", "true", "yes", "on"}
tool_spec = os.environ.get("MISTLE_TOOL", "")
sideload_env = os.environ.get("MISTLE_SIDELOAD_TOOL", "")
sideload_specs = [spec.strip() for spec in sideload_env.split(",") if spec.strip()]
sideload_tools: list[tuple[str, Tool]] = []
seen_sideloads: set[str] = set()
for spec in specs:
for spec in sideload_specs:
lowered = spec.lower()
if lowered in seen_sideloads:
continue
@ -60,32 +498,6 @@ def _build_sideload_tools(specs: list[str]) -> list[tuple[str, Tool]]:
print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr)
continue
sideload_tools.append((spec, tool_instance))
return sideload_tools
def _prime_agent(agent: Agent, output: str) -> None:
if not output:
return
observe = getattr(agent, "observe", None)
if callable(observe):
try:
observe(output)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
def _join_threads(threads: list[Thread], *, timeout: float = 1.0) -> None:
for thread in threads:
thread.join(timeout=timeout)
def main() -> int:
load_env_file()
connection = read_connection_settings()
tool_settings = read_tool_settings()
sideload_tools = _build_sideload_tools(tool_settings.sideload_specs)
state = SessionState()
stop_event = Event()
@ -95,26 +507,38 @@ def main() -> int:
sidecar_threads: list[Thread] = []
agent_threads: list[Thread] = []
with TelnetClient(host=connection.host, port=connection.port, timeout=10.0) as client:
with TelnetClient(host=host, port=port, timeout=10.0) as client:
login(
client,
user=connection.user,
password=connection.password,
login_prompt=connection.login_prompt,
user=user,
password=password,
login_prompt=login_prompt,
state=state,
)
if tool_settings.tool_mode:
tool = build_tool(tool_settings.tool_spec)
tool_thread = _start_tool_thread(client, state, tool, stop_event)
if tool_mode:
tool = build_tool(tool_spec)
tool_thread = Thread(
target=run_tool_loop,
args=(client, state, tool, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
tool_thread.start()
if sideload_tools:
for sidecar_spec, sidecar_tool in sideload_tools:
thread = _start_tool_thread(client, state, sidecar_tool, stop_event)
thread = Thread(
target=run_tool_loop,
args=(client, state, sidecar_tool, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
sidecar_threads.append(thread)
print(
f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})"
)
thread.start()
def run_ephemeral_tool(spec: str) -> None:
spec = spec.strip()
@ -126,16 +550,19 @@ def main() -> int:
except RuntimeError as exc:
print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr)
return
thread = _start_tool_thread(
client,
state,
temp_tool,
stop_event,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
thread = Thread(
target=run_tool_loop,
args=(client, state, temp_tool, stop_event),
kwargs={
"min_send_interval": 1.0,
"auto_stop": True,
"auto_stop_idle": 2.0,
},
daemon=True,
)
ephemeral_tools.append(thread)
print(f"[Tool] Executing {spec!r} once")
thread.start()
def run_ephemeral_agent(spec: str) -> None:
spec = spec.strip()
@ -148,7 +575,14 @@ def main() -> int:
print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr)
return
_prime_agent(temp_agent, state.snapshot_output())
last_output = state.snapshot_output()
if last_output:
observe = getattr(temp_agent, "observe", None)
if callable(observe):
try:
observe(last_output)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
def run_tool_instance(tool: Tool) -> bool:
run_tool_loop(
@ -156,11 +590,18 @@ def main() -> int:
state,
tool,
stop_event,
min_send_interval=DEFAULT_SEND_INTERVAL,
min_send_interval=1.0,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
auto_stop_idle=2.0,
)
_prime_agent(temp_agent, state.snapshot_output())
output_after = state.snapshot_output()
if output_after:
observe = getattr(temp_agent, "observe", None)
if callable(observe):
try:
observe(output_after)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
return True
thread = Thread(
@ -184,8 +625,8 @@ def main() -> int:
client,
state=state,
stop_event=stop_event,
exit_command=connection.exit_command,
tool_command=None if tool_settings.tool_mode else run_ephemeral_tool,
exit_command=exit_command,
tool_command=None if tool_mode else run_ephemeral_tool,
agent_command=run_ephemeral_agent,
)
except KeyboardInterrupt:
@ -195,12 +636,15 @@ def main() -> int:
stop_event.set()
if tool_thread:
tool_thread.join(timeout=1.0)
_join_threads(sidecar_threads)
_join_threads(ephemeral_tools)
_join_threads(agent_threads)
for thread in sidecar_threads:
thread.join(timeout=1.0)
for thread in ephemeral_tools:
thread.join(timeout=1.0)
for thread in agent_threads:
thread.join(timeout=1.0)
if interrupted:
graceful_shutdown(client, connection.exit_command, state=state)
graceful_shutdown(client, exit_command, state=state)
return 0

View file

@ -14,24 +14,19 @@ except ImportError: # pragma: no cover
completion = None # type: ignore[assignment]
from agents import Agent
from system_prompt import load_system_prompt
ToolInvoker = Callable[[str], bool]
CommandExecutor = Callable[[str], None]
DEFAULT_AGENT_SYSTEM_PROMPT = (
"You are Mistle, a helpful MUD assistant. "
"You can either call tools or send plain commands to the MUD."
)
@dataclass
class IntelligentAgent(Agent):
"""LLM-driven agent that decides between tools and raw commands."""
model: str = "mistral/mistral-large-2407"
system_prompt: str = field(
default_factory=lambda: load_system_prompt(default=DEFAULT_AGENT_SYSTEM_PROMPT)
system_prompt: str = (
"You are Mistle, a helpful MUD assistant. "
"You can either call tools or send plain commands to the MUD."
)
temperature: float = 0.7
max_output_tokens: int = 200

View file

@ -11,27 +11,20 @@ try:
except ImportError: # pragma: no cover - optional dependency
completion = None # type: ignore[assignment]
from system_prompt import load_system_prompt
from tools import Tool
DEFAULT_COMMUNICATION_SYSTEM_PROMPT = (
"Du bist Mistle, ein hilfsbereiter MUD-Bot. "
"Antworte freundlich und knapp in deutscher Sprache."
"Sprich informell und freundlich."
"Antworte immer auf Deutsch."
"Verwende emoticons, wenn es angebracht ist: :) ;) 8) :( ;( :P X) XD :D"
)
@dataclass
class IntelligentCommunicationTool(Tool):
"""Tool that uses a language model to answer private tells."""
model: str = "mistral/mistral-tiny"
system_prompt: str = field(
default_factory=lambda: load_system_prompt(
default=DEFAULT_COMMUNICATION_SYSTEM_PROMPT
)
system_prompt: str = (
"Du bist Mistle, ein hilfsbereiter MUD-Bot. "
"Antworte freundlich und knapp in deutscher Sprache."
"Sprich informell und freundlich."
"Antworte immer auf Deutsch."
"Verwende emoticons, wenn es angebracht ist: :) ;) 8) :( ;( :P X) XD :D"
)
temperature: float = 0.7
max_output_tokens: int = 120

View file

@ -1,90 +0,0 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class ConnectionSettings:
host: str
port: int
user: str
password: str
login_prompt: str
exit_command: str
@dataclass(frozen=True)
class ToolSettings:
tool_mode: bool
tool_spec: str
sideload_specs: list[str]
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def parse_bool(raw: str) -> bool:
return raw.strip().lower() in {"1", "true", "yes", "on"}
def parse_port(raw: str) -> int:
try:
return int(raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
def parse_csv(raw: str) -> list[str]:
return [value.strip() for value in raw.split(",") if value.strip()]
def read_connection_settings() -> ConnectionSettings:
host = require_env("MISTLE_HOST")
port = parse_port(require_env("MISTLE_PORT"))
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
return ConnectionSettings(
host=host,
port=port,
user=user,
password=password,
login_prompt=login_prompt,
exit_command=exit_command,
)
def read_tool_settings() -> ToolSettings:
tool_mode = parse_bool(os.environ.get("MISTLE_TOOL_MODE", ""))
tool_spec = os.environ.get("MISTLE_TOOL", "")
sideload_specs = parse_csv(os.environ.get("MISTLE_SIDELOAD_TOOL", ""))
return ToolSettings(
tool_mode=tool_mode,
tool_spec=tool_spec,
sideload_specs=sideload_specs,
)

View file

@ -1,329 +0,0 @@
from __future__ import annotations
import select
import sys
import time
import unicodedata
from collections import deque
from threading import Event, Lock
from typing import Callable, Deque, List, Optional
from telnetclient import TelnetClient
from tools import Tool
_UMLAUT_TRANSLATION = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"Ä": "Ae",
"Ö": "Oe",
"Ü": "Ue",
"ß": "ss",
}
)
def sanitize_for_mud(text: str) -> str:
"""Return ASCII-only text suitable for Silberland's input parser."""
if not text:
return ""
replaced = text.translate(_UMLAUT_TRANSLATION)
normalized = unicodedata.normalize("NFKD", replaced)
cleaned: list[str] = []
for ch in normalized:
if unicodedata.combining(ch):
continue
code = ord(ch)
if 32 <= code <= 126:
cleaned.append(ch)
else:
cleaned.append("?")
return "".join(cleaned)
class SessionState:
"""Share Telnet session state safely across threads."""
class _OutputListener:
def __init__(self) -> None:
self._queue: Deque[str] = deque()
self._lock = Lock()
self._event = Event()
def publish(self, text: str) -> None:
with self._lock:
self._queue.append(text)
self._event.set()
def wait(self, timeout: float) -> bool:
return self._event.wait(timeout)
def drain(self) -> List[str]:
with self._lock:
items = list(self._queue)
self._queue.clear()
self._event.clear()
return items
def close(self) -> None:
with self._lock:
self._queue.clear()
self._event.set()
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_tool_send = 0.0
self._listeners: set[SessionState._OutputListener] = set()
self._listeners_lock = Lock()
def send(self, client: TelnetClient, message: str) -> None:
sanitized = sanitize_for_mud(message)
with self._send_lock:
client.send(sanitized)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the tool while respecting a minimum cadence."""
sanitized = sanitize_for_mud(message)
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_tool_send
if elapsed >= min_interval:
client.send(sanitized)
self._last_tool_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
with self._listeners_lock:
listeners = list(self._listeners)
for listener in listeners:
listener.publish(text)
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def register_listener(self) -> "SessionState._OutputListener":
listener = SessionState._OutputListener()
with self._listeners_lock:
self._listeners.add(listener)
with self._output_lock:
last_output = self._last_output
if last_output:
listener.publish(last_output)
return listener
def remove_listener(self, listener: "SessionState._OutputListener") -> None:
with self._listeners_lock:
existed = listener in self._listeners
if existed:
self._listeners.remove(listener)
if existed:
listener.close()
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_tool_loop(
client: TelnetClient,
state: SessionState,
tool: Tool,
stop_event: Event,
*,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False,
auto_stop_idle: float = 2.0,
) -> None:
"""Invoke *tool* whenever new output arrives and send its response."""
idle_started: Optional[float] = None
listener = state.register_listener()
def maybe_send() -> None:
nonlocal idle_started
try:
command = tool.decide()
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed: {exc}", file=sys.stderr)
return
if not command:
return
sent = state.tool_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
if not sent:
return
idle_started = None
try:
while not stop_event.is_set():
maybe_send()
if stop_event.is_set():
break
triggered = listener.wait(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
outputs = listener.drain()
if not outputs:
continue
idle_started = None
for chunk in outputs:
if not chunk:
continue
try:
tool.observe(chunk)
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
maybe_send()
finally:
state.remove_listener(listener)
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(sanitize_for_mud(user))
time.sleep(0.2)
if password:
client.send(sanitize_for_mud(password))
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
tool_command: Optional[Callable[[str], None]] = None,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
lowered = line.lower()
if agent_command and lowered.startswith("#agent"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #agent <agent_spec>")
else:
agent_command(parts[1])
continue
if tool_command and lowered.startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Tool] Usage: #execute <tool_spec>")
else:
tool_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(sanitize_for_mud(exit_command))
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)

View file

@ -1,111 +0,0 @@
from __future__ import annotations
import os
from importlib import import_module
from typing import Optional, Type
from tools import Tool
TOOL_REGISTRY = {
"look": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Sends the 'schau' look command to refresh the room description.",
},
"simple": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.",
},
"move": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"movement": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"explore": {
"module": "tools",
"class": "ExploreTool",
"kwargs": {},
"description": "Sends 'schau' once, then 'untersuche <noun>' for each noun found in the room description.",
},
"communication": {
"module": "tools",
"class": "CommunicationTool",
"kwargs": {},
"description": "Responds to private tells with a friendly greeting via 'teile <player> mit ...'.",
},
"intelligent": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small")
},
"description": "Uses an LLM to craft a polite reply to private tells.",
},
"intelligentcommunication": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small")
},
"description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.",
},
}
TOOL_DESCRIPTIONS = {
name: meta["description"] for name, meta in TOOL_REGISTRY.items()
}
def build_tool(spec: str) -> Tool:
"""Instantiate a tool based on configuration."""
normalized = spec.strip() or "look"
key = normalized.lower()
if key in TOOL_REGISTRY:
meta = TOOL_REGISTRY[key]
module_name = meta["module"]
class_name = meta["class"]
kwargs = meta.get("kwargs", {})
try:
module = import_module(module_name)
tool_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in tools module") from exc
tool = _instantiate_tool(tool_cls, normalized, kwargs)
model_name = kwargs.get("model") if kwargs else None
if model_name:
print(f"[Tool] Using LLM model: {model_name}")
return tool
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
module = import_module(module_name)
tool_cls = getattr(module, class_name)
return _instantiate_tool(tool_cls, normalized)
raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
if not issubclass(tool_cls, Tool):
raise RuntimeError(f"{tool_spec} is not a Tool subclass")
try:
kwargs = kwargs or {}
return tool_cls(**kwargs)
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc

View file

@ -1,34 +0,0 @@
from __future__ import annotations
import sys
from pathlib import Path
_WARNED_PATHS: set[Path] = set()
def load_system_prompt(*, default: str, path: str = "SYSTEM.md") -> str:
"""Load the system prompt from disk, with a fallback default."""
prompt_path = Path(path)
try:
content = prompt_path.read_text(encoding="utf-8").strip()
except FileNotFoundError:
_warn_once(prompt_path, "not found")
return default
except OSError as exc: # pragma: no cover - environment specific
_warn_once(prompt_path, f"unreadable ({exc})")
return default
if not content:
_warn_once(prompt_path, "empty")
return default
return content
def _warn_once(path: Path, reason: str) -> None:
if path in _WARNED_PATHS:
return
_WARNED_PATHS.add(path)
print(
f"[Prompt] {path} is {reason}; using built-in default prompt.",
file=sys.stderr,
)

View file

@ -2,39 +2,39 @@ from __future__ import annotations
import io
import os
from contextlib import redirect_stderr, redirect_stdout
from contextlib import redirect_stdout, redirect_stderr
from threading import Event, Thread, current_thread
from typing import Callable, Optional
from typing import Callable
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widgets import Footer, Header, Input, Label, Log, Static
from textual.widgets import Footer, Header, Input, Label, Log
from app import (
TOOL_DESCRIPTIONS,
SessionState,
build_tool,
load_env_file,
login,
require_env,
run_tool_loop,
)
from agent_runtime import run_agent
from agents import Agent, build_agent
from mud_env import load_env_file, read_connection_settings, read_tool_settings
from mud_session import SessionState, graceful_shutdown, login, run_tool_loop
from mud_tools import TOOL_DESCRIPTIONS, build_tool
from agents import build_agent
from telnetclient import TelnetClient
from tools import Tool
SEND_INTERVAL_SECONDS = 1.0
AUTO_STOP_IDLE_SECONDS = 2.0
class _QueueWriter(io.TextIOBase):
"""Send captured stdout/stderr lines into a UI logger callback."""
def __init__(self, emit: Callable[[str], None]) -> None:
super().__init__()
self._emit = emit
self._buffer = ""
self._buffer: str = ""
def write(self, s: str) -> int: # type: ignore[override]
self._buffer += s
while "\n" in self._buffer:
line, self._buffer = self._buffer.split("\n", 1)
if line:
self._emit(line)
return len(s)
@ -44,83 +44,20 @@ class _QueueWriter(io.TextIOBase):
self._buffer = ""
class UISessionState(SessionState):
"""Session state that emits outgoing commands to the MUD pane."""
def __init__(self, emit_action: Callable[[str, str], None]) -> None:
super().__init__()
self._emit_action = emit_action
def send(self, client: TelnetClient, message: str) -> None:
self.send_with_source(client, message, source="YOU")
def send_with_source(self, client: TelnetClient, message: str, *, source: str) -> None:
super().send(client, message)
self._emit_action(source, message)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
sent = super().tool_send(
client,
message,
min_interval=min_interval,
stop_event=stop_event,
)
if sent:
self._emit_action("TOOL", message)
return sent
class MudUI(App):
CSS = """
Screen {
layout: vertical;
}
#main {
#logs {
height: 1fr;
padding: 0 1;
}
.pane {
width: 1fr;
height: 1fr;
padding: 0 1;
}
.pane-title {
text-style: bold;
padding: 1 0 0 0;
}
#mud-log {
border: round #4c7a8f;
padding: 0 1;
}
#brain-log {
border: round #7d6f5a;
padding: 0 1;
}
#input-row {
height: auto;
padding: 0 2 1 2;
padding: 1 2;
}
#command {
width: 1fr;
}
#status {
padding: 0 2;
color: #999999;
Log {
border: round #888881;
padding: 1 1;
}
"""
@ -130,176 +67,122 @@ class MudUI(App):
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Horizontal(id="main"):
with Vertical(classes="pane"):
yield Label("MUD Stream", classes="pane-title")
self.mud_log = Log(id="mud-log", auto_scroll=True)
with Vertical(id="logs"):
yield Label("MUD Output")
self.mud_log = Log(classes="mud")
yield self.mud_log
with Vertical(classes="pane"):
yield Label("LLM, Agent, and Tool Stream", classes="pane-title")
self.brain_log = Log(id="brain-log", auto_scroll=True)
yield self.brain_log
yield Label("Agent Output")
self.agent_log = Log(classes="agent")
yield self.agent_log
with Horizontal(id="input-row"):
self.input = Input(
placeholder="Type MUD command, #execute <tool>, or #agent <spec>",
id="command",
)
self.input = Input(placeholder="Type command or #execute/#agent ...", id="command")
yield self.input
self.status = Static("Disconnected", id="status")
yield self.status
yield Footer()
def on_mount(self) -> None:
self._ui_thread = current_thread()
self._stop_event = Event()
self._worker_threads: list[Thread] = []
self._reader_thread: Optional[Thread] = None
self._ui_thread = current_thread()
load_env_file()
connection = read_connection_settings()
tool_settings = read_tool_settings()
host = require_env("MISTLE_HOST")
port = int(require_env("MISTLE_PORT"))
timeout = float(os.environ.get("MISTLE_TIMEOUT", "10"))
self._exit_command = connection.exit_command
self.state = UISessionState(self._log_action)
self.client = TelnetClient(host=connection.host, port=connection.port, timeout=timeout)
self.state = SessionState()
self.client = TelnetClient(host=host, port=port, timeout=timeout)
try:
self.client.connect()
except Exception as exc: # pragma: no cover - network specific
self.log_mud(f"[error] Failed to connect: {exc}")
self._set_status("Connection failed")
return
login_writer = _QueueWriter(self.log_mud)
with redirect_stdout(login_writer), redirect_stderr(login_writer):
writer = _QueueWriter(lambda line: self._emit_to_log(self.mud_log, line))
with redirect_stdout(writer), redirect_stderr(writer):
login(
self.client,
user=connection.user,
password=connection.password,
login_prompt=connection.login_prompt,
user=os.environ.get("MISTLE_USER", ""),
password=os.environ.get("MISTLE_PASSWORD", ""),
login_prompt=os.environ.get("MISTLE_LOGIN_PROMPT", ""),
state=self.state,
)
login_writer.flush()
writer.flush()
self._reader_thread = Thread(target=self._reader_loop, daemon=True, name="mud-reader")
self._reader_thread.start()
self.reader_thread = Thread(target=self._reader_loop, daemon=True)
self.reader_thread.start()
if tool_settings.tool_mode:
self._launch_persistent_tool(tool_settings.tool_spec)
sideload_seen: set[str] = set()
for spec in tool_settings.sideload_specs:
lowered = spec.lower()
if lowered in sideload_seen:
continue
sideload_seen.add(lowered)
self._launch_persistent_tool(spec)
self._set_status(f"Connected to {connection.host}:{connection.port}")
self.input.focus()
self.log_mud(f"Connected to {connection.host}:{connection.port}")
self.log_mud(f"Connected to {host}:{port}")
def on_input_submitted(self, event: Input.Submitted) -> None:
command = event.value.strip()
event.input.value = ""
if not command:
return
if command.startswith("#execute"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_agent("Usage: #execute <tool_spec>")
else:
self._start_tool(parts[1])
return
if command.startswith("#agent"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_agent("Usage: #agent <agent_spec>")
else:
self._start_agent(parts[1])
return
self.state.send(self.client, command)
self.log_agent(f"> {command}")
def on_unmount(self) -> None:
self._stop_event.set()
try:
graceful_shutdown(self.client, self._exit_command, state=self.state)
except Exception:
pass
if self._reader_thread:
self._reader_thread.join(timeout=1.0)
for thread in self._worker_threads:
thread.join(timeout=1.0)
try:
self.client.close()
except Exception:
pass
def on_input_submitted(self, event: Input.Submitted) -> None:
raw = event.value
event.input.value = ""
command = raw.strip()
if not command:
return
if command.startswith("#execute"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_brain("[Tool] Usage: #execute <tool_spec>")
def _emit_to_log(self, log: Log, message: str) -> None:
if current_thread() is self._ui_thread:
log.write(message)
else:
self._start_tool(parts[1])
return
log.write(message)
if command.startswith("#agent"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_brain("[Agent] Usage: #agent <agent_spec>")
else:
self._start_agent(parts[1])
return
def log_mud(self, message: str) -> None:
self._emit_to_log(self.mud_log, message)
self.state.send(self.client, command)
def log_agent(self, message: str) -> None:
self._emit_to_log(self.agent_log, message)
def _reader_loop(self) -> None:
while not self._stop_event.is_set():
data = self.client.receive(timeout=0.3)
if not data:
continue
if data:
self.state.update_output(data)
self.log_mud(data)
self._emit_to_log(self.mud_log, data)
def _start_worker(self, fn: Callable[[], None], *, name: str) -> Thread:
def _wrap_run(self, func: Callable[[], None]) -> Thread:
def runner() -> None:
writer = _QueueWriter(self.log_brain)
writer = _QueueWriter(lambda line: self._emit_to_log(self.agent_log, line))
with redirect_stdout(writer), redirect_stderr(writer):
fn()
func()
writer.flush()
thread = Thread(target=runner, daemon=True, name=name)
thread = Thread(target=runner, daemon=True)
thread.start()
self._worker_threads.append(thread)
return thread
def _launch_persistent_tool(self, spec: str) -> None:
clean_spec = spec.strip()
if not clean_spec:
return
self.log_brain(f"[Tool] Starting persistent tool {clean_spec!r}")
def worker() -> None:
try:
tool = build_tool(clean_spec)
except RuntimeError as exc:
print(f"[Tool] Failed to load '{clean_spec}': {exc}")
return
run_tool_loop(
self.client,
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=False,
)
self._start_worker(worker, name=f"tool-{clean_spec}")
def _start_tool(self, raw_spec: str) -> None:
spec = raw_spec.strip()
if not spec:
self.log_brain("[Tool] Usage: #execute <tool_spec>")
self.log_agent("Usage: #execute <tool_spec>")
return
self.log_brain(f"[Tool] Executing {spec!r}")
self.log_agent(f"[Tool] Executing {spec!r}")
def worker() -> None:
try:
tool = build_tool(spec)
except RuntimeError as exc:
print(f"[Tool] Failed to load '{spec}': {exc}")
print(f"[Agent] Failed to load tool {spec}: {exc}")
return
run_tool_loop(
@ -307,27 +190,22 @@ class MudUI(App):
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
min_send_interval=1.0,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
auto_stop_idle=2.0,
)
self._start_worker(worker, name=f"ephemeral-tool-{spec}")
self._wrap_run(worker)
def _start_agent(self, raw_spec: str) -> None:
spec = raw_spec.strip()
if not spec:
self.log_brain("[Agent] Usage: #agent <agent_spec>")
self.log_agent("Usage: #agent <agent_spec>")
return
self.log_agent(f"[Agent] Executing {spec!r}")
try:
agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS)
except RuntimeError as exc:
self.log_brain(f"[Agent] Failed to configure '{spec}': {exc}")
return
self.log_brain(f"[Agent] Executing {spec!r}")
self._prime_agent(agent)
def build(spec_str: str) -> Tool:
return build_tool(spec_str)
def run_tool_instance(tool: Tool) -> bool:
run_tool_loop(
@ -335,74 +213,47 @@ class MudUI(App):
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
min_send_interval=1.0,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
auto_stop_idle=2.0,
)
self._prime_agent(agent)
output_after = self.state.snapshot_output()
if output_after:
observe = getattr(agent, "observe", None)
if callable(observe):
try:
observe(output_after)
except Exception as exc: # pragma: no cover
print(f"[Agent] observe failed: {exc}")
return True
def send_command(command: str) -> None:
self.state.send_with_source(self.client, command, source="AGENT")
self.log_brain(f"[Agent] command: {command}")
self.state.send(self.client, command)
self._emit_to_log(self.agent_log, f"[Agent] command: {command}")
def run() -> None:
run_agent(
try:
agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS)
except RuntimeError as exc:
self.log_agent(f"[Agent] Failed to configure '{spec}': {exc}")
return
last_output = self.state.snapshot_output()
observe = getattr(agent, "observe", None)
if last_output and callable(observe):
try:
observe(last_output)
except Exception as exc: # pragma: no cover
self.log_agent(f"[Agent] observe failed: {exc}")
self._wrap_run(
lambda: run_agent(
agent,
build_tool=build_tool,
build_tool=build,
run_tool=run_tool_instance,
send_command=send_command,
stop_event=self._stop_event,
)
self._start_worker(run, name=f"agent-{spec}")
def _prime_agent(self, agent: Agent) -> None:
last_output = self.state.snapshot_output()
if not last_output:
return
observe = getattr(agent, "observe", None)
if not callable(observe):
return
try:
observe(last_output)
except Exception as exc: # pragma: no cover - defensive guard
self.log_brain(f"[Agent] observe failed: {exc}")
def _set_status(self, text: str) -> None:
def update() -> None:
self.status.update(text)
if current_thread() is self._ui_thread:
update()
return
try:
self.call_from_thread(update)
except RuntimeError:
pass
def _write_log(self, log: Log, message: str) -> None:
def update() -> None:
normalized = message.replace("\r\n", "\n").replace("\r", "\n")
for line in normalized.split("\n"):
log.write_line(line)
if current_thread() is self._ui_thread:
update()
return
try:
self.call_from_thread(update)
except RuntimeError:
pass
def _log_action(self, source: str, message: str) -> None:
self.log_mud(f"[{source}] {message}")
def log_mud(self, message: str) -> None:
self._write_log(self.mud_log, message)
def log_brain(self, message: str) -> None:
self._write_log(self.brain_log, message)
)
if __name__ == "__main__":