import os
import select
import sys
import time
from importlib import import_module
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Optional, Type

from tools import Tool, SimpleTool
from agents import Agent, build_agent
from agent_runtime import run_agent
from telnetclient import TelnetClient


class SessionState:
    """Share Telnet session state safely across threads."""

    def __init__(self) -> None:
        # Serialises every write to the client, whether typed or tool-driven.
        self._send_lock = Lock()
        # Guards the most recent chunk of server output.
        self._output_lock = Lock()
        # Set whenever new output has been recorded via ``update_output``.
        self._output_event = Event()
        self._last_output = ""
        # Monotonic timestamp of the last tool-originated send. Starting at
        # -inf guarantees the very first tool send is never delayed.
        self._last_tool_send = float("-inf")

    def send(self, client: TelnetClient, message: str) -> None:
        """Send *message* through *client* while holding the send lock."""
        with self._send_lock:
            client.send(message)

    def tool_send(
        self,
        client: TelnetClient,
        message: str,
        *,
        min_interval: float,
        stop_event: Event,
    ) -> bool:
        """Send on behalf of the tool while respecting a minimum cadence.

        Blocks until at least *min_interval* seconds have passed since the
        previous tool send, then sends *message*.

        Returns:
            ``True`` if the message was sent, ``False`` if *stop_event* was
            set before the send could happen.
        """
        while not stop_event.is_set():
            with self._send_lock:
                # A monotonic clock keeps the cadence correct even when the
                # wall clock is adjusted while the session is running.
                now = time.monotonic()
                elapsed = now - self._last_tool_send
                if elapsed >= min_interval:
                    client.send(message)
                    self._last_tool_send = now
                    return True
            # Wait outside the lock so other senders are not blocked.
            wait_time = min_interval - elapsed
            if wait_time <= 0:
                continue
            if stop_event.wait(wait_time):
                break
        return False

    def update_output(self, text: str) -> None:
        """Record *text* as the latest output and signal waiters.

        Empty text is ignored and does not trigger the event.
        """
        if not text:
            return
        with self._output_lock:
            self._last_output = text
        self._output_event.set()

    def snapshot_output(self) -> str:
        """Return the most recently recorded output."""
        with self._output_lock:
            return self._last_output

    def wait_for_output(self, timeout: float) -> bool:
        """Wait up to *timeout* seconds for new output.

        Returns:
            ``True`` if the output event is set, ``False`` on timeout.
        """
        return self._output_event.wait(timeout)

    def clear_output_event(self) -> None:
        """Reset the output event so the next wait blocks until new output."""
        self._output_event.clear()


def run_tool_loop(
    client: TelnetClient,
    state: SessionState,
    tool: Tool,
    stop_event: Event,
    *,
    idle_delay: float = 0.5,
    min_send_interval: float = 1.0,
    auto_stop: bool = False,
    auto_stop_idle: float = 2.0,
) -> None:
    """Invoke *tool* whenever new output arrives and send its response.

    Args:
        client: Connection used to send the tool's commands.
        state: Shared session state providing output and rate-limited sends.
        tool: Object whose ``decide()`` yields the next command (falsy for
            "nothing to send") and whose ``observe(text)`` receives output.
        stop_event: Ends the loop when set.
        idle_delay: Seconds to wait for new output on each iteration.
        min_send_interval: Minimum seconds between tool-originated sends.
        auto_stop: When true, leave the loop after a period with neither new
            output nor a successful send.
        auto_stop_idle: Length of that idle period in seconds.
    """
    # Monotonic time at which the current idle stretch began, if any.
    idle_started: Optional[float] = None

    def maybe_send() -> None:
        """Ask the tool for a command and send it, resetting the idle timer."""
        nonlocal idle_started
        try:
            command = tool.decide()
        except Exception as exc:  # pragma: no cover - defensive logging
            print(f"[Tool] Failed: {exc}", file=sys.stderr)
            return
        if not command:
            return
        sent = state.tool_send(
            client,
            command,
            min_interval=min_send_interval,
            stop_event=stop_event,
        )
        if not sent:
            return
        # A successful send counts as activity.
        idle_started = None

    while not stop_event.is_set():
        maybe_send()
        triggered = state.wait_for_output(timeout=idle_delay)
        if stop_event.is_set():
            break
        if not triggered:
            if auto_stop:
                # Monotonic time: idle detection must not be affected by
                # wall-clock adjustments.
                now = time.monotonic()
                if idle_started is None:
                    idle_started = now
                elif now - idle_started >= auto_stop_idle:
                    break
            continue
        idle_started = None
        state.clear_output_event()
        last_output = state.snapshot_output()
        if not last_output:
            continue
        try:
            tool.observe(last_output)
        except Exception as exc:  # pragma: no cover - defensive logging
            print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
            continue
        maybe_send()


def load_env_file(path: str = ".env") -> None:
    """Populate ``os.environ`` with key/value pairs from a dotenv file.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. Values
    have surrounding whitespace and quote characters stripped. Variables
    already present in the environment are never overwritten. A missing
    file (or a path that is not a regular file) is silently ignored.
    """
    env_path = Path(path)
    # ``is_file`` also rejects directories, which ``read_text`` cannot read.
    if not env_path.is_file():
        return
    # Decode explicitly as UTF-8 so parsing does not depend on the locale.
    for line in env_path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value


def require_env(key: str) -> str:
    """Return the value of environment variable *key*.

    Raises:
        RuntimeError: If the variable is not set.
    """
    value = os.environ.get(key)
    if value is None:
        raise RuntimeError(f"Missing required environment variable: {key}")
    return value


def _load_tool_class(module_name: str, class_name: str) -> Type[Tool]:
    """Import *module_name* and return its attribute *class_name*.

    Raises:
        RuntimeError: If the module cannot be imported or does not define
            *class_name*. Wrapping both failures lets callers that only
            handle ``RuntimeError`` report a bad spec instead of crashing.
    """
    try:
        module = import_module(module_name)
        return getattr(module, class_name)
    except ImportError as exc:
        raise RuntimeError(
            f"Cannot import module '{module_name}': {exc}"
        ) from exc
    except AttributeError as exc:  # pragma: no cover - optional dependency
        raise RuntimeError(
            f"{class_name} is not available in {module_name} module"
        ) from exc


def build_tool(spec: str) -> Tool:
    """Instantiate a tool based on configuration.

    *spec* is matched case-insensitively against the built-in tool names;
    an empty spec or ``"simple"`` yields a ``SimpleTool``. Any other spec
    must have the form ``module:ClassName``.

    Raises:
        RuntimeError: If the spec is unknown or malformed, or the tool class
            cannot be imported, found or instantiated.
    """
    normalized = spec.strip()
    if not normalized:
        return SimpleTool()

    key = normalized.lower()
    if key == "simple":
        return SimpleTool()

    llm_model = os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small-2407")
    # name -> (module to import, class to look up, constructor kwargs)
    builtin_tools = {
        "explore": ("tools", "ExploreTool", {}),
        "communication": ("tools", "CommunicationTool", {}),
        "movement": ("movement_tool", "MovementTool", {}),
        "move": ("movement_tool", "MovementTool", {}),
        "intelligent": (
            "intelligent_tool",
            "IntelligentCommunicationTool",
            {"model": llm_model},
        ),
        "intelligentcommunication": (
            "intelligent_tool",
            "IntelligentCommunicationTool",
            {"model": llm_model},
        ),
    }

    if key in builtin_tools:
        module_name, class_name, kwargs = builtin_tools[key]
        tool_cls = _load_tool_class(module_name, class_name)
        tool = _instantiate_tool(tool_cls, normalized, kwargs)
        model_name = kwargs.get("model")
        if model_name:
            print(f"[Tool] Using LLM model: {model_name}")
        return tool

    if ":" in normalized:
        module_name, class_name = normalized.split(":", 1)
        if not module_name or not class_name:
            raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
        tool_cls = _load_tool_class(module_name, class_name)
        return _instantiate_tool(tool_cls, normalized)

    raise RuntimeError(f"Unknown tool spec '{spec}'.")


def _instantiate_tool(
    tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
    """Create an instance of *tool_cls*, passing *kwargs* to its constructor.

    Args:
        tool_cls: Object resolved from the tool spec; must be a ``Tool``
            subclass.
        tool_spec: Original spec text, used only in error messages.
        kwargs: Optional constructor keyword arguments.

    Raises:
        RuntimeError: If *tool_cls* is not a ``Tool`` subclass or its
            constructor rejects the arguments.
    """
    # ``issubclass`` raises TypeError for non-class objects (for example a
    # function named in a 'module:name' spec), so check for a class first.
    if not (isinstance(tool_cls, type) and issubclass(tool_cls, Tool)):
        raise RuntimeError(f"{tool_spec} is not a Tool subclass")
    try:
        return tool_cls(**(kwargs or {}))
    except TypeError as exc:
        raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc


def _echo_output(text: str, state: Optional[SessionState] = None) -> None:
    """Print *text* with exactly one trailing newline and record it in *state*."""
    print(text, end="" if text.endswith("\n") else "\n")
    if state:
        state.update_output(text)


def login(
    client: TelnetClient,
    *,
    user: str,
    password: str,
    login_prompt: str,
    banner_timeout: float = 10.0,
    response_timeout: float = 2.0,
    state: Optional[SessionState] = None,
) -> None:
    """Handle the banner/prompt exchange and send credentials.

    When *login_prompt* is given, reads until that prompt (bounded by
    *banner_timeout*); otherwise performs a single receive bounded by
    *response_timeout*. Non-empty *user* and *password* are then sent in
    that order, and whatever the server answers is echoed. Everything
    echoed is also recorded in *state* when one is supplied.
    """
    if login_prompt:
        banner = client.read_until(login_prompt, timeout=banner_timeout)
    else:
        banner = client.receive(timeout=response_timeout)
    if banner:
        _echo_output(banner, state)

    if user:
        client.send(user)
        # Brief pause between sending the user name and the password.
        time.sleep(0.2)
    if password:
        client.send(password)

    response = client.receive(timeout=response_timeout)
    if response:
        _echo_output(response, state)


def interactive_session(
    client: TelnetClient,
    state: SessionState,
    stop_event: Event,
    *,
    poll_interval: float = 0.2,
    receive_timeout: float = 0.2,
    exit_command: str,
    tool_command: Optional[Callable[[str], None]] = None,
    agent_command: Optional[Callable[[str], None]] = None,
) -> None:
    """Keep the Telnet session running, proxying input/output until interrupted.

    Each iteration echoes pending server output, then polls stdin. Lines
    starting with ``#agent`` or ``#execute`` (case-insensitive) are routed
    to *agent_command* / *tool_command* when those callbacks are provided;
    every other non-empty line is sent to the server. End-of-file on stdin
    sets *stop_event* and ends the session.

    Args:
        client: Connection to read from and write to.
        state: Shared session state; receives output and serialises sends.
        stop_event: Ends the loop when set.
        poll_interval: Seconds to wait for stdin to become readable.
        receive_timeout: Timeout passed to each ``client.receive`` call.
        exit_command: Only used for the greeting shown to the user.
        tool_command: Callback receiving the argument of ``#execute``.
        agent_command: Callback receiving the argument of ``#agent``.
    """
    if exit_command:
        print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
    else:
        print("Connected. Press Ctrl-C to exit.")

    while not stop_event.is_set():
        incoming = client.receive(timeout=receive_timeout)
        if incoming:
            _echo_output(incoming, state)

        readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
        if sys.stdin not in readable:
            continue

        line = sys.stdin.readline()
        if line == "":
            # readline() returns "" only at end-of-file.
            stop_event.set()
            break
        line = line.rstrip("\r\n")
        if not line:
            continue

        lowered = line.lower()
        if agent_command and lowered.startswith("#agent"):
            parts = line.split(maxsplit=1)
            if len(parts) == 1:
                print("[Agent] Usage: #agent ")
            else:
                agent_command(parts[1])
            continue
        if tool_command and lowered.startswith("#execute"):
            parts = line.split(maxsplit=1)
            if len(parts) == 1:
                print("[Tool] Usage: #execute ")
            else:
                tool_command(parts[1])
            continue

        state.send(client, line)


def graceful_shutdown(
    client: TelnetClient,
    exit_command: str,
    *,
    state: Optional[SessionState] = None,
) -> None:
    """Send *exit_command* and echo the server's farewell, best effort.

    Does nothing when *exit_command* is empty. Any exception raised while
    sending or receiving is reported on stderr instead of propagating.
    """
    if not exit_command:
        return
    try:
        if state:
            state.send(client, exit_command)
        else:
            client.send(exit_command)
        farewell = client.receive(timeout=2.0)
        if farewell:
            _echo_output(farewell, state)
    except Exception as exc:  # pragma: no cover - best effort logging
        print(f"Failed to send exit command: {exc}", file=sys.stderr)


def main() -> int:
    """Run the Telnet client configured through ``MISTLE_*`` variables.

    Loads ``.env``, connects and logs in, optionally starts a persistent
    tool thread (``MISTLE_TOOL_MODE``), then runs the interactive session.
    On Ctrl-C the configured exit command is sent before disconnecting.

    Returns:
        ``0`` once the session has ended.

    Raises:
        RuntimeError: If ``MISTLE_HOST`` or ``MISTLE_PORT`` is missing, or
            ``MISTLE_PORT`` is not an integer.
    """
    load_env_file()

    host = require_env("MISTLE_HOST")
    port_raw = require_env("MISTLE_PORT")
    try:
        port = int(port_raw)
    except ValueError as exc:
        raise RuntimeError("MISTLE_PORT must be an integer") from exc

    user = os.environ.get("MISTLE_USER", "")
    password = os.environ.get("MISTLE_PASSWORD", "")
    login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
    exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
    tool_mode_env = os.environ.get("MISTLE_TOOL_MODE", "")
    tool_mode = tool_mode_env.lower() in {"1", "true", "yes", "on"}
    tool_spec = os.environ.get("MISTLE_TOOL", "")

    state = SessionState()
    stop_event = Event()
    tool_thread: Optional[Thread] = None
    tool: Optional[Tool] = None
    ephemeral_tools: list[Thread] = []
    agent_threads: list[Thread] = []

    with TelnetClient(host=host, port=port, timeout=10.0) as client:
        login(
            client,
            user=user,
            password=password,
            login_prompt=login_prompt,
            state=state,
        )

        if tool_mode:
            tool = build_tool(tool_spec)
            tool_thread = Thread(
                target=run_tool_loop,
                args=(client, state, tool, stop_event),
                kwargs={"min_send_interval": 1.0},
                daemon=True,
            )
            tool_thread.start()

        def run_ephemeral_tool(spec: str) -> None:
            """Start a short-lived tool thread for an ``#execute`` command."""
            spec = spec.strip()
            if not spec:
                print("[Tool] Usage: #execute ")
                return
            try:
                temp_tool = build_tool(spec)
            except RuntimeError as exc:
                print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr)
                return
            thread = Thread(
                target=run_tool_loop,
                args=(client, state, temp_tool, stop_event),
                kwargs={
                    "min_send_interval": 1.0,
                    "auto_stop": True,
                    "auto_stop_idle": 2.0,
                },
                daemon=True,
            )
            ephemeral_tools.append(thread)
            print(f"[Tool] Executing {spec!r} once")
            thread.start()

        def run_ephemeral_agent(spec: str) -> None:
            """Start a background agent thread for an ``#agent`` command."""
            spec = spec.strip()
            if not spec:
                print("[Agent] Usage: #agent ")
                return
            try:
                temp_agent = build_agent(spec)
            except RuntimeError as exc:
                print(
                    f"[Agent] Failed to configure '{spec}': {exc}",
                    file=sys.stderr,
                )
                return

            def run_tool_instance(tool: Tool) -> None:
                """Run *tool* in an auto-stopping loop on this session."""
                run_tool_loop(
                    client,
                    state,
                    tool,
                    stop_event,
                    min_send_interval=1.0,
                    auto_stop=True,
                    auto_stop_idle=2.0,
                )

            thread = Thread(
                target=run_agent,
                args=(temp_agent,),
                kwargs={
                    "build_tool": build_tool,
                    "run_tool": run_tool_instance,
                    "stop_event": stop_event,
                },
                daemon=True,
            )
            agent_threads.append(thread)
            print(f"[Agent] Executing {spec!r}")
            thread.start()

        interrupted = False
        try:
            interactive_session(
                client,
                state=state,
                stop_event=stop_event,
                exit_command=exit_command,
                # In tool mode a persistent tool already runs, so the
                # ``#execute`` command is not offered.
                tool_command=None if tool_mode else run_ephemeral_tool,
                agent_command=run_ephemeral_agent,
            )
        except KeyboardInterrupt:
            print()
            interrupted = True
        finally:
            stop_event.set()
            # Give each worker a moment to notice the stop signal; all of
            # them are daemon threads, so stragglers cannot block exit.
            workers = [*ephemeral_tools, *agent_threads]
            if tool_thread:
                workers.insert(0, tool_thread)
            for thread in workers:
                thread.join(timeout=1.0)

        # The exit command is only sent after Ctrl-C, while the connection
        # is still open.
        if interrupted:
            graceful_shutdown(client, exit_command, state=state)

    return 0


if __name__ == "__main__":
    raise SystemExit(main())