From 2a6953db9dced58c37448bb3ef185d5b5cf1e0c3 Mon Sep 17 00:00:00 2001 From: Daniel Eder Date: Mon, 9 Feb 2026 09:48:55 +0100 Subject: [PATCH] feat: improved TUI --- README.md | 5 +- textual_ui.py | 386 +++++++++++++++++++++++++++++++++++--------------- 2 files changed, 274 insertions(+), 117 deletions(-) diff --git a/README.md b/README.md index 023f0ea..1c77b43 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,10 @@ Python-based Telnet helper for connecting to MUD servers, handling login flows, Add guidance text after the type and optional modifiers separated by `|`, e.g. `#agent intelligent explore carefully|model=mistral/mistral-large-2407|delay=2`. The agent only calls built-in tools (`look`, `move`, `movement`, `explore`, `communication`, `intelligentcommunication`) and refuses unknown names. -9. Prefer a TUI? Run `python textual_ui.py` to open a two-pane interface (MUD output on the left, agent output on the right) with an input box at the bottom. It accepts the same commands as the CLI (`#execute …`, `#agent …`, or raw MUD commands). +9. Prefer a TUI? Run `python textual_ui.py` to open a two-pane interface with a bottom input field: + - Left pane: live MUD stream (server output plus outgoing actions from you, tools, and agents). + - Right pane: LLM reasoning plus tool/agent diagnostic output. + It accepts the same commands as the CLI (`#execute …`, `#agent …`, or raw MUD commands). ## Environment Variables diff --git a/textual_ui.py b/textual_ui.py index 430482c..a2defea 100644 --- a/textual_ui.py +++ b/textual_ui.py @@ -2,34 +2,40 @@ from __future__ import annotations import io import os -from contextlib import redirect_stdout, redirect_stderr +from contextlib import redirect_stderr, redirect_stdout from threading import Event, Thread, current_thread -from typing import Callable +from typing import Callable, Optional from textual.app import App, ComposeResult from textual.containers import Horizontal, Vertical -from textual.widgets import Footer, Header, Input, Label, Log +from textual.widgets import Footer, Header, Input, Label, Log, Static -from mud_env import load_env_file, read_connection_settings -from mud_session import SessionState, login, run_tool_loop -from mud_tools import TOOL_DESCRIPTIONS, build_tool from agent_runtime import run_agent -from agents import build_agent +from agents import Agent, build_agent +from mud_env import load_env_file, read_connection_settings, read_tool_settings +from mud_session import SessionState, graceful_shutdown, login, run_tool_loop +from mud_tools import TOOL_DESCRIPTIONS, build_tool from telnetclient import TelnetClient +from tools import Tool + + +SEND_INTERVAL_SECONDS = 1.0 +AUTO_STOP_IDLE_SECONDS = 2.0 class _QueueWriter(io.TextIOBase): + """Send captured stdout/stderr lines into a UI logger callback.""" + def __init__(self, emit: Callable[[str], None]) -> None: super().__init__() self._emit = emit - self._buffer: str = "" + self._buffer = "" def write(self, s: str) -> int: # type: ignore[override] self._buffer += s while "\n" in self._buffer: line, self._buffer = self._buffer.split("\n", 1) - if line: - self._emit(line) + self._emit(line) return len(s) def flush(self) -> None: # type: ignore[override] @@ -38,20 +44,83 @@ class _QueueWriter(io.TextIOBase): self._buffer = "" +class UISessionState(SessionState): + """Session state that emits outgoing commands to the MUD pane.""" + + def __init__(self, emit_action: Callable[[str, str], None]) -> None: + super().__init__() + self._emit_action = emit_action + + def send(self, client: TelnetClient, message: str) -> None: + self.send_with_source(client, message, source="YOU") + + def send_with_source(self, client: TelnetClient, message: str, *, source: str) -> None: + super().send(client, message) + self._emit_action(source, message) + + def tool_send( + self, + client: TelnetClient, + message: str, + *, + min_interval: float, + stop_event: Event, + ) -> bool: + sent = super().tool_send( + client, + message, + min_interval=min_interval, + stop_event=stop_event, + ) + if sent: + self._emit_action("TOOL", message) + return sent + + class MudUI(App): CSS = """ Screen { layout: vertical; } - #logs { + + #main { height: 1fr; + padding: 0 1; } + + .pane { + width: 1fr; + height: 1fr; + padding: 0 1; + } + + .pane-title { + text-style: bold; + padding: 1 0 0 0; + } + + #mud-log { + border: round #4c7a8f; + padding: 0 1; + } + + #brain-log { + border: round #7d6f5a; + padding: 0 1; + } + #input-row { - padding: 1 2; + height: auto; + padding: 0 2 1 2; } - Log { - border: round #888881; - padding: 1 1; + + #command { + width: 1fr; + } + + #status { + padding: 0 2; + color: #999999; } """ @@ -61,37 +130,49 @@ class MudUI(App): def compose(self) -> ComposeResult: yield Header(show_clock=True) - with Vertical(id="logs"): - yield Label("MUD Output") - self.mud_log = Log(classes="mud") - yield self.mud_log - yield Label("Agent Output") - self.agent_log = Log(classes="agent") - yield self.agent_log + with Horizontal(id="main"): + with Vertical(classes="pane"): + yield Label("MUD Stream", classes="pane-title") + self.mud_log = Log(id="mud-log", auto_scroll=True) + yield self.mud_log + with Vertical(classes="pane"): + yield Label("LLM, Agent, and Tool Stream", classes="pane-title") + self.brain_log = Log(id="brain-log", auto_scroll=True) + yield self.brain_log with Horizontal(id="input-row"): - self.input = Input(placeholder="Type command or #execute/#agent ...", id="command") + self.input = Input( + placeholder="Type MUD command, #execute , or #agent ", + id="command", + ) yield self.input + self.status = Static("Disconnected", id="status") + yield self.status yield Footer() def on_mount(self) -> None: - self._stop_event = Event() self._ui_thread = current_thread() + self._stop_event = Event() + self._worker_threads: list[Thread] = [] + self._reader_thread: Optional[Thread] = None + load_env_file() connection = read_connection_settings() + tool_settings = read_tool_settings() timeout = float(os.environ.get("MISTLE_TIMEOUT", "10")) - self.state = SessionState() - self.client = TelnetClient( - host=connection.host, port=connection.port, timeout=timeout - ) + self._exit_command = connection.exit_command + self.state = UISessionState(self._log_action) + self.client = TelnetClient(host=connection.host, port=connection.port, timeout=timeout) + try: self.client.connect() except Exception as exc: # pragma: no cover - network specific self.log_mud(f"[error] Failed to connect: {exc}") + self._set_status("Connection failed") return - writer = _QueueWriter(lambda line: self._emit_to_log(self.mud_log, line)) - with redirect_stdout(writer), redirect_stderr(writer): + login_writer = _QueueWriter(self.log_mud) + with redirect_stdout(login_writer), redirect_stderr(login_writer): login( self.client, user=connection.user, @@ -99,85 +180,100 @@ class MudUI(App): login_prompt=connection.login_prompt, state=self.state, ) - writer.flush() + login_writer.flush() - self.reader_thread = Thread(target=self._reader_loop, daemon=True) - self.reader_thread.start() + self._reader_thread = Thread(target=self._reader_loop, daemon=True, name="mud-reader") + self._reader_thread.start() + if tool_settings.tool_mode: + self._launch_persistent_tool(tool_settings.tool_spec) + + sideload_seen: set[str] = set() + for spec in tool_settings.sideload_specs: + lowered = spec.lower() + if lowered in sideload_seen: + continue + sideload_seen.add(lowered) + self._launch_persistent_tool(spec) + + self._set_status(f"Connected to {connection.host}:{connection.port}") self.input.focus() self.log_mud(f"Connected to {connection.host}:{connection.port}") - def on_input_submitted(self, event: Input.Submitted) -> None: - command = event.value.strip() - event.input.value = "" - if not command: - return - if command.startswith("#execute"): - parts = command.split(maxsplit=1) - if len(parts) == 1: - self.log_agent("Usage: #execute ") - else: - self._start_tool(parts[1]) - return - if command.startswith("#agent"): - parts = command.split(maxsplit=1) - if len(parts) == 1: - self.log_agent("Usage: #agent ") - else: - self._start_agent(parts[1]) - return - self.state.send(self.client, command) - self.log_agent(f"> {command}") - def on_unmount(self) -> None: self._stop_event.set() + + try: + graceful_shutdown(self.client, self._exit_command, state=self.state) + except Exception: + pass + + if self._reader_thread: + self._reader_thread.join(timeout=1.0) + for thread in self._worker_threads: + thread.join(timeout=1.0) + try: self.client.close() except Exception: pass - def _emit_to_log(self, log: Log, message: str) -> None: - if current_thread() is self._ui_thread: - log.write(message) - else: - log.write(message) + def on_input_submitted(self, event: Input.Submitted) -> None: + raw = event.value + event.input.value = "" + command = raw.strip() + if not command: + return - def log_mud(self, message: str) -> None: - self._emit_to_log(self.mud_log, message) + if command.startswith("#execute"): + parts = command.split(maxsplit=1) + if len(parts) == 1: + self.log_brain("[Tool] Usage: #execute ") + else: + self._start_tool(parts[1]) + return - def log_agent(self, message: str) -> None: - self._emit_to_log(self.agent_log, message) + if command.startswith("#agent"): + parts = command.split(maxsplit=1) + if len(parts) == 1: + self.log_brain("[Agent] Usage: #agent ") + else: + self._start_agent(parts[1]) + return + + self.state.send(self.client, command) def _reader_loop(self) -> None: while not self._stop_event.is_set(): data = self.client.receive(timeout=0.3) - if data: - self.state.update_output(data) - self._emit_to_log(self.mud_log, data) + if not data: + continue + self.state.update_output(data) + self.log_mud(data) - def _wrap_run(self, func: Callable[[], None]) -> Thread: + def _start_worker(self, fn: Callable[[], None], *, name: str) -> Thread: def runner() -> None: - writer = _QueueWriter(lambda line: self._emit_to_log(self.agent_log, line)) + writer = _QueueWriter(self.log_brain) with redirect_stdout(writer), redirect_stderr(writer): - func() + fn() writer.flush() - thread = Thread(target=runner, daemon=True) + thread = Thread(target=runner, daemon=True, name=name) thread.start() + self._worker_threads.append(thread) return thread - def _start_tool(self, raw_spec: str) -> None: - spec = raw_spec.strip() - if not spec: - self.log_agent("Usage: #execute ") + def _launch_persistent_tool(self, spec: str) -> None: + clean_spec = spec.strip() + if not clean_spec: return - self.log_agent(f"[Tool] Executing {spec!r}") + self.log_brain(f"[Tool] Starting persistent tool {clean_spec!r}") def worker() -> None: try: - tool = build_tool(spec) + tool = build_tool(clean_spec) except RuntimeError as exc: - print(f"[Agent] Failed to load tool {spec}: {exc}") + print(f"[Tool] Failed to load '{clean_spec}': {exc}") return run_tool_loop( @@ -185,22 +281,53 @@ class MudUI(App): self.state, tool, self._stop_event, - min_send_interval=1.0, - auto_stop=True, - auto_stop_idle=2.0, + min_send_interval=SEND_INTERVAL_SECONDS, + auto_stop=False, ) - self._wrap_run(worker) + self._start_worker(worker, name=f"tool-{clean_spec}") + + def _start_tool(self, raw_spec: str) -> None: + spec = raw_spec.strip() + if not spec: + self.log_brain("[Tool] Usage: #execute ") + return + + self.log_brain(f"[Tool] Executing {spec!r}") + + def worker() -> None: + try: + tool = build_tool(spec) + except RuntimeError as exc: + print(f"[Tool] Failed to load '{spec}': {exc}") + return + + run_tool_loop( + self.client, + self.state, + tool, + self._stop_event, + min_send_interval=SEND_INTERVAL_SECONDS, + auto_stop=True, + auto_stop_idle=AUTO_STOP_IDLE_SECONDS, + ) + + self._start_worker(worker, name=f"ephemeral-tool-{spec}") def _start_agent(self, raw_spec: str) -> None: spec = raw_spec.strip() if not spec: - self.log_agent("Usage: #agent ") + self.log_brain("[Agent] Usage: #agent ") return - self.log_agent(f"[Agent] Executing {spec!r}") - def build(spec_str: str) -> Tool: - return build_tool(spec_str) + try: + agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS) + except RuntimeError as exc: + self.log_brain(f"[Agent] Failed to configure '{spec}': {exc}") + return + + self.log_brain(f"[Agent] Executing {spec!r}") + self._prime_agent(agent) def run_tool_instance(tool: Tool) -> bool: run_tool_loop( @@ -208,47 +335,74 @@ class MudUI(App): self.state, tool, self._stop_event, - min_send_interval=1.0, + min_send_interval=SEND_INTERVAL_SECONDS, auto_stop=True, - auto_stop_idle=2.0, + auto_stop_idle=AUTO_STOP_IDLE_SECONDS, ) - output_after = self.state.snapshot_output() - if output_after: - observe = getattr(agent, "observe", None) - if callable(observe): - try: - observe(output_after) - except Exception as exc: # pragma: no cover - print(f"[Agent] observe failed: {exc}") + self._prime_agent(agent) return True def send_command(command: str) -> None: - self.state.send(self.client, command) - self._emit_to_log(self.agent_log, f"[Agent] command: {command}") + self.state.send_with_source(self.client, command, source="AGENT") + self.log_brain(f"[Agent] command: {command}") - try: - agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS) - except RuntimeError as exc: - self.log_agent(f"[Agent] Failed to configure '{spec}': {exc}") - return - - last_output = self.state.snapshot_output() - observe = getattr(agent, "observe", None) - if last_output and callable(observe): - try: - observe(last_output) - except Exception as exc: # pragma: no cover - self.log_agent(f"[Agent] observe failed: {exc}") - - self._wrap_run( - lambda: run_agent( + def run() -> None: + run_agent( agent, - build_tool=build, + build_tool=build_tool, run_tool=run_tool_instance, send_command=send_command, stop_event=self._stop_event, ) - ) + + self._start_worker(run, name=f"agent-{spec}") + + def _prime_agent(self, agent: Agent) -> None: + last_output = self.state.snapshot_output() + if not last_output: + return + observe = getattr(agent, "observe", None) + if not callable(observe): + return + try: + observe(last_output) + except Exception as exc: # pragma: no cover - defensive guard + self.log_brain(f"[Agent] observe failed: {exc}") + + def _set_status(self, text: str) -> None: + def update() -> None: + self.status.update(text) + + if current_thread() is self._ui_thread: + update() + return + try: + self.call_from_thread(update) + except RuntimeError: + pass + + def _write_log(self, log: Log, message: str) -> None: + def update() -> None: + normalized = message.replace("\r\n", "\n").replace("\r", "\n") + for line in normalized.split("\n"): + log.write_line(line) + + if current_thread() is self._ui_thread: + update() + return + try: + self.call_from_thread(update) + except RuntimeError: + pass + + def _log_action(self, source: str, message: str) -> None: + self.log_mud(f"[{source}] {message}") + + def log_mud(self, message: str) -> None: + self._write_log(self.mud_log, message) + + def log_brain(self, message: str) -> None: + self._write_log(self.brain_log, message) if __name__ == "__main__":