mistle/textual_ui.py
2026-02-09 09:48:55 +01:00

409 lines
12 KiB
Python

from __future__ import annotations
import io
import os
from contextlib import redirect_stderr, redirect_stdout
from threading import Event, Thread, current_thread
from typing import Callable, Optional
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widgets import Footer, Header, Input, Label, Log, Static
from agent_runtime import run_agent
from agents import Agent, build_agent
from mud_env import load_env_file, read_connection_settings, read_tool_settings
from mud_session import SessionState, graceful_shutdown, login, run_tool_loop
from mud_tools import TOOL_DESCRIPTIONS, build_tool
from telnetclient import TelnetClient
from tools import Tool
SEND_INTERVAL_SECONDS = 1.0
AUTO_STOP_IDLE_SECONDS = 2.0
class _QueueWriter(io.TextIOBase):
"""Send captured stdout/stderr lines into a UI logger callback."""
def __init__(self, emit: Callable[[str], None]) -> None:
super().__init__()
self._emit = emit
self._buffer = ""
def write(self, s: str) -> int: # type: ignore[override]
self._buffer += s
while "\n" in self._buffer:
line, self._buffer = self._buffer.split("\n", 1)
self._emit(line)
return len(s)
def flush(self) -> None: # type: ignore[override]
if self._buffer:
self._emit(self._buffer)
self._buffer = ""
class UISessionState(SessionState):
"""Session state that emits outgoing commands to the MUD pane."""
def __init__(self, emit_action: Callable[[str, str], None]) -> None:
super().__init__()
self._emit_action = emit_action
def send(self, client: TelnetClient, message: str) -> None:
self.send_with_source(client, message, source="YOU")
def send_with_source(self, client: TelnetClient, message: str, *, source: str) -> None:
super().send(client, message)
self._emit_action(source, message)
def tool_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
sent = super().tool_send(
client,
message,
min_interval=min_interval,
stop_event=stop_event,
)
if sent:
self._emit_action("TOOL", message)
return sent
class MudUI(App):
CSS = """
Screen {
layout: vertical;
}
#main {
height: 1fr;
padding: 0 1;
}
.pane {
width: 1fr;
height: 1fr;
padding: 0 1;
}
.pane-title {
text-style: bold;
padding: 1 0 0 0;
}
#mud-log {
border: round #4c7a8f;
padding: 0 1;
}
#brain-log {
border: round #7d6f5a;
padding: 0 1;
}
#input-row {
height: auto;
padding: 0 2 1 2;
}
#command {
width: 1fr;
}
#status {
padding: 0 2;
color: #999999;
}
"""
BINDINGS = [
("ctrl+c", "quit", "Quit"),
]
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Horizontal(id="main"):
with Vertical(classes="pane"):
yield Label("MUD Stream", classes="pane-title")
self.mud_log = Log(id="mud-log", auto_scroll=True)
yield self.mud_log
with Vertical(classes="pane"):
yield Label("LLM, Agent, and Tool Stream", classes="pane-title")
self.brain_log = Log(id="brain-log", auto_scroll=True)
yield self.brain_log
with Horizontal(id="input-row"):
self.input = Input(
placeholder="Type MUD command, #execute <tool>, or #agent <spec>",
id="command",
)
yield self.input
self.status = Static("Disconnected", id="status")
yield self.status
yield Footer()
def on_mount(self) -> None:
self._ui_thread = current_thread()
self._stop_event = Event()
self._worker_threads: list[Thread] = []
self._reader_thread: Optional[Thread] = None
load_env_file()
connection = read_connection_settings()
tool_settings = read_tool_settings()
timeout = float(os.environ.get("MISTLE_TIMEOUT", "10"))
self._exit_command = connection.exit_command
self.state = UISessionState(self._log_action)
self.client = TelnetClient(host=connection.host, port=connection.port, timeout=timeout)
try:
self.client.connect()
except Exception as exc: # pragma: no cover - network specific
self.log_mud(f"[error] Failed to connect: {exc}")
self._set_status("Connection failed")
return
login_writer = _QueueWriter(self.log_mud)
with redirect_stdout(login_writer), redirect_stderr(login_writer):
login(
self.client,
user=connection.user,
password=connection.password,
login_prompt=connection.login_prompt,
state=self.state,
)
login_writer.flush()
self._reader_thread = Thread(target=self._reader_loop, daemon=True, name="mud-reader")
self._reader_thread.start()
if tool_settings.tool_mode:
self._launch_persistent_tool(tool_settings.tool_spec)
sideload_seen: set[str] = set()
for spec in tool_settings.sideload_specs:
lowered = spec.lower()
if lowered in sideload_seen:
continue
sideload_seen.add(lowered)
self._launch_persistent_tool(spec)
self._set_status(f"Connected to {connection.host}:{connection.port}")
self.input.focus()
self.log_mud(f"Connected to {connection.host}:{connection.port}")
def on_unmount(self) -> None:
self._stop_event.set()
try:
graceful_shutdown(self.client, self._exit_command, state=self.state)
except Exception:
pass
if self._reader_thread:
self._reader_thread.join(timeout=1.0)
for thread in self._worker_threads:
thread.join(timeout=1.0)
try:
self.client.close()
except Exception:
pass
def on_input_submitted(self, event: Input.Submitted) -> None:
raw = event.value
event.input.value = ""
command = raw.strip()
if not command:
return
if command.startswith("#execute"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_brain("[Tool] Usage: #execute <tool_spec>")
else:
self._start_tool(parts[1])
return
if command.startswith("#agent"):
parts = command.split(maxsplit=1)
if len(parts) == 1:
self.log_brain("[Agent] Usage: #agent <agent_spec>")
else:
self._start_agent(parts[1])
return
self.state.send(self.client, command)
def _reader_loop(self) -> None:
while not self._stop_event.is_set():
data = self.client.receive(timeout=0.3)
if not data:
continue
self.state.update_output(data)
self.log_mud(data)
def _start_worker(self, fn: Callable[[], None], *, name: str) -> Thread:
def runner() -> None:
writer = _QueueWriter(self.log_brain)
with redirect_stdout(writer), redirect_stderr(writer):
fn()
writer.flush()
thread = Thread(target=runner, daemon=True, name=name)
thread.start()
self._worker_threads.append(thread)
return thread
def _launch_persistent_tool(self, spec: str) -> None:
clean_spec = spec.strip()
if not clean_spec:
return
self.log_brain(f"[Tool] Starting persistent tool {clean_spec!r}")
def worker() -> None:
try:
tool = build_tool(clean_spec)
except RuntimeError as exc:
print(f"[Tool] Failed to load '{clean_spec}': {exc}")
return
run_tool_loop(
self.client,
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=False,
)
self._start_worker(worker, name=f"tool-{clean_spec}")
def _start_tool(self, raw_spec: str) -> None:
spec = raw_spec.strip()
if not spec:
self.log_brain("[Tool] Usage: #execute <tool_spec>")
return
self.log_brain(f"[Tool] Executing {spec!r}")
def worker() -> None:
try:
tool = build_tool(spec)
except RuntimeError as exc:
print(f"[Tool] Failed to load '{spec}': {exc}")
return
run_tool_loop(
self.client,
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
)
self._start_worker(worker, name=f"ephemeral-tool-{spec}")
def _start_agent(self, raw_spec: str) -> None:
spec = raw_spec.strip()
if not spec:
self.log_brain("[Agent] Usage: #agent <agent_spec>")
return
try:
agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS)
except RuntimeError as exc:
self.log_brain(f"[Agent] Failed to configure '{spec}': {exc}")
return
self.log_brain(f"[Agent] Executing {spec!r}")
self._prime_agent(agent)
def run_tool_instance(tool: Tool) -> bool:
run_tool_loop(
self.client,
self.state,
tool,
self._stop_event,
min_send_interval=SEND_INTERVAL_SECONDS,
auto_stop=True,
auto_stop_idle=AUTO_STOP_IDLE_SECONDS,
)
self._prime_agent(agent)
return True
def send_command(command: str) -> None:
self.state.send_with_source(self.client, command, source="AGENT")
self.log_brain(f"[Agent] command: {command}")
def run() -> None:
run_agent(
agent,
build_tool=build_tool,
run_tool=run_tool_instance,
send_command=send_command,
stop_event=self._stop_event,
)
self._start_worker(run, name=f"agent-{spec}")
def _prime_agent(self, agent: Agent) -> None:
last_output = self.state.snapshot_output()
if not last_output:
return
observe = getattr(agent, "observe", None)
if not callable(observe):
return
try:
observe(last_output)
except Exception as exc: # pragma: no cover - defensive guard
self.log_brain(f"[Agent] observe failed: {exc}")
def _set_status(self, text: str) -> None:
def update() -> None:
self.status.update(text)
if current_thread() is self._ui_thread:
update()
return
try:
self.call_from_thread(update)
except RuntimeError:
pass
def _write_log(self, log: Log, message: str) -> None:
def update() -> None:
normalized = message.replace("\r\n", "\n").replace("\r", "\n")
for line in normalized.split("\n"):
log.write_line(line)
if current_thread() is self._ui_thread:
update()
return
try:
self.call_from_thread(update)
except RuntimeError:
pass
def _log_action(self, source: str, message: str) -> None:
self.log_mud(f"[{source}] {message}")
def log_mud(self, message: str) -> None:
self._write_log(self.mud_log, message)
def log_brain(self, message: str) -> None:
self._write_log(self.brain_log, message)
if __name__ == "__main__":
MudUI().run()