# mistle/textual_ui.py
#
# 260 lines
# 7.9 KiB
# Python

from __future__ import annotations
import io
import os
from contextlib import redirect_stdout, redirect_stderr
from threading import Event, Thread, current_thread
from typing import Callable
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.widgets import Footer, Header, Input, Label, Log
from app import (
TOOL_DESCRIPTIONS,
SessionState,
build_tool,
load_env_file,
login,
require_env,
run_tool_loop,
)
from agent_runtime import run_agent
from agents import build_agent
from telnetclient import TelnetClient
class _QueueWriter(io.TextIOBase):
def __init__(self, emit: Callable[[str], None]) -> None:
super().__init__()
self._emit = emit
self._buffer: str = ""
def write(self, s: str) -> int: # type: ignore[override]
self._buffer += s
while "\n" in self._buffer:
line, self._buffer = self._buffer.split("\n", 1)
if line:
self._emit(line)
return len(s)
def flush(self) -> None: # type: ignore[override]
if self._buffer:
self._emit(self._buffer)
self._buffer = ""
class MudUI(App):
    """Textual front end with a MUD output pane, an agent output pane and a
    command input.

    Plain input is sent to the MUD through ``SessionState.send``; input
    starting with ``#execute`` or ``#agent`` starts a tool or an agent on a
    background thread, with its stdout/stderr redirected into the agent pane.
    """

    CSS = """
    Screen {
        layout: vertical;
    }
    #logs {
        height: 1fr;
    }
    #input-row {
        padding: 1 2;
    }
    Log {
        border: round #888881;
        padding: 1 1;
    }
    """

    BINDINGS = [
        ("ctrl+c", "quit", "Quit"),
    ]

    def compose(self) -> ComposeResult:
        """Build the widget tree: header, two labelled logs, input row, footer."""
        yield Header(show_clock=True)
        with Vertical(id="logs"):
            yield Label("MUD Output")
            self.mud_log = Log(classes="mud")
            yield self.mud_log
            yield Label("Agent Output")
            self.agent_log = Log(classes="agent")
            yield self.agent_log
        with Horizontal(id="input-row"):
            self.input = Input(placeholder="Type command or #execute/#agent ...", id="command")
            yield self.input
        yield Footer()

    def on_mount(self) -> None:
        """Connect to the MUD, log in, and start the background reader thread.

        Connection settings come from the environment (``MISTLE_HOST``,
        ``MISTLE_PORT``, optional ``MISTLE_TIMEOUT`` defaulting to 10) after
        ``load_env_file()`` has run.  If the connection attempt raises, the
        error is written to the MUD log and no reader thread is started.
        """
        self._stop_event = Event()
        # Remembered so _emit_to_log can tell UI-thread calls from worker calls.
        self._ui_thread = current_thread()

        load_env_file()
        host = require_env("MISTLE_HOST")
        port = int(require_env("MISTLE_PORT"))
        timeout = float(os.environ.get("MISTLE_TIMEOUT", "10"))

        self.state = SessionState()
        self.client = TelnetClient(host=host, port=port, timeout=timeout)
        try:
            self.client.connect()
        except Exception as exc:  # pragma: no cover - network specific
            self.log_mud(f"[error] Failed to connect: {exc}")
            return

        # Anything login() prints is captured and shown in the MUD pane.
        writer = _QueueWriter(lambda line: self._emit_to_log(self.mud_log, line))
        try:
            with redirect_stdout(writer), redirect_stderr(writer):
                login(
                    self.client,
                    user=os.environ.get("MISTLE_USER", ""),
                    password=os.environ.get("MISTLE_PASSWORD", ""),
                    login_prompt=os.environ.get("MISTLE_LOGIN_PROMPT", ""),
                    state=self.state,
                )
        finally:
            # Do not lose a trailing partial line, even if login() raised.
            writer.flush()

        self.reader_thread = Thread(target=self._reader_loop, daemon=True)
        self.reader_thread.start()
        self.input.focus()
        self.log_mud(f"Connected to {host}:{port}")

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dispatch a submitted line to a tool, an agent, or the MUD itself."""
        command = event.value.strip()
        event.input.value = ""
        if not command:
            return

        directives = (
            ("#execute", "Usage: #execute <tool_spec>", self._start_tool),
            ("#agent", "Usage: #agent <agent_spec>", self._start_agent),
        )
        for prefix, usage, start in directives:
            if command.startswith(prefix):
                parts = command.split(maxsplit=1)
                if len(parts) == 1:
                    self.log_agent(usage)
                else:
                    start(parts[1])
                return

        self.state.send(self.client, command)
        self.log_agent(f"> {command}")

    def on_unmount(self) -> None:
        """Signal background threads to stop and close the connection."""
        self._stop_event.set()
        try:
            self.client.close()
        except Exception:
            # Best-effort cleanup during shutdown; nothing useful to do here.
            pass

    def _emit_to_log(self, log: Log, message: str) -> None:
        """Write ``message`` to ``log`` from any thread.

        Calls made on the UI thread write directly.  Calls from worker
        threads are marshalled onto the app's thread with
        ``call_from_thread``, because widgets must not be touched from other
        threads (and ``call_from_thread`` itself must not be used on the UI
        thread).
        """
        # NOTE(review): `Log.write` is kept from the original; if consecutive
        # messages run together on one line, `Log.write_line` may be the
        # intended call for line-oriented messages - confirm against the
        # Textual Log API.
        if current_thread() is self._ui_thread:
            log.write(message)
        else:
            self.call_from_thread(log.write, message)

    def log_mud(self, message: str) -> None:
        """Write ``message`` to the MUD output pane."""
        self._emit_to_log(self.mud_log, message)

    def log_agent(self, message: str) -> None:
        """Write ``message`` to the agent output pane."""
        self._emit_to_log(self.agent_log, message)

    def _reader_loop(self) -> None:
        """Poll the connection until the stop event is set.

        Each non-empty chunk is recorded in the session state and echoed to
        the MUD pane.
        """
        while not self._stop_event.is_set():
            data = self.client.receive(timeout=0.3)
            if data:
                self.state.update_output(data)
                self._emit_to_log(self.mud_log, data)

    def _wrap_run(self, func: Callable[[], None]) -> Thread:
        """Run ``func`` on a daemon thread with its output sent to the agent pane.

        Returns:
            The already-started thread.
        """

        def runner() -> None:
            writer = _QueueWriter(lambda line: self._emit_to_log(self.agent_log, line))
            # NOTE(review): redirect_stdout/redirect_stderr replace sys.stdout
            # and sys.stderr for the whole process, not only this thread, so
            # concurrently running workers share (and may restore) each
            # other's redirection.
            with redirect_stdout(writer), redirect_stderr(writer):
                try:
                    func()
                except Exception as exc:
                    # Report the failure in the agent pane; otherwise the
                    # traceback would be printed after stderr is restored.
                    print(f"[error] {type(exc).__name__}: {exc}")
                finally:
                    writer.flush()

        thread = Thread(target=runner, daemon=True)
        thread.start()
        return thread

    def _run_tool(self, tool: "Tool") -> None:
        """Drive ``tool`` through ``run_tool_loop`` with the UI's fixed settings."""
        run_tool_loop(
            self.client,
            self.state,
            tool,
            self._stop_event,
            min_send_interval=1.0,
            auto_stop=True,
            auto_stop_idle=2.0,
        )

    def _start_tool(self, raw_spec: str) -> None:
        """Build the tool named by ``raw_spec`` and run it in the background."""
        spec = raw_spec.strip()
        if not spec:
            self.log_agent("Usage: #execute <tool_spec>")
            return
        self.log_agent(f"[Tool] Executing {spec!r}")

        def worker() -> None:
            try:
                tool = build_tool(spec)
            except RuntimeError as exc:
                # Printed rather than logged: stdout is redirected to the
                # agent pane while the worker runs.
                print(f"[Agent] Failed to load tool {spec}: {exc}")
                return
            self._run_tool(tool)

        self._wrap_run(worker)

    def _start_agent(self, raw_spec: str) -> None:
        """Build the agent named by ``raw_spec`` and run it in the background.

        The agent is first shown the current output snapshot (if it exposes a
        callable ``observe``), then handed to ``run_agent`` together with
        callbacks for building tools, running tools and sending commands.
        """
        spec = raw_spec.strip()
        if not spec:
            self.log_agent("Usage: #agent <agent_spec>")
            return
        self.log_agent(f"[Agent] Executing {spec!r}")

        try:
            agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS)
        except RuntimeError as exc:
            self.log_agent(f"[Agent] Failed to configure '{spec}': {exc}")
            return

        def feed_output(output, report: Callable[[str], None]) -> None:
            # Show `output` to the agent if it is non-empty and the agent has
            # a callable `observe`; failures are reported, never raised.
            observe = getattr(agent, "observe", None)
            if output and callable(observe):
                try:
                    observe(output)
                except Exception as exc:  # pragma: no cover
                    report(f"[Agent] observe failed: {exc}")

        # NOTE(review): `Tool` is not imported in the visible import block;
        # the annotations below rely on lazily evaluated annotations.
        def build(spec_str: str) -> Tool:
            return build_tool(spec_str)

        def run_tool_instance(tool: Tool) -> bool:
            self._run_tool(tool)
            # Runs on the worker thread, where print() lands in the agent pane.
            feed_output(self.state.snapshot_output(), print)
            return True

        def send_command(command: str) -> None:
            self.state.send(self.client, command)
            self._emit_to_log(self.agent_log, f"[Agent] command: {command}")

        # Runs on the UI thread, so failures are logged directly.
        feed_output(self.state.snapshot_output(), self.log_agent)

        self._wrap_run(
            lambda: run_agent(
                agent,
                build_tool=build,
                run_tool=run_tool_instance,
                send_command=send_command,
                stop_event=self._stop_event,
            )
        )
if __name__ == "__main__":
    # Script entry point: create the UI and hand control to its run loop.
    app = MudUI()
    app.run()