mistle/app.py

409 lines
12 KiB
Python

import os
import select
import sys
import time
from importlib import import_module
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Optional, Type
from tools import Tool, SimpleTool
from telnetclient import TelnetClient
class SessionState:
    """Share Telnet session state safely across threads.

    One lock serialises every write to the client, a second lock guards the
    most recently stored chunk of output, and an event signals that a new
    chunk has been stored since the event was last cleared.
    """

    def __init__(self) -> None:
        self._send_lock = Lock()
        self._output_lock = Lock()
        self._output_event = Event()
        self._last_output = ""
        # Monotonic timestamp of the last tool-originated send.  ``-inf``
        # means "never", so the very first tool send is allowed immediately.
        self._last_tool_send = float("-inf")

    def send(self, client: TelnetClient, message: str) -> None:
        """Send *message* through *client* while holding the send lock."""
        with self._send_lock:
            client.send(message)

    def tool_send(
        self,
        client: TelnetClient,
        message: str,
        *,
        min_interval: float,
        stop_event: Event,
    ) -> bool:
        """Send on behalf of the tool while respecting a minimum cadence.

        Blocks until at least *min_interval* seconds have passed since the
        previous tool send, then sends *message*.

        Returns:
            ``True`` once the message was sent, ``False`` if *stop_event*
            was set before the send could happen.
        """
        while not stop_event.is_set():
            with self._send_lock:
                # A monotonic clock keeps the throttle correct even when the
                # wall clock is stepped backwards or forwards.
                now = time.monotonic()
                elapsed = now - self._last_tool_send
                if elapsed >= min_interval:
                    client.send(message)
                    self._last_tool_send = now
                    return True
                wait_time = min_interval - elapsed
            # Wait outside the lock so other senders are not blocked while
            # the tool is being throttled; wake early if asked to stop.
            if stop_event.wait(wait_time):
                break
        return False

    def update_output(self, text: str) -> None:
        """Store *text* as the latest output and signal waiters.

        Empty text is ignored and does not set the event.
        """
        if not text:
            return
        with self._output_lock:
            self._last_output = text
        self._output_event.set()

    def snapshot_output(self) -> str:
        """Return the most recently stored output chunk."""
        with self._output_lock:
            return self._last_output

    def wait_for_output(self, timeout: float) -> bool:
        """Wait up to *timeout* seconds; return whether the event is set."""
        return self._output_event.wait(timeout)

    def clear_output_event(self) -> None:
        """Reset the new-output event so the next update can be detected."""
        self._output_event.clear()
def run_tool_loop(
    client: TelnetClient,
    state: SessionState,
    tool: Tool,
    stop_event: Event,
    *,
    idle_delay: float = 0.5,
    min_send_interval: float = 1.0,
    auto_stop: bool = False,
    auto_stop_idle: float = 2.0,
) -> None:
    """Drive *tool*: feed it new output and send the commands it decides on.

    Every pass asks ``tool.decide()`` for a command, then waits up to
    *idle_delay* seconds for new output.  When output arrives it is passed
    to ``tool.observe()`` and ``decide()`` is asked again straight away.

    Args:
        client: Connection that commands are sent through.
        state: Shared session state used for throttled sends and output.
        tool: Object providing ``decide()`` and ``observe(text)``.
        stop_event: Setting this event ends the loop.
        idle_delay: Seconds to wait for new output on each pass.
        min_send_interval: Minimum seconds between two tool sends.
        auto_stop: When true, return once the loop has been idle (no new
            output and no command sent) for *auto_stop_idle* seconds.
        auto_stop_idle: Idle duration, in seconds, that triggers auto-stop.
    """
    # Monotonic timestamp of the first idle tick of the current idle
    # stretch; ``None`` while the loop is not considered idle.
    idle_started: Optional[float] = None

    def maybe_send() -> None:
        nonlocal idle_started
        try:
            command = tool.decide()
        except Exception as exc:  # pragma: no cover - defensive logging
            print(f"[Tool] Failed: {exc}", file=sys.stderr)
            return
        if not command:
            return
        sent = state.tool_send(
            client,
            command,
            min_interval=min_send_interval,
            stop_event=stop_event,
        )
        if sent:
            # Sending counts as activity, so restart the idle timer.
            idle_started = None

    while not stop_event.is_set():
        maybe_send()
        triggered = state.wait_for_output(timeout=idle_delay)
        if stop_event.is_set():
            break
        if not triggered:
            if auto_stop:
                # Elapsed time is measured with a monotonic clock so that
                # wall-clock adjustments cannot cut short or prolong the
                # idle window.
                now = time.monotonic()
                if idle_started is None:
                    idle_started = now
                elif now - idle_started >= auto_stop_idle:
                    break
            continue
        idle_started = None
        state.clear_output_event()
        last_output = state.snapshot_output()
        if not last_output:
            continue
        try:
            tool.observe(last_output)
        except Exception as exc:  # pragma: no cover - defensive logging
            print(f"[Tool] Failed during observe: {exc}", file=sys.stderr)
            continue
        maybe_send()
def load_env_file(path: str = ".env") -> None:
    """Populate ``os.environ`` with key/value pairs from a dotenv file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped.  Variables that already exist in the environment are never
    overwritten.  One pair of matching surrounding quotes (single or
    double) is removed from a value; quotes that are part of the value
    itself are preserved.

    Args:
        path: Location of the dotenv file.  A missing path, or a path that
            is not a regular file, is silently ignored.
    """
    env_path = Path(path)
    # is_file() rather than exists(): a directory named like the env file
    # must be skipped instead of making read_text() raise.
    if not env_path.is_file():
        return
    # utf-8-sig decodes plain UTF-8 and also drops a leading BOM, which
    # would otherwise end up glued to the first key.
    for line in env_path.read_text(encoding="utf-8-sig").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        key, separator, value = stripped.partition("=")
        if not separator:
            continue
        key = key.strip()
        value = value.strip()
        # Remove exactly one layer of matching quotes so that values such
        # as "pa'" keep the quote characters that belong to them.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        if key and key not in os.environ:
            os.environ[key] = value
def require_env(key: str) -> str:
    """Return the value of environment variable *key*.

    Raises:
        RuntimeError: If the variable is not set at all.  A variable that
            is set to an empty string is returned as ``""``.
    """
    found = os.environ.get(key)
    if found is not None:
        return found
    raise RuntimeError(f"Missing required environment variable: {key}")
def build_tool(spec: str) -> Tool:
    """Instantiate a tool based on configuration.

    Args:
        spec: An empty string or ``"simple"`` (selects ``SimpleTool``), the
            case-insensitive name of a built-in tool, or a custom
            ``"module:ClassName"`` reference.

    Returns:
        The instantiated tool.

    Raises:
        RuntimeError: If the spec is unknown or malformed, or if the module
            or class it names cannot be loaded.  Import and attribute
            errors are converted so that callers only need to handle
            ``RuntimeError``.
    """
    normalized = spec.strip()
    key = normalized.lower()
    if not normalized or key == "simple":
        return SimpleTool()

    llm_model = os.environ.get("MISTLE_LLM_MODEL", "mistral/mistral-small-2407")
    # Built-in name -> (module to import, class name, constructor kwargs).
    builtin_tools = {
        "explore": ("tools", "ExploreTool", {}),
        "communication": ("tools", "CommunicationTool", {}),
        "movement": ("movement_tool", "MovementTool", {}),
        "move": ("movement_tool", "MovementTool", {}),
        "intelligent": (
            "intelligent_tool",
            "IntelligentCommunicationTool",
            {"model": llm_model},
        ),
        "intelligentcommunication": (
            "intelligent_tool",
            "IntelligentCommunicationTool",
            {"model": llm_model},
        ),
    }

    if key in builtin_tools:
        module_name, class_name, kwargs = builtin_tools[key]
        try:
            tool_cls = getattr(import_module(module_name), class_name)
        except (ImportError, AttributeError) as exc:  # pragma: no cover - optional dependency
            # The module itself may be missing (ImportError), not just the
            # class inside it; report the module that was actually tried.
            raise RuntimeError(
                f"{class_name} is not available in {module_name} module"
            ) from exc
        tool = _instantiate_tool(tool_cls, normalized, kwargs)
        model_name = kwargs.get("model")
        if model_name:
            print(f"[Tool] Using LLM model: {model_name}")
        return tool

    if ":" in normalized:
        module_name, class_name = normalized.split(":", 1)
        if not module_name or not class_name:
            raise RuntimeError("MISTLE_TOOL must be in 'module:ClassName' format")
        try:
            tool_cls = getattr(import_module(module_name), class_name)
        except (ImportError, AttributeError, TypeError) as exc:
            # TypeError is what import_module raises for a relative name
            # such as ".foo" when no package is given.
            raise RuntimeError(f"Cannot load tool '{normalized}': {exc}") from exc
        return _instantiate_tool(tool_cls, normalized)

    raise RuntimeError(f"Unknown tool spec '{spec}'.")
def _instantiate_tool(
    tool_cls: Type[Tool], tool_spec: str, kwargs: Optional[dict] = None
) -> Tool:
    """Validate *tool_cls* and construct it with *kwargs*.

    Args:
        tool_cls: Object resolved from the tool spec; must be a ``Tool``
            subclass.
        tool_spec: Spec text, used only in error messages.
        kwargs: Keyword arguments for the constructor; ``None`` means none.

    Raises:
        RuntimeError: If *tool_cls* is not a ``Tool`` subclass, or if
            calling it raises ``TypeError``.
    """
    # issubclass() raises TypeError when handed a non-class (for example a
    # function picked up by a "module:name" spec), so check for a class
    # first and report it as an ordinary spec error.
    if not (isinstance(tool_cls, type) and issubclass(tool_cls, Tool)):
        raise RuntimeError(f"{tool_spec} is not a Tool subclass")
    init_kwargs = kwargs or {}
    try:
        return tool_cls(**init_kwargs)
    except TypeError as exc:
        raise RuntimeError(f"Failed to instantiate {tool_spec}: {exc}") from exc
def login(
    client: TelnetClient,
    *,
    user: str,
    password: str,
    login_prompt: str,
    banner_timeout: float = 10.0,
    response_timeout: float = 2.0,
    state: Optional[SessionState] = None,
) -> None:
    """Read the banner, send the credentials and relay the reply.

    With a *login_prompt* the banner is read until that prompt appears
    (bounded by *banner_timeout*); otherwise whatever arrives within
    *response_timeout* is used.  Non-empty *user* and *password* are then
    sent in that order.  Banner and reply are printed and, when *state* is
    given, recorded on it.
    """

    def relay(text: str) -> None:
        # Echo server text, ending the line if it does not already end
        # with a newline, and record it on the shared state.
        if not text:
            return
        line_end = "" if text.endswith("\n") else "\n"
        print(text, end=line_end)
        if state:
            state.update_output(text)

    if not login_prompt:
        greeting = client.receive(timeout=response_timeout)
    else:
        greeting = client.read_until(login_prompt, timeout=banner_timeout)
    relay(greeting)

    if user:
        client.send(user)
        # Short pause between sending the user name and the password.
        time.sleep(0.2)
    if password:
        client.send(password)

    relay(client.receive(timeout=response_timeout))
def interactive_session(
    client: TelnetClient,
    state: SessionState,
    stop_event: Event,
    *,
    poll_interval: float = 0.2,
    receive_timeout: float = 0.2,
    exit_command: str,
    tool_command: Optional[Callable[[str], None]] = None,
) -> None:
    """Proxy server output and typed input until *stop_event* is set.

    Each pass prints and records whatever the server sent, then polls
    stdin for up to *poll_interval* seconds.  End-of-file on stdin sets
    *stop_event* and ends the session.  Empty lines are ignored.  When
    *tool_command* is given, lines starting with ``#execute`` are passed
    to it instead of being sent to the server.
    """
    if not exit_command:
        print("Connected. Press Ctrl-C to exit.")
    else:
        print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")

    while not stop_event.is_set():
        # 1) Relay anything the server has sent.
        chunk = client.receive(timeout=receive_timeout)
        if chunk:
            line_end = "" if chunk.endswith("\n") else "\n"
            print(chunk, end=line_end)
            state.update_output(chunk)

        # 2) Check whether a line of input is waiting on stdin.
        ready = select.select([sys.stdin], [], [], poll_interval)[0]
        if sys.stdin not in ready:
            continue

        raw = sys.stdin.readline()
        if raw == "":
            # readline() returns "" only at end-of-file.
            stop_event.set()
            break

        typed = raw.rstrip("\r\n")
        if not typed:
            continue

        if tool_command and typed.lower().startswith("#execute"):
            pieces = typed.split(maxsplit=1)
            if len(pieces) > 1:
                tool_command(pieces[1])
            else:
                print("[Tool] Usage: #execute <tool_spec>")
            continue

        state.send(client, typed)
def graceful_shutdown(
    client: TelnetClient,
    exit_command: str,
    *,
    state: Optional[SessionState] = None,
) -> None:
    """Send *exit_command* and relay the server's reply, best effort.

    Does nothing when *exit_command* is empty.  The command goes through
    *state* when one is given, otherwise straight to *client*.  Any
    exception raised along the way is reported on stderr and not re-raised.
    """
    if exit_command:
        try:
            if not state:
                client.send(exit_command)
            else:
                state.send(client, exit_command)
            goodbye = client.receive(timeout=2.0)
            if goodbye:
                line_end = "" if goodbye.endswith("\n") else "\n"
                print(goodbye, end=line_end)
                if state:
                    state.update_output(goodbye)
        except Exception as problem:  # pragma: no cover - best effort logging
            print(f"Failed to send exit command: {problem}", file=sys.stderr)
def main() -> int:
    """Run the client: load configuration, log in, and proxy the session.

    Configuration is read from the environment after loading ``.env``.
    ``MISTLE_HOST`` and ``MISTLE_PORT`` are required.  ``MISTLE_USER``,
    ``MISTLE_PASSWORD``, ``MISTLE_LOGIN_PROMPT``, ``MISTLE_EXIT_COMMAND``,
    ``MISTLE_TOOL_MODE`` and ``MISTLE_TOOL`` are optional.

    With tool mode on, one tool runs in a background thread for the whole
    session.  With tool mode off, ``#execute <tool_spec>`` typed in the
    session starts a temporary tool thread that stops itself when idle.

    Returns:
        ``0`` when the session ends.

    Raises:
        RuntimeError: If a required variable is missing or ``MISTLE_PORT``
            is not an integer.
    """
    load_env_file()
    host = require_env("MISTLE_HOST")
    port_raw = require_env("MISTLE_PORT")
    try:
        port = int(port_raw)
    except ValueError as exc:
        raise RuntimeError("MISTLE_PORT must be an integer") from exc

    user = os.environ.get("MISTLE_USER", "")
    password = os.environ.get("MISTLE_PASSWORD", "")
    login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
    exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
    tool_mode_env = os.environ.get("MISTLE_TOOL_MODE", "")
    tool_mode = tool_mode_env.lower() in {"1", "true", "yes", "on"}
    tool_spec = os.environ.get("MISTLE_TOOL", "")

    state = SessionState()
    stop_event = Event()
    tool_thread: Optional[Thread] = None
    tool: Optional[Tool] = None
    ephemeral_tools: list[Thread] = []

    with TelnetClient(host=host, port=port, timeout=10.0) as client:
        login(
            client,
            user=user,
            password=password,
            login_prompt=login_prompt,
            state=state,
        )

        if tool_mode:
            tool = build_tool(tool_spec)
            tool_thread = Thread(
                target=run_tool_loop,
                args=(client, state, tool, stop_event),
                kwargs={"min_send_interval": 1.0},
                daemon=True,
            )
            tool_thread.start()

        def run_ephemeral_tool(spec: str) -> None:
            """Start a temporary tool thread for an ``#execute`` request."""
            spec = spec.strip()
            if not spec:
                print("[Tool] Usage: #execute <tool_spec>")
                return
            try:
                temp_tool = build_tool(spec)
            except Exception as exc:
                # The spec is typed by the user at runtime.  Any failure to
                # load it (bad module name, missing class, constructor
                # error) is reported here so that it cannot propagate out
                # of the interactive loop and end the session.
                print(f"[Tool] Failed to load '{spec}': {exc}", file=sys.stderr)
                return
            # Forget threads that already finished so the list does not
            # grow with every #execute during a long session.
            ephemeral_tools[:] = [t for t in ephemeral_tools if t.is_alive()]
            thread = Thread(
                target=run_tool_loop,
                args=(client, state, temp_tool, stop_event),
                kwargs={
                    "min_send_interval": 1.0,
                    "auto_stop": True,
                    "auto_stop_idle": 2.0,
                },
                daemon=True,
            )
            ephemeral_tools.append(thread)
            print(f"[Tool] Executing {spec!r} once")
            thread.start()

        interrupted = False
        try:
            interactive_session(
                client,
                state=state,
                stop_event=stop_event,
                exit_command=exit_command,
                # #execute is only offered when no permanent tool is running.
                tool_command=None if tool_mode else run_ephemeral_tool,
            )
        except KeyboardInterrupt:
            print()
            interrupted = True
        finally:
            # Ask every tool thread to stop and give each a moment to exit.
            stop_event.set()
            if tool_thread:
                tool_thread.join(timeout=1.0)
            for thread in ephemeral_tools:
                thread.join(timeout=1.0)

        # The exit command is only sent after Ctrl-C, while the connection
        # is still open.
        if interrupted:
            graceful_shutdown(client, exit_command, state=state)

    return 0
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())