Compare commits

...

3 commits

Author SHA1 Message Date
9af879eadc
feat: system prompt stored in SYSTEM.md 2026-02-09 10:03:10 +01:00
2a6953db9d
feat: improved TUI 2026-02-09 09:48:55 +01:00
d49b31a3e4
refactor: more readability 2026-02-09 09:35:50 +01:00
10 changed files with 973 additions and 665 deletions

View file

@ -67,7 +67,10 @@ Python-based Telnet helper for connecting to MUD servers, handling login flows,
Add guidance text after the type and optional modifiers separated by `|`, e.g. `#agent intelligent explore carefully|model=mistral/mistral-large-2407|delay=2`. The agent only calls built-in tools (`look`, `move`, `movement`, `explore`, `communication`, `intelligentcommunication`) and refuses unknown names. Add guidance text after the type and optional modifiers separated by `|`, e.g. `#agent intelligent explore carefully|model=mistral/mistral-large-2407|delay=2`. The agent only calls built-in tools (`look`, `move`, `movement`, `explore`, `communication`, `intelligentcommunication`) and refuses unknown names.
9. Prefer a TUI? Run `python textual_ui.py` to open a two-pane interface (MUD output on the left, agent output on the right) with an input box at the bottom. It accepts the same commands as the CLI (`#execute …`, `#agent …`, or raw MUD commands). 9. Prefer a TUI? Run `python textual_ui.py` to open a two-pane interface with a bottom input field:
- Left pane: live MUD stream (server output plus outgoing actions from you, tools, and agents).
- Right pane: LLM reasoning plus tool/agent diagnostic output.
It accepts the same commands as the CLI (`#execute …`, `#agent …`, or raw MUD commands).
## Environment Variables ## Environment Variables
@ -87,6 +90,12 @@ All variables can be placed in the `.env` file (one `KEY=value` per line) or pro
| `MISTLE_LLM_MODEL` | ❌ | Override the `litellm` model used by the intelligent tool (defaults to `mistral/mistral-small-2407`). | | `MISTLE_LLM_MODEL` | ❌ | Override the `litellm` model used by the intelligent tool (defaults to `mistral/mistral-small-2407`). |
| `MISTRAL_API_KEY` | ❌ | API key used by `IntelligentCommunicationTool` (via `litellm`) when calling the `mistral/mistral-small-2407` model. | | `MISTRAL_API_KEY` | ❌ | API key used by `IntelligentCommunicationTool` (via `litellm`) when calling the `mistral/mistral-small-2407` model. |
## System Prompt File
- LLM-backed components now load their system prompt from `SYSTEM.md` in the project root.
- This applies to both the intelligent agent and the intelligent communication tool.
- If `SYSTEM.md` is missing or empty, the app falls back to built-in defaults and logs a warning.
## Tool Development ## Tool Development
- Implement new tools by subclassing `tools.Tool` and overriding `observe()` and `decide()`. - Implement new tools by subclassing `tools.Tool` and overriding `observe()` and `decide()`.

18
SYSTEM.md Normal file
View file

@ -0,0 +1,18 @@
You are Mistle, a autonomous Agent playing the German-speaking MUD silberland.
Your goal is to level up as much as possible and explore the world.
## Behavior
- Be concise, friendly, and informal.
- When replying to players, answer in German.
- Keep answers practical and in-character for a MUD helper bot.
- Use emoticons only when appropriate: :) ;) 8) :( ;( :P X) XD :D
## Tool Usage
Use tools only when appropriate. Think how you can solve problems without
using a tool. Do not use the "exploration" tool multiple times in a row.
## Mud Commands
The following MUD commands may be helpful to you
schau - get a description of the current environment
info - examine your own stats
hilfe - get additional help in general and about specific commands

612
app.py
View file

@ -1,493 +1,55 @@
import os from __future__ import annotations
import select
import sys import sys
import time from threading import Event, Thread
import unicodedata from typing import Optional
from collections import deque
from importlib import import_module
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Deque, List, Optional, Type
from tools import Tool
from agents import Agent, build_agent
from agent_runtime import run_agent from agent_runtime import run_agent
from agents import Agent, build_agent
TOOL_REGISTRY = { from mud_env import load_env_file, read_connection_settings, read_tool_settings
"look": { from mud_session import (
"module": "tools", SessionState,
"class": "LookTool", graceful_shutdown,
"kwargs": {}, interactive_session,
"description": "Sends the 'schau' look command to refresh the room description.", login,
}, run_tool_loop,
"simple": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.",
},
"move": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"movement": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"explore": {
"module": "tools",
"class": "ExploreTool",
"kwargs": {},
"description": "Sends 'schau' once, then 'untersuche <noun>' for each noun found in the room description.",
},
"communication": {
"module": "tools",
"class": "CommunicationTool",
"kwargs": {},
"description": "Responds to private tells with a friendly greeting via 'teile <player> mit ...'.",
},
"intelligent": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get(
"MISTLE_LLM_MODEL", "mistral/mistral-small"
)
},
"description": "Uses an LLM to craft a polite reply to private tells.",
},
"intelligentcommunication": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get(
"MISTLE_LLM_MODEL", "mistral/mistral-small"
)
},
"description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.",
},
}
TOOL_DESCRIPTIONS = {
name: meta["description"] for name, meta in TOOL_REGISTRY.items()
}
from telnetclient import TelnetClient
_UMLAUT_TRANSLATION = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"Ä": "Ae",
"Ö": "Oe",
"Ü": "Ue",
"ß": "ss",
}
) )
from mud_tools import TOOL_DESCRIPTIONS, build_tool
from telnetclient import TelnetClient
from tools import Tool
def sanitize_for_mud(text: str) -> str: DEFAULT_SEND_INTERVAL = 1.0
"""Return ASCII-only text suitable for Silberland's input parser.""" AUTO_STOP_IDLE_SECONDS = 2.0
if not text:
return ""
replaced = text.translate(_UMLAUT_TRANSLATION)
normalized = unicodedata.normalize("NFKD", replaced)
cleaned: list[str] = []
for ch in normalized:
if unicodedata.combining(ch):
continue
code = ord(ch)
if 32 <= code <= 126:
cleaned.append(ch)
else:
cleaned.append("?")
return "".join(cleaned)
class SessionState: def _start_tool_thread(
"""Share Telnet session state safely across threads."""
class _OutputListener:
def __init__(self) -> None:
self._queue: Deque[str] = deque()
self._lock = Lock()
self._event = Event()
def publish(self, text: str) -> None:
with self._lock:
self._queue.append(text)
self._event.set()
def wait(self, timeout: float) -> bool:
return self._event.wait(timeout)
def drain(self) -> List[str]:
with self._lock:
items = list(self._queue)
self._queue.clear()
self._event.clear()
return items
def close(self) -> None:
with self._lock:
self._queue.clear()
self._event.set()
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_tool_send = 0.0
self._listeners: set[SessionState._OutputListener] = set()
self._listeners_lock = Lock()
def send(self, client: TelnetClient, message: str) -> None:
sanitized = sanitize_for_mud(message)
with self._send_lock:
client.send(sanitized)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the tool while respecting a minimum cadence."""
sanitized = sanitize_for_mud(message)
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_tool_send
if elapsed >= min_interval:
client.send(sanitized)
self._last_tool_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
with self._listeners_lock:
listeners = list(self._listeners)
for listener in listeners:
listener.publish(text)
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def register_listener(self) -> "SessionState._OutputListener":
listener = SessionState._OutputListener()
with self._listeners_lock:
self._listeners.add(listener)
with self._output_lock:
last_output = self._last_output
if last_output:
listener.publish(last_output)
return listener
def remove_listener(self, listener: "SessionState._OutputListener") -> None:
with self._listeners_lock:
existed = listener in self._listeners
if existed:
self._listeners.remove(listener)
if existed:
listener.close()
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_tool_loop(
client: TelnetClient, client: TelnetClient,
state: SessionState, state: SessionState,
tool: Tool, tool: Tool,
stop_event: Event, stop_event: Event,
*, *,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False, auto_stop: bool = False,
auto_stop_idle: float = 2.0, auto_stop_idle: float = AUTO_STOP_IDLE_SECONDS,
) -> None: ) -> Thread:
"""Invoke *tool* whenever new output arrives and send its response.""" thread = Thread(
idle_started: Optional[float] = None target=run_tool_loop,
listener = state.register_listener() args=(client, state, tool, stop_event),
kwargs={
def maybe_send() -> None: "min_send_interval": DEFAULT_SEND_INTERVAL,
nonlocal idle_started "auto_stop": auto_stop,
try: "auto_stop_idle": auto_stop_idle,
command = tool.decide() },
except Exception as exc: # pragma: no cover - defensive logging daemon=True,
print(f"[Tool] Failed: {exc}", file=sys.stderr)
return
if not command:
return
sent = state.tool_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
) )
if not sent: thread.start()
return return thread
idle_started = None
try:
while not stop_event.is_set():
maybe_send()
if stop_event.is_set():
break
triggered = listener.wait(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
outputs = listener.drain()
if not outputs:
continue
idle_started = None
for chunk in outputs:
if not chunk:
continue
try:
tool.observe(chunk)
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
maybe_send()
finally:
state.remove_listener(listener)
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str: def _build_sideload_tools(specs: list[str]) -> list[tuple[str, Tool]]:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def build_tool(spec: str) -> Tool:
"""Instantiate a tool based on configuration."""
normalized = spec.strip() or "look"
key = normalized.lower()
if key in TOOL_REGISTRY:
meta = TOOL_REGISTRY[key]
module_name = meta["module"]
class_name = meta["class"]
kwargs = meta.get("kwargs", {})
try:
module = import_module(module_name)
tool_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in tools module") from exc
tool = _instantiate_tool(tool_cls, normalized, kwargs)
model_name = kwargs.get("model") if kwargs else None
if model_name:
print(f"[Tool] Using LLM model: {model_name}")
return tool
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
module = import_module(module_name)
tool_cls = getattr(module, class_name)
return _instantiate_tool(tool_cls, normalized)
raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
if not issubclass(tool_cls, Tool):
raise RuntimeError(f"{tool_spec} is not a Tool subclass")
try:
kwargs = kwargs or {}
return tool_cls(**kwargs)
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(sanitize_for_mud(user))
time.sleep(0.2)
if password:
client.send(sanitize_for_mud(password))
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
tool_command: Optional[Callable[[str], None]] = None,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
lowered = line.lower()
if agent_command and lowered.startswith("#agent"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #agent <agent_spec>")
else:
agent_command(parts[1])
continue
if tool_command and lowered.startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Tool] Usage: #execute <tool_spec>")
else:
tool_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(sanitize_for_mud(exit_command))
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)
def main() -> int:
load_env_file()
host = require_env("MISTLE_HOST")
port_raw = require_env("MISTLE_PORT")
try:
port = int(port_raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
tool_mode_env = os.environ.get("MISTLE_TOOL_MODE", "")
tool_mode = tool_mode_env.lower() in {"1", "true", "yes", "on"}
tool_spec = os.environ.get("MISTLE_TOOL", "")
sideload_env = os.environ.get("MISTLE_SIDELOAD_TOOL", "")
sideload_specs = [spec.strip() for spec in sideload_env.split(",") if spec.strip()]
sideload_tools: list[tuple[str, Tool]] = [] sideload_tools: list[tuple[str, Tool]] = []
seen_sideloads: set[str] = set() seen_sideloads: set[str] = set()
for spec in sideload_specs: for spec in specs:
lowered = spec.lower() lowered = spec.lower()
if lowered in seen_sideloads: if lowered in seen_sideloads:
continue continue
@ -498,6 +60,32 @@ def main() -> int:
print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr) print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr)
continue continue
sideload_tools.append((spec, tool_instance)) sideload_tools.append((spec, tool_instance))
return sideload_tools
def _prime_agent(agent: Agent, output: str) -> None:
if not output:
return
observe = getattr(agent, "observe", None)
if callable(observe):
try:
observe(output)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
def _join_threads(threads: list[Thread], *, timeout: float = 1.0) -> None:
for thread in threads:
thread.join(timeout=timeout)
def main() -> int:
load_env_file()
connection = read_connection_settings()
tool_settings = read_tool_settings()
sideload_tools = _build_sideload_tools(tool_settings.sideload_specs)
state = SessionState() state = SessionState()
stop_event = Event() stop_event = Event()
@ -507,38 +95,26 @@ def main() -> int:
sidecar_threads: list[Thread] = [] sidecar_threads: list[Thread] = []
agent_threads: list[Thread] = [] agent_threads: list[Thread] = []
with TelnetClient(host=host, port=port, timeout=10.0) as client: with TelnetClient(host=connection.host, port=connection.port, timeout=10.0) as client:
login( login(
client, client,
user=user, user=connection.user,
password=password, password=connection.password,
login_prompt=login_prompt, login_prompt=connection.login_prompt,
state=state, state=state,
) )
if tool_mode: if tool_settings.tool_mode:
tool = build_tool(tool_spec) tool = build_tool(tool_settings.tool_spec)
tool_thread = Thread( tool_thread = _start_tool_thread(client, state, tool, stop_event)
target=run_tool_loop,
args=(client, state, tool, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
tool_thread.start()
if sideload_tools: if sideload_tools:
for sidecar_spec, sidecar_tool in sideload_tools: for sidecar_spec, sidecar_tool in sideload_tools:
thread = Thread( thread = _start_tool_thread(client, state, sidecar_tool, stop_event)
target=run_tool_loop,
args=(client, state, sidecar_tool, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
sidecar_threads.append(thread) sidecar_threads.append(thread)
print( print(
f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})" f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})"
) )
thread.start()
def run_ephemeral_tool(spec: str) -> None: def run_ephemeral_tool(spec: str) -> None:
spec = spec.strip() spec = spec.strip()
@ -550,19 +126,16 @@ def main() -> int:
except RuntimeError as exc: except RuntimeError as exc:
print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr) print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr)
return return
thread = Thread( thread = _start_tool_thread(
target=run_tool_loop, client,
args=(client, state, temp_tool, stop_event), state,
kwargs={ temp_tool,
"min_send_interval": 1.0, stop_event,
"auto_stop": True, auto_stop=True,
"auto_stop_idle": 2.0, auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
},
daemon=True,
) )
ephemeral_tools.append(thread) ephemeral_tools.append(thread)
print(f"[Tool] Executing {spec!r} once") print(f"[Tool] Executing {spec!r} once")
thread.start()
def run_ephemeral_agent(spec: str) -> None: def run_ephemeral_agent(spec: str) -> None:
spec = spec.strip() spec = spec.strip()
@ -575,14 +148,7 @@ def main() -> int:
print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr) print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr)
return return
last_output = state.snapshot_output() _prime_agent(temp_agent, state.snapshot_output())
if last_output:
observe = getattr(temp_agent, "observe", None)
if callable(observe):
try:
observe(last_output)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
def run_tool_instance(tool: Tool) -> bool: def run_tool_instance(tool: Tool) -> bool:
run_tool_loop( run_tool_loop(
@ -590,18 +156,11 @@ def main() -> int:
state, state,
tool, tool,
stop_event, stop_event,
min_send_interval=1.0, min_send_interval=DEFAULT_SEND_INTERVAL,
auto_stop=True, auto_stop=True,
auto_stop_idle=2.0, auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
) )
output_after = state.snapshot_output() _prime_agent(temp_agent, state.snapshot_output())
if output_after:
observe = getattr(temp_agent, "observe", None)
if callable(observe):
try:
observe(output_after)
except Exception as exc: # pragma: no cover - defensive
print(f"[Agent] observe failed: {exc}", file=sys.stderr)
return True return True
thread = Thread( thread = Thread(
@ -625,8 +184,8 @@ def main() -> int:
client, client,
state=state, state=state,
stop_event=stop_event, stop_event=stop_event,
exit_command=exit_command, exit_command=connection.exit_command,
tool_command=None if tool_mode else run_ephemeral_tool, tool_command=None if tool_settings.tool_mode else run_ephemeral_tool,
agent_command=run_ephemeral_agent, agent_command=run_ephemeral_agent,
) )
except KeyboardInterrupt: except KeyboardInterrupt:
@ -636,15 +195,12 @@ def main() -> int:
stop_event.set() stop_event.set()
if tool_thread: if tool_thread:
tool_thread.join(timeout=1.0) tool_thread.join(timeout=1.0)
for thread in sidecar_threads: _join_threads(sidecar_threads)
thread.join(timeout=1.0) _join_threads(ephemeral_tools)
for thread in ephemeral_tools: _join_threads(agent_threads)
thread.join(timeout=1.0)
for thread in agent_threads:
thread.join(timeout=1.0)
if interrupted: if interrupted:
graceful_shutdown(client, exit_command, state=state) graceful_shutdown(client, connection.exit_command, state=state)
return 0 return 0

View file

@ -14,19 +14,24 @@ except ImportError: # pragma: no cover
completion = None # type: ignore[assignment] completion = None # type: ignore[assignment]
from agents import Agent from agents import Agent
from system_prompt import load_system_prompt
ToolInvoker = Callable[[str], bool] ToolInvoker = Callable[[str], bool]
CommandExecutor = Callable[[str], None] CommandExecutor = Callable[[str], None]
DEFAULT_AGENT_SYSTEM_PROMPT = (
"You are Mistle, a helpful MUD assistant. "
"You can either call tools or send plain commands to the MUD."
)
@dataclass @dataclass
class IntelligentAgent(Agent): class IntelligentAgent(Agent):
"""LLM-driven agent that decides between tools and raw commands.""" """LLM-driven agent that decides between tools and raw commands."""
model: str = "mistral/mistral-large-2407" model: str = "mistral/mistral-large-2407"
system_prompt: str = ( system_prompt: str = field(
"You are Mistle, a helpful MUD assistant. " default_factory=lambda: load_system_prompt(default=DEFAULT_AGENT_SYSTEM_PROMPT)
"You can either call tools or send plain commands to the MUD."
) )
temperature: float = 0.7 temperature: float = 0.7
max_output_tokens: int = 200 max_output_tokens: int = 200

View file

@ -11,20 +11,27 @@ try:
except ImportError: # pragma: no cover - optional dependency except ImportError: # pragma: no cover - optional dependency
completion = None # type: ignore[assignment] completion = None # type: ignore[assignment]
from system_prompt import load_system_prompt
from tools import Tool from tools import Tool
DEFAULT_COMMUNICATION_SYSTEM_PROMPT = (
"Du bist Mistle, ein hilfsbereiter MUD-Bot. "
"Antworte freundlich und knapp in deutscher Sprache."
"Sprich informell und freundlich."
"Antworte immer auf Deutsch."
"Verwende emoticons, wenn es angebracht ist: :) ;) 8) :( ;( :P X) XD :D"
)
@dataclass @dataclass
class IntelligentCommunicationTool(Tool): class IntelligentCommunicationTool(Tool):
"""Tool that uses a language model to answer private tells.""" """Tool that uses a language model to answer private tells."""
model: str = "mistral/mistral-tiny" model: str = "mistral/mistral-tiny"
system_prompt: str = ( system_prompt: str = field(
"Du bist Mistle, ein hilfsbereiter MUD-Bot. " default_factory=lambda: load_system_prompt(
"Antworte freundlich und knapp in deutscher Sprache." default=DEFAULT_COMMUNICATION_SYSTEM_PROMPT
"Sprich informell und freundlich." )
"Antworte immer auf Deutsch."
"Verwende emoticons, wenn es angebracht ist: :) ;) 8) :( ;( :P X) XD :D"
) )
temperature: float = 0.7 temperature: float = 0.7
max_output_tokens: int = 120 max_output_tokens: int = 120

90
mud_env.py Normal file
View file

@ -0,0 +1,90 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class ConnectionSettings:
host: str
port: int
user: str
password: str
login_prompt: str
exit_command: str
@dataclass(frozen=True)
class ToolSettings:
tool_mode: bool
tool_spec: str
sideload_specs: list[str]
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def parse_bool(raw: str) -> bool:
return raw.strip().lower() in {"1", "true", "yes", "on"}
def parse_port(raw: str) -> int:
try:
return int(raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
def parse_csv(raw: str) -> list[str]:
return [value.strip() for value in raw.split(",") if value.strip()]
def read_connection_settings() -> ConnectionSettings:
host = require_env("MISTLE_HOST")
port = parse_port(require_env("MISTLE_PORT"))
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
return ConnectionSettings(
host=host,
port=port,
user=user,
password=password,
login_prompt=login_prompt,
exit_command=exit_command,
)
def read_tool_settings() -> ToolSettings:
tool_mode = parse_bool(os.environ.get("MISTLE_TOOL_MODE", ""))
tool_spec = os.environ.get("MISTLE_TOOL", "")
sideload_specs = parse_csv(os.environ.get("MISTLE_SIDELOAD_TOOL", ""))
return ToolSettings(
tool_mode=tool_mode,
tool_spec=tool_spec,
sideload_specs=sideload_specs,
)

329
mud_session.py Normal file
View file

@ -0,0 +1,329 @@
from __future__ import annotations
import select
import sys
import time
import unicodedata
from collections import deque
from threading import Event, Lock
from typing import Callable, Deque, List, Optional
from telnetclient import TelnetClient
from tools import Tool
_UMLAUT_TRANSLATION = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"Ä": "Ae",
"Ö": "Oe",
"Ü": "Ue",
"ß": "ss",
}
)
def sanitize_for_mud(text: str) -> str:
"""Return ASCII-only text suitable for Silberland's input parser."""
if not text:
return ""
replaced = text.translate(_UMLAUT_TRANSLATION)
normalized = unicodedata.normalize("NFKD", replaced)
cleaned: list[str] = []
for ch in normalized:
if unicodedata.combining(ch):
continue
code = ord(ch)
if 32 <= code <= 126:
cleaned.append(ch)
else:
cleaned.append("?")
return "".join(cleaned)
class SessionState:
"""Share Telnet session state safely across threads."""
class _OutputListener:
def __init__(self) -> None:
self._queue: Deque[str] = deque()
self._lock = Lock()
self._event = Event()
def publish(self, text: str) -> None:
with self._lock:
self._queue.append(text)
self._event.set()
def wait(self, timeout: float) -> bool:
return self._event.wait(timeout)
def drain(self) -> List[str]:
with self._lock:
items = list(self._queue)
self._queue.clear()
self._event.clear()
return items
def close(self) -> None:
with self._lock:
self._queue.clear()
self._event.set()
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_tool_send = 0.0
self._listeners: set[SessionState._OutputListener] = set()
self._listeners_lock = Lock()
def send(self, client: TelnetClient, message: str) -> None:
sanitized = sanitize_for_mud(message)
with self._send_lock:
client.send(sanitized)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the tool while respecting a minimum cadence."""
sanitized = sanitize_for_mud(message)
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_tool_send
if elapsed >= min_interval:
client.send(sanitized)
self._last_tool_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
with self._listeners_lock:
listeners = list(self._listeners)
for listener in listeners:
listener.publish(text)
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def register_listener(self) -> "SessionState._OutputListener":
listener = SessionState._OutputListener()
with self._listeners_lock:
self._listeners.add(listener)
with self._output_lock:
last_output = self._last_output
if last_output:
listener.publish(last_output)
return listener
def remove_listener(self, listener: "SessionState._OutputListener") -> None:
with self._listeners_lock:
existed = listener in self._listeners
if existed:
self._listeners.remove(listener)
if existed:
listener.close()
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_tool_loop(
client: TelnetClient,
state: SessionState,
tool: Tool,
stop_event: Event,
*,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False,
auto_stop_idle: float = 2.0,
) -> None:
"""Invoke *tool* whenever new output arrives and send its response."""
idle_started: Optional[float] = None
listener = state.register_listener()
def maybe_send() -> None:
nonlocal idle_started
try:
command = tool.decide()
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed: {exc}", file=sys.stderr)
return
if not command:
return
sent = state.tool_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
if not sent:
return
idle_started = None
try:
while not stop_event.is_set():
maybe_send()
if stop_event.is_set():
break
triggered = listener.wait(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
outputs = listener.drain()
if not outputs:
continue
idle_started = None
for chunk in outputs:
if not chunk:
continue
try:
tool.observe(chunk)
except Exception as exc: # pragma: no cover - defensive logging
print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
maybe_send()
finally:
state.remove_listener(listener)
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(sanitize_for_mud(user))
time.sleep(0.2)
if password:
client.send(sanitize_for_mud(password))
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
tool_command: Optional[Callable[[str], None]] = None,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
lowered = line.lower()
if agent_command and lowered.startswith("#agent"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #agent <agent_spec>")
else:
agent_command(parts[1])
continue
if tool_command and lowered.startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Tool] Usage: #execute <tool_spec>")
else:
tool_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(sanitize_for_mud(exit_command))
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)

111
mud_tools.py Normal file
View file

@ -0,0 +1,111 @@
from __future__ import annotations
import os
from importlib import import_module
from typing import Optional, Type
from tools import Tool
TOOL_REGISTRY = {
"look": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Sends the 'schau' look command to refresh the room description.",
},
"simple": {
"module": "tools",
"class": "LookTool",
"kwargs": {},
"description": "Alias of 'look'. Sends the 'schau' look command to refresh the room description.",
},
"move": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"movement": {
"module": "movement_tool",
"class": "MovementTool",
"kwargs": {},
"description": "Alias of 'move'. Looks around and moves in one available direction, chosen randomly among unvisited exits.",
},
"explore": {
"module": "tools",
"class": "ExploreTool",
"kwargs": {},
"description": "Sends 'schau' once, then 'untersuche <noun>' for each noun found in the room description.",
},
"communication": {
"module": "tools",
"class": "CommunicationTool",
"kwargs": {},
"description": "Responds to private tells with a friendly greeting via 'teile <player> mit ...'.",
},
"intelligent": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small")
},
"description": "Uses an LLM to craft a polite reply to private tells.",
},
"intelligentcommunication": {
"module": "intelligent_tool",
"class": "IntelligentCommunicationTool",
"kwargs": {
"model": os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small")
},
"description": "Alias of 'intelligent'. Uses an LLM to craft a polite reply to private tells.",
},
}
TOOL_DESCRIPTIONS = {
name: meta["description"] for name, meta in TOOL_REGISTRY.items()
}
def build_tool(spec: str) -> Tool:
"""Instantiate a tool based on configuration."""
normalized = spec.strip() or "look"
key = normalized.lower()
if key in TOOL_REGISTRY:
meta = TOOL_REGISTRY[key]
module_name = meta["module"]
class_name = meta["class"]
kwargs = meta.get("kwargs", {})
try:
module = import_module(module_name)
tool_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in tools module") from exc
tool = _instantiate_tool(tool_cls, normalized, kwargs)
model_name = kwargs.get("model") if kwargs else None
if model_name:
print(f"[Tool] Using LLM model: {model_name}")
return tool
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
module = import_module(module_name)
tool_cls = getattr(module, class_name)
return _instantiate_tool(tool_cls, normalized)
raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
if not issubclass(tool_cls, Tool):
raise RuntimeError(f"{tool_spec} is not a Tool subclass")
try:
kwargs = kwargs or {}
return tool_cls(**kwargs)
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc

34
system_prompt.py Normal file
View file

@ -0,0 +1,34 @@
from __future__ import annotations
import sys
from pathlib import Path
_WARNED_PATHS: set[Path] = set()
def load_system_prompt(*, default: str, path: str = "SYSTEM.md") -> str:
"""Load the system prompt from disk, with a fallback default."""
prompt_path = Path(path)
try:
content = prompt_path.read_text(encoding="utf-8").strip()
except FileNotFoundError:
_warn_once(prompt_path, "not found")
return default
except OSError as exc: # pragma: no cover - environment specific
_warn_once(prompt_path, f"unreadable ({exc})")
return default
if not content:
_warn_once(prompt_path, "empty")
return default
return content
def _warn_once(path: Path, reason: str) -> None:
if path in _WARNED_PATHS:
return
_WARNED_PATHS.add(path)
print(
f"[Prompt] {path} is {reason}; using built-in default prompt.",
file=sys.stderr,
)

View file

@ -2,39 +2,39 @@ from __future__ import annotations
import io import io
import os import os
from contextlib import redirect_stdout, redirect_stderr from contextlib import redirect_stderr, redirect_stdout
from threading import Event, Thread, current_thread from threading import Event, Thread, current_thread
from typing import Callable from typing import Callable, Optional
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical from textual.containers import Horizontal, Vertical
from textual.widgets import Footer, Header, Input, Label, Log from textual.widgets import Footer, Header, Input, Label, Log, Static
from app import (
TOOL_DESCRIPTIONS,
SessionState,
build_tool,
load_env_file,
login,
require_env,
run_tool_loop,
)
from agent_runtime import run_agent from agent_runtime import run_agent
from agents import build_agent from agents import Agent, build_agent
from mud_env import load_env_file, read_connection_settings, read_tool_settings
from mud_session import SessionState, graceful_shutdown, login, run_tool_loop
from mud_tools import TOOL_DESCRIPTIONS, build_tool
from telnetclient import TelnetClient from telnetclient import TelnetClient
from tools import Tool
SEND_INTERVAL_SECONDS = 1.0
AUTO_STOP_IDLE_SECONDS = 2.0
class _QueueWriter(io.TextIOBase): class _QueueWriter(io.TextIOBase):
"""Send captured stdout/stderr lines into a UI logger callback."""
def __init__(self, emit: Callable[[str], None]) -> None: def __init__(self, emit: Callable[[str], None]) -> None:
super().__init__() super().__init__()
self._emit = emit self._emit = emit
self._buffer: str = "" self._buffer = ""
def write(self, s: str) -> int: # type: ignore[override] def write(self, s: str) -> int: # type: ignore[override]
self._buffer += s self._buffer += s
while "\n" in self._buffer: while "\n" in self._buffer:
line, self._buffer = self._buffer.split("\n", 1) line, self._buffer = self._buffer.split("\n", 1)
if line:
self._emit(line) self._emit(line)
return len(s) return len(s)
@ -44,20 +44,83 @@ class _QueueWriter(io.TextIOBase):
self._buffer = "" self._buffer = ""
class UISessionState(SessionState):
"""Session state that emits outgoing commands to the MUD pane."""
def __init__(self, emit_action: Callable[[str, str], None]) -> None:
super().__init__()
self._emit_action = emit_action
def send(self, client: TelnetClient, message: str) -> None:
self.send_with_source(client, message, source="YOU")
def send_with_source(self, client: TelnetClient, message: str, *, source: str) -> None:
super().send(client, message)
self._emit_action(source, message)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
sent = super().tool_send(
client,
message,
min_interval=min_interval,
stop_event=stop_event,
)
if sent:
self._emit_action("TOOL", message)
return sent
class MudUI(App): class MudUI(App):
CSS = """ CSS = """
Screen { Screen {
layout: vertical; layout: vertical;
} }
#logs {
#main {
height: 1fr; height: 1fr;
padding: 0 1;
} }
.pane {
width: 1fr;
height: 1fr;
padding: 0 1;
}
.pane-title {
text-style: bold;
padding: 1 0 0 0;
}
#mud-log {
border: round #4c7a8f;
padding: 0 1;
}
#brain-log {
border: round #7d6f5a;
padding: 0 1;
}
#input-row { #input-row {
padding: 1 2; height: auto;
padding: 0 2 1 2;
} }
Log {
border: round #888881; #command {
padding: 1 1; width: 1fr;
}
#status {
padding: 0 2;
color: #999999;
} }
""" """
@ -67,122 +130,150 @@ class MudUI(App):
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header(show_clock=True) yield Header(show_clock=True)
with Vertical(id="logs"): with Horizontal(id="main"):
yield Label("MUD Output") with Vertical(classes="pane"):
self.mud_log = Log(classes="mud") yield Label("MUD Stream", classes="pane-title")
self.mud_log = Log(id="mud-log", auto_scroll=True)
yield self.mud_log yield self.mud_log
yield Label("Agent Output") with Vertical(classes="pane"):
self.agent_log = Log(classes="agent") yield Label("LLM, Agent, and Tool Stream", classes="pane-title")
yield self.agent_log self.brain_log = Log(id="brain-log", auto_scroll=True)
yield self.brain_log
with Horizontal(id="input-row"): with Horizontal(id="input-row"):
self.input = Input(placeholder="Type command or #execute/#agent ...", id="command") self.input = Input(
placeholder="Type MUD command, #execute <tool>, or #agent <spec>",
id="command",
)
yield self.input yield self.input
self.status = Static("Disconnected", id="status")
yield self.status
yield Footer() yield Footer()
def on_mount(self) -> None: def on_mount(self) -> None:
self._stop_event = Event()
self._ui_thread = current_thread() self._ui_thread = current_thread()
self._stop_event = Event()
self._worker_threads: list[Thread] = []
self._reader_thread: Optional[Thread] = None
load_env_file() load_env_file()
host = require_env("MISTLE_HOST") connection = read_connection_settings()
port = int(require_env("MISTLE_PORT")) tool_settings = read_tool_settings()
timeout = float(os.environ.get("MISTLE_TIMEOUT", "10")) timeout = float(os.environ.get("MISTLE_TIMEOUT", "10"))
self.state = SessionState() self._exit_command = connection.exit_command
self.client = TelnetClient(host=host, port=port, timeout=timeout) self.state = UISessionState(self._log_action)
self.client = TelnetClient(host=connection.host, port=connection.port, timeout=timeout)
try: try:
self.client.connect() self.client.connect()
except Exception as exc: # pragma: no cover - network specific except Exception as exc: # pragma: no cover - network specific
self.log_mud(f"[error] Failed to connect: {exc}") self.log_mud(f"[error] Failed to connect: {exc}")
self._set_status("Connection failed")
return return
writer = _QueueWriter(lambda line: self._emit_to_log(self.mud_log, line)) login_writer = _QueueWriter(self.log_mud)
with redirect_stdout(writer), redirect_stderr(writer): with redirect_stdout(login_writer), redirect_stderr(login_writer):
login( login(
self.client, self.client,
user=os.environ.get("MISTLE_USER", ""), user=connection.user,
password=os.environ.get("MISTLE_PASSWORD", ""), password=connection.password,
login_prompt=os.environ.get("MISTLE_LOGIN_PROMPT", ""), login_prompt=connection.login_prompt,
state=self.state, state=self.state,
) )
writer.flush() login_writer.flush()
self.reader_thread = Thread(target=self._reader_loop, daemon=True) self._reader_thread = Thread(target=self._reader_loop, daemon=True, name="mud-reader")
self.reader_thread.start() self._reader_thread.start()
if tool_settings.tool_mode:
self._launch_persistent_tool(tool_settings.tool_spec)
sideload_seen: set[str] = set()
for spec in tool_settings.sideload_specs:
lowered = spec.lower()
if lowered in sideload_seen:
continue
sideload_seen.add(lowered)
self._launch_persistent_tool(spec)
self._set_status(f"Connected to {connection.host}:{connection.port}")
self.input.focus() self.input.focus()
self.log_mud(f"Connected to {host}:{port}") self.log_mud(f"Connected to {connection.host}:{connection.port}")
def on_input_submitted(self, event: Input.Submitted) -> None:
command = event.value.strip()
event.input.value = ""
if not command:
return
if command.startswith("#execute"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_agent("Usage: #execute <tool_spec>")
else:
self._start_tool(parts[1])
return
if command.startswith("#agent"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_agent("Usage: #agent <agent_spec>")
else:
self._start_agent(parts[1])
return
self.state.send(self.client, command)
self.log_agent(f"> {command}")
def on_unmount(self) -> None: def on_unmount(self) -> None:
self._stop_event.set() self._stop_event.set()
try:
graceful_shutdown(self.client, self._exit_command, state=self.state)
except Exception:
pass
if self._reader_thread:
self._reader_thread.join(timeout=1.0)
for thread in self._worker_threads:
thread.join(timeout=1.0)
try: try:
self.client.close() self.client.close()
except Exception: except Exception:
pass pass
def _emit_to_log(self, log: Log, message: str) -> None: def on_input_submitted(self, event: Input.Submitted) -> None:
if current_thread() is self._ui_thread: raw = event.value
log.write(message) event.input.value = ""
command = raw.strip()
if not command:
return
if command.startswith("#execute"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_brain("[Tool] Usage: #execute <tool_spec>")
else: else:
log.write(message) self._start_tool(parts[1])
return
def log_mud(self, message: str) -> None: if command.startswith("#agent"):
self._emit_to_log(self.mud_log, message) parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_brain("[Agent] Usage: #agent <agent_spec>")
else:
self._start_agent(parts[1])
return
def log_agent(self, message: str) -> None: self.state.send(self.client, command)
self._emit_to_log(self.agent_log, message)
def _reader_loop(self) -> None: def _reader_loop(self) -> None:
while not self._stop_event.is_set(): while not self._stop_event.is_set():
data = self.client.receive(timeout=0.3) data = self.client.receive(timeout=0.3)
if data: if not data:
continue
self.state.update_output(data) self.state.update_output(data)
self._emit_to_log(self.mud_log, data) self.log_mud(data)
def _wrap_run(self, func: Callable[[], None]) -> Thread: def _start_worker(self, fn: Callable[[], None], *, name: str) -> Thread:
def runner() -> None: def runner() -> None:
writer = _QueueWriter(lambda line: self._emit_to_log(self.agent_log, line)) writer = _QueueWriter(self.log_brain)
with redirect_stdout(writer), redirect_stderr(writer): with redirect_stdout(writer), redirect_stderr(writer):
func() fn()
writer.flush() writer.flush()
thread = Thread(target=runner, daemon=True) thread = Thread(target=runner, daemon=True, name=name)
thread.start() thread.start()
self._worker_threads.append(thread)
return thread return thread
def _start_tool(self, raw_spec: str) -> None: def _launch_persistent_tool(self, spec: str) -> None:
spec = raw_spec.strip() clean_spec = spec.strip()
if not spec: if not clean_spec:
self.log_agent("Usage: #execute <tool_spec>")
return return
self.log_agent(f"[Tool] Executing {spec!r}") self.log_brain(f"[Tool] Starting persistent tool {clean_spec!r}")
def worker() -> None: def worker() -> None:
try: try:
tool = build_tool(spec) tool = build_tool(clean_spec)
except RuntimeError as exc: except RuntimeError as exc:
print(f"[Agent] Failed to load tool {spec}: {exc}") print(f"[Tool] Failed to load '{clean_spec}': {exc}")
return return
run_tool_loop( run_tool_loop(
@ -190,22 +281,53 @@ class MudUI(App):
self.state, self.state,
tool, tool,
self._stop_event, self._stop_event,
min_send_interval=1.0, min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=True, auto_stop=False,
auto_stop_idle=2.0,
) )
self._wrap_run(worker) self._start_worker(worker, name=f"tool-{clean_spec}")
def _start_tool(self, raw_spec: str) -> None:
spec = raw_spec.strip()
if not spec:
self.log_brain("[Tool] Usage: #execute <tool_spec>")
return
self.log_brain(f"[Tool] Executing {spec!r}")
def worker() -> None:
try:
tool = build_tool(spec)
except RuntimeError as exc:
print(f"[Tool] Failed to load '{spec}': {exc}")
return
run_tool_loop(
self.client,
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
)
self._start_worker(worker, name=f"ephemeral-tool-{spec}")
def _start_agent(self, raw_spec: str) -> None: def _start_agent(self, raw_spec: str) -> None:
spec = raw_spec.strip() spec = raw_spec.strip()
if not spec: if not spec:
self.log_agent("Usage: #agent <agent_spec>") self.log_brain("[Agent] Usage: #agent <agent_spec>")
return return
self.log_agent(f"[Agent] Executing {spec!r}")
def build(spec_str: str) -> Tool: try:
return build_tool(spec_str) agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS)
except RuntimeError as exc:
self.log_brain(f"[Agent] Failed to configure '{spec}': {exc}")
return
self.log_brain(f"[Agent] Executing {spec!r}")
self._prime_agent(agent)
def run_tool_instance(tool: Tool) -> bool: def run_tool_instance(tool: Tool) -> bool:
run_tool_loop( run_tool_loop(
@ -213,47 +335,74 @@ class MudUI(App):
self.state, self.state,
tool, tool,
self._stop_event, self._stop_event,
min_send_interval=1.0, min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=True, auto_stop=True,
auto_stop_idle=2.0, auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
) )
output_after = self.state.snapshot_output() self._prime_agent(agent)
if output_after:
observe = getattr(agent, "observe", None)
if callable(observe):
try:
observe(output_after)
except Exception as exc: # pragma: no cover
print(f"[Agent] observe failed: {exc}")
return True return True
def send_command(command: str) -> None: def send_command(command: str) -> None:
self.state.send(self.client, command) self.state.send_with_source(self.client, command, source="AGENT")
self._emit_to_log(self.agent_log, f"[Agent] command: {command}") self.log_brain(f"[Agent] command: {command}")
try: def run() -> None:
agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS) run_agent(
except RuntimeError as exc:
self.log_agent(f"[Agent] Failed to configure '{spec}': {exc}")
return
last_output = self.state.snapshot_output()
observe = getattr(agent, "observe", None)
if last_output and callable(observe):
try:
observe(last_output)
except Exception as exc: # pragma: no cover
self.log_agent(f"[Agent] observe failed: {exc}")
self._wrap_run(
lambda: run_agent(
agent, agent,
build_tool=build, build_tool=build_tool,
run_tool=run_tool_instance, run_tool=run_tool_instance,
send_command=send_command, send_command=send_command,
stop_event=self._stop_event, stop_event=self._stop_event,
) )
)
self._start_worker(run, name=f"agent-{spec}")
def _prime_agent(self, agent: Agent) -> None:
last_output = self.state.snapshot_output()
if not last_output:
return
observe = getattr(agent, "observe", None)
if not callable(observe):
return
try:
observe(last_output)
except Exception as exc: # pragma: no cover - defensive guard
self.log_brain(f"[Agent] observe failed: {exc}")
def _set_status(self, text: str) -> None:
def update() -> None:
self.status.update(text)
if current_thread() is self._ui_thread:
update()
return
try:
self.call_from_thread(update)
except RuntimeError:
pass
def _write_log(self, log: Log, message: str) -> None:
def update() -> None:
normalized = message.replace("\r\n", "\n").replace("\r", "\n")
for line in normalized.split("\n"):
log.write_line(line)
if current_thread() is self._ui_thread:
update()
return
try:
self.call_from_thread(update)
except RuntimeError:
pass
def _log_action(self, source: str, message: str) -> None:
self.log_mud(f"[{source}] {message}")
def log_mud(self, message: str) -> None:
self._write_log(self.mud_log, message)
def log_brain(self, message: str) -> None:
self._write_log(self.brain_log, message)
if __name__ == "__main__": if __name__ == "__main__":