145 lines
4 KiB
Python
145 lines
4 KiB
Python
import os
|
|
import select
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from telnetclient import TelnetClient
|
|
|
|
|
|
def load_env_file(path: str = ".env") -> None:
    """Populate ``os.environ`` with key/value pairs from a dotenv file.

    Lines that are blank, start with ``#``, or contain no ``=`` are skipped.
    A leading ``export `` on the key is dropped, and one pair of matching
    single or double quotes around the value is removed. Variables already
    present in ``os.environ`` are never overwritten.

    Args:
        path: Location of the dotenv file. Nothing happens when it does not
            name an existing regular file.
    """
    env_path = Path(path)
    # ``is_file`` rather than ``exists``: a directory with the same name must
    # be ignored instead of making ``read_text`` raise.
    if not env_path.is_file():
        return

    # Decode explicitly instead of relying on the locale default. The
    # ``utf-8-sig`` codec also drops a byte-order mark that would otherwise
    # become part of the first key.
    for line in env_path.read_text(encoding="utf-8-sig").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue

        key, value = stripped.split("=", 1)
        key = key.strip()
        # Accept shell-style ``export KEY=value`` lines.
        if key.startswith("export "):
            key = key[len("export "):].strip()

        value = value.strip()
        # Remove exactly one pair of matching quotes. Stripping every quote
        # character from both ends would corrupt values that legitimately
        # begin or end with a quote.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]

        # Values already in the real environment take precedence.
        if key and key not in os.environ:
            os.environ[key] = value
|
|
|
|
|
|
def require_env(key: str) -> str:
    """Return the value of environment variable ``key`` or fail loudly.

    A variable that is set but empty or whitespace-only (for example
    ``KEY=`` in a dotenv file) is treated the same as an unset one, because
    it cannot satisfy a *required* setting.

    Args:
        key: Name of the environment variable to look up.

    Returns:
        The variable's value, exactly as stored in ``os.environ``.

    Raises:
        RuntimeError: If the variable is unset, empty, or only whitespace.
    """
    value = os.environ.get(key)
    if value is None or not value.strip():
        raise RuntimeError(f"Missing required environment variable: {key}")
    return value
|
|
|
|
|
|
def login(
    client: TelnetClient,
    *,
    user: str,
    password: str,
    login_prompt: str,
    banner_timeout: float = 10.0,
    response_timeout: float = 2.0,
) -> None:
    """Print the server greeting, submit credentials, and print the reply.

    Args:
        client: Connected client used for all reads and writes.
        user: User name to send; skipped when empty.
        password: Password to send; skipped when empty.
        login_prompt: When non-empty, the greeting is read with
            ``client.read_until(login_prompt, ...)``; otherwise a plain
            ``client.receive`` is used.
        banner_timeout: Timeout passed to ``read_until``.
        response_timeout: Timeout passed to each ``receive`` call.
    """

    def _echo(text) -> None:
        # Print non-empty text, adding a newline only when it lacks one.
        if text:
            print(text, end="" if text.endswith("\n") else "\n")

    greeting = (
        client.read_until(login_prompt, timeout=banner_timeout)
        if login_prompt
        else client.receive(timeout=response_timeout)
    )
    _echo(greeting)

    if user:
        client.send(user)
        # Short pause after the user name, before the password is sent.
        time.sleep(0.2)
    if password:
        client.send(password)

    _echo(client.receive(timeout=response_timeout))
|
|
|
|
|
|
def interactive_session(
    client: TelnetClient,
    *,
    exit_command: str,
    poll_interval: float = 0.2,
    receive_timeout: float = 0.2,
) -> None:
    """Relay server output to stdout and stdin lines to the server.

    Loops until stdin reaches end-of-file; a ``KeyboardInterrupt`` is not
    caught here and propagates to the caller.

    Args:
        client: Connected client to read from and write to.
        exit_command: Only used in the start-up message shown to the user.
        poll_interval: Seconds ``select`` waits for stdin on each pass.
        receive_timeout: Timeout passed to ``client.receive`` on each pass.
    """
    # NOTE(review): select() on sys.stdin is not available on Windows, where
    # select only accepts sockets.
    notice = (
        f"Connected. Press Ctrl-C to exit (will send {exit_command!r})."
        if exit_command
        else "Connected. Press Ctrl-C to exit."
    )
    print(notice)

    while True:
        chunk = client.receive(timeout=receive_timeout)
        if chunk:
            print(chunk, end="" if chunk.endswith("\n") else "\n")

        ready = select.select([sys.stdin], [], [], poll_interval)[0]
        if sys.stdin not in ready:
            continue

        raw = sys.stdin.readline()
        if raw == "":
            # An empty read means end-of-file on stdin: stop the session.
            break

        command = raw.rstrip("\r\n")
        # Blank lines are not forwarded to the server.
        if command:
            client.send(command)
|
|
|
|
|
|
def graceful_shutdown(client: TelnetClient, exit_command: str) -> None:
    """Send ``exit_command`` and print the server's reply, best effort.

    Does nothing when ``exit_command`` is empty. Any ``Exception`` raised
    while sending, receiving, or printing is reported on stderr instead of
    propagating.

    Args:
        client: Connected client used to send the command.
        exit_command: Command to send before the connection is closed.
    """
    if exit_command:
        try:
            client.send(exit_command)
            goodbye = client.receive(timeout=2.0)
            if goodbye:
                terminator = "" if goodbye.endswith("\n") else "\n"
                print(goodbye, end=terminator)
        except Exception as exc:  # pragma: no cover - best effort logging
            print(f"Failed to send exit command: {exc}", file=sys.stderr)
|
|
|
|
|
|
def main() -> int:
    """Read the configuration, log in, and run the interactive session.

    ``MISTLE_HOST`` and ``MISTLE_PORT`` are required; ``MISTLE_USER``,
    ``MISTLE_PASSWORD``, ``MISTLE_LOGIN_PROMPT`` and ``MISTLE_EXIT_COMMAND``
    default to an empty string.

    Returns:
        ``0`` once the session has ended.

    Raises:
        RuntimeError: If ``MISTLE_PORT`` is not an integer (errors raised by
            ``require_env`` propagate as well).
    """
    load_env_file()

    host = require_env("MISTLE_HOST")
    raw_port = require_env("MISTLE_PORT")
    try:
        port = int(raw_port)
    except ValueError as exc:
        raise RuntimeError("MISTLE_PORT must be an integer") from exc

    env = os.environ
    user = env.get("MISTLE_USER", "")
    password = env.get("MISTLE_PASSWORD", "")
    login_prompt = env.get("MISTLE_LOGIN_PROMPT", "")
    exit_command = env.get("MISTLE_EXIT_COMMAND", "")

    with TelnetClient(host=host, port=port, timeout=10.0) as client:
        login(client, user=user, password=password, login_prompt=login_prompt)

        try:
            interactive_session(client, exit_command=exit_command)
        except KeyboardInterrupt:
            # Finish the line that the terminal's "^C" echo left open.
            print()
            interrupted = True
        else:
            interrupted = False

        # The exit command is sent only after Ctrl-C, not on stdin EOF, and
        # outside the handler above.
        if interrupted:
            graceful_shutdown(client, exit_command)

    return 0
|
|
|
|
|
|
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|