import os import select import sys import time from importlib import import_module from pathlib import Path from threading import Event, Lock, Thread from typing import Optional, Type from agent import Agent, SimpleAgent from telnetclient import TelnetClient class SessionState: """Share Telnet session state safely across threads.""" def __init__(self) -> None: self._send_lock = Lock() self._output_lock = Lock() self._output_event = Event() self._last_output = "" self._last_agent_send = 0.0 def send(self, client: TelnetClient, message: str) -> None: with self._send_lock: client.send(message) def agent_send( self, client: TelnetClient, message: str, *, min_interval: float, stop_event: Event, ) -> bool: """Send on behalf of the agent while respecting a minimum cadence.""" while not stop_event.is_set(): with self._send_lock: now = time.time() elapsed = now - self._last_agent_send if elapsed >= min_interval: client.send(message) self._last_agent_send = now return True wait_time = min_interval - elapsed if wait_time <= 0: continue if stop_event.wait(wait_time): break return False def update_output(self, text: str) -> None: if not text: return with self._output_lock: self._last_output = text self._output_event.set() def snapshot_output(self) -> str: with self._output_lock: return self._last_output def wait_for_output(self, timeout: float) -> bool: return self._output_event.wait(timeout) def clear_output_event(self) -> None: self._output_event.clear() def run_agent_loop( client: TelnetClient, state: SessionState, agent: Agent, stop_event: Event, *, idle_delay: float = 0.5, min_send_interval: float = 1.0, ) -> None: """Invoke *agent* whenever new output arrives and send its response.""" while not stop_event.is_set(): triggered = state.wait_for_output(timeout=idle_delay) if stop_event.is_set(): break if not triggered: continue state.clear_output_event() last_output = state.snapshot_output() if not last_output: continue try: agent.observe(last_output) command = agent.decide() except Exception as exc: # pragma: no cover - defensive logging print(f"Agent failed: {exc}", file=sys.stderr) continue if not command: continue sent = state.agent_send( client, command, min_interval=min_send_interval, stop_event=stop_event, ) if not sent: break def load_env_file(path: str = ".env") -> None: """Populate ``os.environ`` with key/value pairs from a dotenv file.""" env_path = Path(path) if not env_path.exists(): return for line in env_path.read_text().splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#"): continue if "=" not in stripped: continue key, value = stripped.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value def require_env(key: str) -> str: value = os.environ.get(key) if value is None: raise RuntimeError(f"Missing required environment variable: {key}") return value def build_agent(agent_spec: str) -> Agent: """Instantiate an agent based on ``MISTLE_AGENT`` contents.""" normalized = agent_spec.strip() if not normalized: return SimpleAgent() key = normalized.lower() if key == "simple": return SimpleAgent() if key == "explore": try: module = import_module("agent") agent_cls = getattr(module, "ExploreAgent") except AttributeError as exc: # pragma: no cover - optional dependency raise RuntimeError("ExploreAgent is not available in agent module") from exc return _instantiate_agent(agent_cls, normalized) if ":" in normalized: module_name, class_name = normalized.split(":", 1) if not module_name or not class_name: raise RuntimeError("MISTLE_AGENT must be in 'module:ClassName' format") module = import_module(module_name) agent_cls = getattr(module, class_name) return _instantiate_agent(agent_cls, normalized) raise RuntimeError(f"Unknown agent spec '{agent_spec}'.") def _instantiate_agent(agent_cls: Type[Agent], agent_spec: str) -> Agent: if not issubclass(agent_cls, Agent): raise RuntimeError(f"{agent_spec} is not an Agent subclass") try: return agent_cls() except TypeError as exc: raise RuntimeError(f"Failed to instantiate {agent_spec}: {exc}") from exc def login( client: TelnetClient, *, user: str, password: str, login_prompt: str, banner_timeout: float = 10.0, response_timeout: float = 2.0, state: Optional[SessionState] = None, ) -> None: """Handle the banner/prompt exchange and send credentials.""" if login_prompt: banner = client.read_until(login_prompt, timeout=banner_timeout) else: banner = client.receive(timeout=response_timeout) if banner: print(banner, end="" if banner.endswith("\n") else "\n") if state: state.update_output(banner) if user: client.send(user) time.sleep(0.2) if password: client.send(password) response = client.receive(timeout=response_timeout) if response: print(response, end="" if response.endswith("\n") else "\n") if state: state.update_output(response) def interactive_session( client: TelnetClient, state: SessionState, stop_event: Event, *, poll_interval: float = 0.2, receive_timeout: float = 0.2, exit_command: str, ) -> None: """Keep the Telnet session running, proxying input/output until interrupted.""" if exit_command: print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).") else: print("Connected. Press Ctrl-C to exit.") while not stop_event.is_set(): incoming = client.receive(timeout=receive_timeout) if incoming: print(incoming, end="" if incoming.endswith("\n") else "\n") state.update_output(incoming) readable, _, _ = select.select([sys.stdin], [], [], poll_interval) if sys.stdin in readable: line = sys.stdin.readline() if line == "": stop_event.set() break line = line.rstrip("\r\n") if not line: continue state.send(client, line) def graceful_shutdown( client: TelnetClient, exit_command: str, *, state: Optional[SessionState] = None, ) -> None: if not exit_command: return try: if state: state.send(client, exit_command) else: client.send(exit_command) farewell = client.receive(timeout=2.0) if farewell: print(farewell, end="" if farewell.endswith("\n") else "\n") if state: state.update_output(farewell) except Exception as exc: # pragma: no cover - best effort logging print(f"Failed to send exit command: {exc}", file=sys.stderr) def main() -> int: load_env_file() host = require_env("MISTLE_HOST") port_raw = require_env("MISTLE_PORT") try: port = int(port_raw) except ValueError as exc: raise RuntimeError("MISTLE_PORT must be an integer") from exc user = os.environ.get("MISTLE_USER", "") password = os.environ.get("MISTLE_PASSWORD", "") login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "") exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "") agent_mode = os.environ.get("MISTLE_AGENT_MODE", "").lower() in {"1", "true", "yes", "on"} agent_spec = os.environ.get("MISTLE_AGENT", "") state = SessionState() stop_event = Event() agent_thread: Optional[Thread] = None agent: Optional[Agent] = None with TelnetClient(host=host, port=port, timeout=10.0) as client: login( client, user=user, password=password, login_prompt=login_prompt, state=state, ) if agent_mode: agent = build_agent(agent_spec) agent_thread = Thread( target=run_agent_loop, args=(client, state, agent, stop_event), kwargs={"min_send_interval": 1.0}, daemon=True, ) agent_thread.start() interrupted = False try: interactive_session( client, state=state, stop_event=stop_event, exit_command=exit_command, ) except KeyboardInterrupt: print() interrupted = True finally: stop_event.set() if agent_thread: agent_thread.join(timeout=1.0) if interrupted: graceful_shutdown(client, exit_command, state=state) return 0 if __name__ == "__main__": raise SystemExit(main())