from __future__ import annotations import io import os from contextlib import redirect_stdout, redirect_stderr from threading import Event, Thread, current_thread from typing import Callable from textual.app import App, ComposeResult from textual.containers import Horizontal, Vertical from textual.widgets import Footer, Header, Input, Label, Log from app import ( TOOL_DESCRIPTIONS, SessionState, build_tool, load_env_file, login, require_env, run_tool_loop, ) from agent_runtime import run_agent from agents import build_agent from telnetclient import TelnetClient class _QueueWriter(io.TextIOBase): def __init__(self, emit: Callable[[str], None]) -> None: super().__init__() self._emit = emit self._buffer: str = "" def write(self, s: str) -> int: # type: ignore[override] self._buffer += s while "\n" in self._buffer: line, self._buffer = self._buffer.split("\n", 1) if line: self._emit(line) return len(s) def flush(self) -> None: # type: ignore[override] if self._buffer: self._emit(self._buffer) self._buffer = "" class MudUI(App): CSS = """ Screen { layout: vertical; } #logs { height: 1fr; } #input-row { padding: 1 2; } Log { border: round #888881; padding: 1 1; } """ BINDINGS = [ ("ctrl+c", "quit", "Quit"), ] def compose(self) -> ComposeResult: yield Header(show_clock=True) with Vertical(id="logs"): yield Label("MUD Output") self.mud_log = Log(classes="mud") yield self.mud_log yield Label("Agent Output") self.agent_log = Log(classes="agent") yield self.agent_log with Horizontal(id="input-row"): self.input = Input(placeholder="Type command or #execute/#agent ...", id="command") yield self.input yield Footer() def on_mount(self) -> None: self._stop_event = Event() self._ui_thread = current_thread() load_env_file() host = require_env("MISTLE_HOST") port = int(require_env("MISTLE_PORT")) timeout = float(os.environ.get("MISTLE_TIMEOUT", "10")) self.state = SessionState() self.client = TelnetClient(host=host, port=port, timeout=timeout) try: self.client.connect() except Exception as exc: # pragma: no cover - network specific self.log_mud(f"[error] Failed to connect: {exc}") return writer = _QueueWriter(lambda line: self._emit_to_log(self.mud_log, line)) with redirect_stdout(writer), redirect_stderr(writer): login( self.client, user=os.environ.get("MISTLE_USER", ""), password=os.environ.get("MISTLE_PASSWORD", ""), login_prompt=os.environ.get("MISTLE_LOGIN_PROMPT", ""), state=self.state, ) writer.flush() self.reader_thread = Thread(target=self._reader_loop, daemon=True) self.reader_thread.start() self.input.focus() self.log_mud(f"Connected to {host}:{port}") def on_input_submitted(self, event: Input.Submitted) -> None: command = event.value.strip() event.input.value = "" if not command: return if command.startswith("#execute"): parts = command.split(maxsplit=1) if len(parts) == 1: self.log_agent("Usage: #execute ") else: self._start_tool(parts[1]) return if command.startswith("#agent"): parts = command.split(maxsplit=1) if len(parts) == 1: self.log_agent("Usage: #agent ") else: self._start_agent(parts[1]) return self.state.send(self.client, command) self.log_agent(f"> {command}") def on_unmount(self) -> None: self._stop_event.set() try: self.client.close() except Exception: pass def _emit_to_log(self, log: Log, message: str) -> None: if current_thread() is self._ui_thread: log.write(message) else: log.write(message) def log_mud(self, message: str) -> None: self._emit_to_log(self.mud_log, message) def log_agent(self, message: str) -> None: self._emit_to_log(self.agent_log, message) def _reader_loop(self) -> None: while not self._stop_event.is_set(): data = self.client.receive(timeout=0.3) if data: self.state.update_output(data) self._emit_to_log(self.mud_log, data) def _wrap_run(self, func: Callable[[], None]) -> Thread: def runner() -> None: writer = _QueueWriter(lambda line: self._emit_to_log(self.agent_log, line)) with redirect_stdout(writer), redirect_stderr(writer): func() writer.flush() thread = Thread(target=runner, daemon=True) thread.start() return thread def _start_tool(self, raw_spec: str) -> None: spec = raw_spec.strip() if not spec: self.log_agent("Usage: #execute ") return self.log_agent(f"[Tool] Executing {spec!r}") def worker() -> None: try: tool = build_tool(spec) except RuntimeError as exc: print(f"[Agent] Failed to load tool {spec}: {exc}") return run_tool_loop( self.client, self.state, tool, self._stop_event, min_send_interval=1.0, auto_stop=True, auto_stop_idle=2.0, ) self._wrap_run(worker) def _start_agent(self, raw_spec: str) -> None: spec = raw_spec.strip() if not spec: self.log_agent("Usage: #agent ") return self.log_agent(f"[Agent] Executing {spec!r}") def build(spec_str: str) -> Tool: return build_tool(spec_str) def run_tool_instance(tool: Tool) -> bool: run_tool_loop( self.client, self.state, tool, self._stop_event, min_send_interval=1.0, auto_stop=True, auto_stop_idle=2.0, ) output_after = self.state.snapshot_output() if output_after: observe = getattr(agent, "observe", None) if callable(observe): try: observe(output_after) except Exception as exc: # pragma: no cover print(f"[Agent] observe failed: {exc}") return True def send_command(command: str) -> None: self.state.send(self.client, command) self._emit_to_log(self.agent_log, f"[Agent] command: {command}") try: agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS) except RuntimeError as exc: self.log_agent(f"[Agent] Failed to configure '{spec}': {exc}") return last_output = self.state.snapshot_output() observe = getattr(agent, "observe", None) if last_output and callable(observe): try: observe(last_output) except Exception as exc: # pragma: no cover self.log_agent(f"[Agent] observe failed: {exc}") self._wrap_run( lambda: run_agent( agent, build_tool=build, run_tool=run_tool_instance, send_command=send_command, stop_event=self._stop_event, ) ) if __name__ == "__main__": MudUI().run()