mistle/app.py

288 lines
8.1 KiB
Python

import os
import select
import sys
import time
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Callable, Optional
from telnetclient import TelnetClient
AgentFunction = Callable[[str], Optional[str]]
class SessionState:
"""Share Telnet session state safely across threads."""
def __init__(self) -> None:
self._send_lock = Lock()
self._output_lock = Lock()
self._output_event = Event()
self._last_output = ""
self._last_agent_send = 0.0
def send(self, client: TelnetClient, message: str) -> None:
with self._send_lock:
client.send(message)
def agent_send(
self,
client: TelnetClient,
message: str,
*,
min_interval: float,
stop_event: Event,
) -> bool:
"""Send on behalf of the agent while respecting a minimum cadence."""
while not stop_event.is_set():
with self._send_lock:
now = time.time()
elapsed = now - self._last_agent_send
if elapsed >= min_interval:
client.send(message)
self._last_agent_send = now
return True
wait_time = min_interval - elapsed
if wait_time <= 0:
continue
if stop_event.wait(wait_time):
break
return False
def update_output(self, text: str) -> None:
if not text:
return
with self._output_lock:
self._last_output = text
self._output_event.set()
def snapshot_output(self) -> str:
with self._output_lock:
return self._last_output
def wait_for_output(self, timeout: float) -> bool:
return self._output_event.wait(timeout)
def clear_output_event(self) -> None:
self._output_event.clear()
def agent_decision(last_output: str) -> Optional[str]:
"""Decide which command to send based on the most recent server output."""
return "schau"
def run_agent_loop(
client: TelnetClient,
state: SessionState,
agent_fn: AgentFunction,
stop_event: Event,
*,
idle_delay: float = 0.5,
min_send_interval: float = 1.0,
) -> None:
"""Invoke *agent_fn* whenever new output arrives and send its response."""
while not stop_event.is_set():
triggered = state.wait_for_output(timeout=idle_delay)
if stop_event.is_set():
break
if not triggered:
continue
state.clear_output_event()
last_output = state.snapshot_output()
if not last_output:
continue
try:
command = agent_fn(last_output)
except Exception as exc: # pragma: no cover - defensive logging
print(f"Agent function failed: {exc}", file=sys.stderr)
continue
if not command:
continue
sent = state.agent_send(
client,
command,
min_interval=min_send_interval,
stop_event=stop_event,
)
if not sent:
break
def load_env_file(path: str = ".env") -> None:
"""Populate ``os.environ`` with key/value pairs from a dotenv file."""
env_path = Path(path)
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def require_env(key: str) -> str:
value = os.environ.get(key)
if value is None:
raise RuntimeError(f"Missing required environment variable: {key}")
return value
def login(
client: TelnetClient,
*,
user: str,
password: str,
login_prompt: str,
banner_timeout: float = 10.0,
response_timeout: float = 2.0,
state: Optional[SessionState] = None,
) -> None:
"""Handle the banner/prompt exchange and send credentials."""
if login_prompt:
banner = client.read_until(login_prompt, timeout=banner_timeout)
else:
banner = client.receive(timeout=response_timeout)
if banner:
print(banner, end="" if banner.endswith("\n") else "\n")
if state:
state.update_output(banner)
if user:
client.send(user)
time.sleep(0.2)
if password:
client.send(password)
response = client.receive(timeout=response_timeout)
if response:
print(response, end="" if response.endswith("\n") else "\n")
if state:
state.update_output(response)
def interactive_session(
client: TelnetClient,
state: SessionState,
stop_event: Event,
*,
poll_interval: float = 0.2,
receive_timeout: float = 0.2,
exit_command: str,
) -> None:
"""Keep the Telnet session running, proxying input/output until interrupted."""
if exit_command:
print(f"Connected. Press Ctrl-C to exit (will send {exit_command!r}).")
else:
print("Connected. Press Ctrl-C to exit.")
while not stop_event.is_set():
incoming = client.receive(timeout=receive_timeout)
if incoming:
print(incoming, end="" if incoming.endswith("\n") else "\n")
state.update_output(incoming)
readable, _, _ = select.select([sys.stdin], [], [], poll_interval)
if sys.stdin in readable:
line = sys.stdin.readline()
if line == "":
stop_event.set()
break
line = line.rstrip("\r\n")
if not line:
continue
state.send(client, line)
def graceful_shutdown(
client: TelnetClient,
exit_command: str,
*,
state: Optional[SessionState] = None,
) -> None:
if not exit_command:
return
try:
if state:
state.send(client, exit_command)
else:
client.send(exit_command)
farewell = client.receive(timeout=2.0)
if farewell:
print(farewell, end="" if farewell.endswith("\n") else "\n")
if state:
state.update_output(farewell)
except Exception as exc: # pragma: no cover - best effort logging
print(f"Failed to send exit command: {exc}", file=sys.stderr)
def main() -> int:
load_env_file()
host = require_env("MISTLE_HOST")
port_raw = require_env("MISTLE_PORT")
try:
port = int(port_raw)
except ValueError as exc:
raise RuntimeError("MISTLE_PORT must be an integer") from exc
user = os.environ.get("MISTLE_USER", "")
password = os.environ.get("MISTLE_PASSWORD", "")
login_prompt = os.environ.get("MISTLE_LOGIN_PROMPT", "")
exit_command = os.environ.get("MISTLE_EXIT_COMMAND", "")
agent_mode = os.environ.get("MISTLE_AGENT_MODE", "").lower() in {"1", "true", "yes", "on"}
state = SessionState()
stop_event = Event()
agent_thread: Optional[Thread] = None
with TelnetClient(host=host, port=port, timeout=10.0) as client:
login(
client,
user=user,
password=password,
login_prompt=login_prompt,
state=state,
)
if agent_mode:
agent_thread = Thread(
target=run_agent_loop,
args=(client, state, agent_decision, stop_event),
kwargs={"min_send_interval": 1.0},
daemon=True,
)
agent_thread.start()
interrupted = False
try:
interactive_session(
client,
state=state,
stop_event=stop_event,
exit_command=exit_command,
)
except KeyboardInterrupt:
print()
interrupted = True
finally:
stop_event.set()
if agent_thread:
agent_thread.join(timeout=1.0)
if interrupted:
graceful_shutdown(client, exit_command, state=state)
return 0
if __name__ == "__main__":
raise SystemExit(main())