mistle/app.py

383 lines
12 KiB
Python

import os
import select
import sys
import time
from importlib import import_module
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Optional, Type
from agent import Agent, SimpleAgent
from telnetclient import TelnetClient
class SessionState:
"""Share Telnet session state safely across threads."""
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_agent_send = 0.0
def send(self, client: TelnetClient, message: str) -> None:
with self._send_lock:
client.send(message)
def agent_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the agent while respecting a minimum cadence."""
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_agent_send
if elapsed >= min_interval:
client.send(message)
self._last_agent_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def run_agent_loop(
client: TelnetClient,
state: SessionState,
agent: Agent,
stop_event: Event,
*,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
auto_stop: bool = False,
auto_stop_idle: float = 2.0,
) -> None:
"""Invoke *agent* whenever new output arrives and send its response."""
idle_started: Optional[float] = None
while not stop_event.is_set():
triggered = state.wait_for_output(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
if auto_stop:
now = time.time()
if idle_started is None:
idle_started = now
elif now - idle_started >= auto_stop_idle:
break
continue
idle_started = None
state.clear_output_event()
last_output = state.snapshot_output()
if not last_output:
continue
try:
agent.observe(last_output)
command = agent.decide()
except Exception as exc: # pragma: no cover - defensive logging
print(f"Agent failed: {exc}", file=sys.stderr)
continue
if not command:
continue
sent = state.agent_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
if not sent:
break
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def build_agent(agent_spec: str) -> Agent:
"""Instantiate an agent based on ``MISTLE_AGENT`` contents."""
normalized = agent_spec.strip()
if not normalized:
return SimpleAgent()
key = normalized.lower()
if key == "simple":
return SimpleAgent()
builtin_agents = {
"explore": ("agent", "ExploreAgent"),
"communication": ("agent", "CommunicationAgent"),
"intelligent": ("intelligent_agent", "IntelligentCommunicationAgent"),
"intelligentcommunication": (
"intelligent_agent",
"IntelligentCommunicationAgent",
),
}
if key in builtin_agents:
module_name, class_name = builtin_agents[key]
try:
module = import_module(module_name)
agent_cls = getattr(module, class_name)
except AttributeError as exc: # pragma: no cover - optional dependency
raise RuntimeError(f"{class_name} is not available in agent module") from exc
return _instantiate_agent(agent_cls, normalized)
if ":" in normalized:
module_name, class_name = normalized.split(":", 1)
if not module_name or not class_name:
raise RuntimeError("MISTLE_AGENT must be in 'module:ClassName' format")
module = import_module(module_name)
agent_cls = getattr(module, class_name)
return _instantiate_agent(agent_cls, normalized)
raise RuntimeError(f"Unknown agent spec '{agent_spec}'.")
def _instantiate_agent(agent_cls: Type[Agent], agent_spec: str) -> Agent:
if not issubclass(agent_cls, Agent):
raise RuntimeError(f"{agent_spec} is not an Agent subclass")
try:
return agent_cls()
except TypeError as exc:
raise RuntimeError(f"Failed to instantiate {agent_spec}: {exc}") from exc
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(user)
time.sleep(0.2)
if password:
client.send(password)
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
agent_command: Optional[Callable[[str], None]] = None,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
if agent_command and line.lower().startswith("#execute"):
parts = line.split(maxsplit=1)
if len(parts) == 1:
print("[Agent] Usage: #execute <agent_spec>")
else:
agent_command(parts[1])
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(exit_command)
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)
def main() -> int:
load_env_file()
host = require_env("MISTLE_HOST")
port_raw = require_env("MISTLE_PORT")
try:
port = int(port_raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
agent_mode = os.environ.get("MISTLE_AGENT_MODE", "").lower() in {"1", "true", "yes", "on"}
agent_spec = os.environ.get("MISTLE_AGENT", "")
state = SessionState()
stop_event = Event()
agent_thread: Optional[Thread] = None
agent: Optional[Agent] = None
ephemeral_agents: list[Thread] = []
with TelnetClient(host=host, port=port, timeout=10.0) as client:
login(
client,
user=user,
password=password,
login_prompt=login_prompt,
state=state,
)
if agent_mode:
agent = build_agent(agent_spec)
agent_thread = Thread(
target=run_agent_loop,
args=(client, state, agent, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
agent_thread.start()
def run_ephemeral_agent(spec: str) -> None:
spec = spec.strip()
if not spec:
print("[Agent] Usage: #execute <agent_spec>")
return
try:
temp_agent = build_agent(spec)
except RuntimeError as exc:
print(f"[Agent] Failed to load '{spec}': {exc}", file=sys.stderr)
return
thread = Thread(
target=run_agent_loop,
args=(client, state, temp_agent, stop_event),
kwargs={
"min_send_interval": 1.0,
"auto_stop": True,
"auto_stop_idle": 2.0,
},
daemon=True,
)
ephemeral_agents.append(thread)
print(f"[Agent] Executing {spec!r} once")
thread.start()
interrupted = False
try:
interactive_session(
client,
state=state,
stop_event=stop_event,
exit_command=exit_command,
agent_command=None if agent_mode else run_ephemeral_agent,
)
except KeyboardInterrupt:
print()
interrupted = True
finally:
stop_event.set()
if agent_thread:
agent_thread.join(timeout=1.0)
for thread in ephemeral_agents:
thread.join(timeout=1.0)
if interrupted:
graceful_shutdown(client, exit_command, state=state)
return 0
if __name__ == "__main__":
raise SystemExit(main())