from __future__ import annotations import sys from threading import Event, Thread from typing import Optional from agent_runtime import run_agent from agents import Agent, build_agent from goal_loader import load_goal_instructions from mud_env import load_env_file, read_connection_settings, read_tool_settings from mud_session import ( SessionState, graceful_shutdown, interactive_session, login, run_tool_loop, ) from mud_tools import TOOL_DESCRIPTIONS, build_tool from telnetclient import TelnetClient from tools import Tool DEFAULT_SEND_INTERVAL = 1.0 AUTO_STOP_IDLE_SECONDS = 2.0 def _start_tool_thread( client: TelnetClient, state: SessionState, tool: Tool, stop_event: Event, *, auto_stop: bool = False, auto_stop_idle: float = AUTO_STOP_IDLE_SECONDS, ) -> Thread: thread = Thread( target=run_tool_loop, args=(client, state, tool, stop_event), kwargs={ "min_send_interval": DEFAULT_SEND_INTERVAL, "auto_stop": auto_stop, "auto_stop_idle": auto_stop_idle, }, daemon=True, ) thread.start() return thread def _build_sideload_tools(specs: list[str]) -> list[tuple[str, Tool]]: sideload_tools: list[tuple[str, Tool]] = [] seen_sideloads: set[str] = set() for spec in specs: lowered = spec.lower() if lowered in seen_sideloads: continue seen_sideloads.add(lowered) try: tool_instance = build_tool(spec) except RuntimeError as exc: print(f"[Tool] Failed to load sideload '{spec}': {exc}", file=sys.stderr) continue sideload_tools.append((spec, tool_instance)) return sideload_tools def _prime_agent(agent: Agent, output: str) -> None: if not output: return observe = getattr(agent, "observe", None) if callable(observe): try: observe(output) except Exception as exc: # pragma: no cover - defensive print(f"[Agent] observe failed: {exc}", file=sys.stderr) def _join_threads(threads: list[Thread], *, timeout: float = 1.0) -> None: for thread in threads: thread.join(timeout=timeout) def main() -> int: load_env_file() connection = read_connection_settings() tool_settings = read_tool_settings() sideload_tools = _build_sideload_tools(tool_settings.sideload_specs) state = SessionState() stop_event = Event() tool_thread: Optional[Thread] = None tool: Optional[Tool] = None ephemeral_tools: list[Thread] = [] sidecar_threads: list[Thread] = [] agent_threads: list[Thread] = [] with TelnetClient(host=connection.host, port=connection.port, timeout=10.0) as client: login( client, user=connection.user, password=connection.password, login_prompt=connection.login_prompt, state=state, ) if tool_settings.tool_mode: tool = build_tool(tool_settings.tool_spec) tool_thread = _start_tool_thread(client, state, tool, stop_event) if sideload_tools: for sidecar_spec, sidecar_tool in sideload_tools: thread = _start_tool_thread(client, state, sidecar_tool, stop_event) sidecar_threads.append(thread) print( f"[Tool] Sideloading '{sidecar_spec}' ({sidecar_tool.__class__.__name__})" ) def run_ephemeral_tool(spec: str) -> None: spec = spec.strip() if not spec: print("[Tool] Usage: #execute ") return try: temp_tool = build_tool(spec) except RuntimeError as exc: print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr) return thread = _start_tool_thread( client, state, temp_tool, stop_event, auto_stop=True, auto_stop_idle=AUTO_STOP_IDLE_SECONDS, ) ephemeral_tools.append(thread) print(f"[Tool] Executing {spec!r} once") def run_ephemeral_agent(spec: str) -> None: spec = spec.strip() if not spec: print("[Agent] Usage: #agent ") return try: temp_agent = build_agent(spec, allowed_tools=TOOL_DESCRIPTIONS) except RuntimeError as exc: print(f"[Agent] Failed to configure '{spec}': {exc}", file=sys.stderr) return _start_agent_thread(temp_agent, label=spec) def _start_agent_thread(agent: Agent, *, label: str) -> None: _prime_agent(agent, state.snapshot_output()) def run_tool_instance(tool: Tool) -> bool: run_tool_loop( client, state, tool, stop_event, min_send_interval=DEFAULT_SEND_INTERVAL, auto_stop=True, auto_stop_idle=AUTO_STOP_IDLE_SECONDS, ) _prime_agent(agent, state.snapshot_output()) return True thread = Thread( target=run_agent, args=(agent,), kwargs={ "build_tool": build_tool, "run_tool": run_tool_instance, "send_command": lambda cmd: state.send(client, cmd), "stop_event": stop_event, }, daemon=True, ) agent_threads.append(thread) print(f"[Agent] Executing {label!r}") thread.start() if tool_settings.autoplay: try: autoplay_agent = build_agent("intelligent", allowed_tools=TOOL_DESCRIPTIONS) except RuntimeError as exc: print(f"[Autoplay] Failed to configure intelligent agent: {exc}", file=sys.stderr) else: goal = load_goal_instructions() if goal: setattr(autoplay_agent, "instruction", goal) print("[Autoplay] Loaded instructions from GOAL.md") else: print( "[Autoplay] GOAL.md missing or empty; starting without extra instructions", file=sys.stderr, ) _start_agent_thread(autoplay_agent, label="intelligent (autoplay)") interrupted = False try: interactive_session( client, state=state, stop_event=stop_event, exit_command=connection.exit_command, tool_command=None if tool_settings.tool_mode else run_ephemeral_tool, agent_command=run_ephemeral_agent, ) except KeyboardInterrupt: print() interrupted = True finally: stop_event.set() if tool_thread: tool_thread.join(timeout=1.0) _join_threads(sidecar_threads) _join_threads(ephemeral_tools) _join_threads(agent_threads) if interrupted: graceful_shutdown(client, connection.exit_command, state=state) return 0 if __name__ == "__main__": raise SystemExit(main())