mistle/telnetclient.py
2025-09-27 07:36:45 +02:00

115 lines
3.7 KiB
Python

"""Utility for interacting with Telnet servers.

Example::

    from telnetclient import TelnetClient

    with TelnetClient("mud.example.com", 23, timeout=5) as client:
        client.send("look")
        output = client.receive()
        print(output)
"""
from __future__ import annotations
import time
from telnetlib import Telnet
from typing import Optional
class TelnetClient:
    """Small helper around :class:`telnetlib.Telnet` that hides encoding details.

    Outgoing commands are encoded and incoming data is decoded with
    ``encoding`` under the ``encoding_errors`` policy, so callers only ever
    deal with ``str``. The connection is opened by :meth:`connect` (or by
    entering the instance as a context manager) and released by
    :meth:`close`.

    NOTE: ``telnetlib`` was deprecated in Python 3.11 and removed in 3.13
    (PEP 594); this class requires an interpreter that still ships it.
    """

    # Seconds to sleep between polls while receive() waits for more data.
    _POLL_INTERVAL = 0.05

    def __init__(
        self,
        host: str,
        port: int = 23,
        *,
        timeout: Optional[float] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "ignore",
    ) -> None:
        """Store the connection settings; no network activity happens here.

        Args:
            host: Server host name or address.
            port: TCP port of the Telnet service.
            timeout: Value handed to ``Telnet`` as its timeout when
                connecting.
            encoding: Codec used for both sent and received text.
            encoding_errors: Codec error policy (``"ignore"``, ``"replace"``,
                ``"strict"``, ...) applied when encoding and decoding.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.encoding = encoding
        self.encoding_errors = encoding_errors
        # None while disconnected; set by connect(), cleared by close().
        self._telnet: Optional[Telnet] = None

    def connect(self) -> None:
        """Open the Telnet connection if it is not already active.

        Raises:
            ConnectionError: If opening the connection fails with an
                ``OSError``; the original error is chained as the cause.
        """
        if self._telnet is not None:
            return
        try:
            self._telnet = Telnet(self.host, self.port, self.timeout)
        except OSError as exc:  # pragma: no cover - depends on network environment
            raise ConnectionError(f"Failed to connect to {self.host}:{self.port}") from exc

    def close(self) -> None:
        """Close the Telnet connection if it is open.

        Safe to call repeatedly. The client is marked as disconnected even
        if closing the underlying connection raises.
        """
        if self._telnet is not None:
            try:
                self._telnet.close()
            finally:
                self._telnet = None

    @property
    def is_connected(self) -> bool:
        """Whether :meth:`connect` has succeeded and :meth:`close` was not called.

        This only reflects local state; the socket is not probed, so a
        disconnect initiated by the peer is not detected here.
        """
        return self._telnet is not None

    def send(self, command: str, terminator: str = "\n") -> None:
        """Send a command string to the server.

        Args:
            command: Text to send.
            terminator: Text appended to ``command`` before encoding.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._ensure_connection()
        # Apply the configured error policy on the way out as well, so that
        # encoding and decoding treat unrepresentable characters the same way.
        payload = f"{command}{terminator}".encode(self.encoding, self.encoding_errors)
        client.write(payload)

    def receive(self, timeout: Optional[float] = 1.0) -> str:
        """Read any data that arrives within *timeout* seconds.

        The connection is polled until the deadline passes, even after some
        data has already arrived, so a response split over several packets
        is collected in one call. A closed connection ends the wait early
        and whatever was gathered so far is returned.

        Args:
            timeout: Seconds to keep collecting. ``None`` (or ``0``) returns
                only the data that is already available, without waiting.

        Returns:
            The decoded text, possibly empty.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._ensure_connection()
        # Monotonic clock: the wait must not be affected by wall-clock changes.
        deadline = None if timeout is None else time.monotonic() + timeout
        chunks: list[bytes] = []
        while True:
            try:
                chunk = client.read_very_eager()
            except EOFError:
                # Connection closed and nothing left to read.
                break
            if chunk:
                chunks.append(chunk)
                # Immediately try again; more data may already be buffered.
                continue
            if deadline is None:
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(self._POLL_INTERVAL, remaining))
        return b"".join(chunks).decode(self.encoding, self.encoding_errors)

    def read_until(self, marker: str, timeout: Optional[float] = None) -> str:
        """Read until *marker* bytes are seen or *timeout* seconds elapse.

        Args:
            marker: Text to wait for; it is encoded with the client encoding.
            timeout: Seconds to wait, passed through to
                ``Telnet.read_until``. ``None`` waits without a time limit.

        Returns:
            The decoded data read so far (including the marker when found).

        Raises:
            RuntimeError: If the client is not connected.
            EOFError: Propagated from ``Telnet.read_until`` when the
                connection is closed and no data is available (unlike
                :meth:`receive`, which swallows it).
        """
        client = self._ensure_connection()
        raw = client.read_until(marker.encode(self.encoding), timeout)
        return raw.decode(self.encoding, self.encoding_errors)

    def send_and_expect(
        self,
        command: str,
        marker: Optional[str] = None,
        *,
        read_timeout: Optional[float] = None,
        terminator: str = "\n",
    ) -> str:
        """Convenience helper that writes a command and returns the response.

        Args:
            command: Text to send.
            marker: When given, read with :meth:`read_until` up to this
                text; otherwise collect output with :meth:`receive`.
            read_timeout: Seconds to wait for the response. With a marker,
                ``None`` waits without a time limit. Without a marker,
                ``None`` uses :meth:`receive`'s default timeout, because a
                non-blocking read right after the write would nearly always
                come back empty; pass ``0`` to get that non-blocking read.
            terminator: Text appended to ``command`` before sending.

        Returns:
            The decoded response text.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self.send(command, terminator=terminator)
        if marker is not None:
            return self.read_until(marker, timeout=read_timeout)
        if read_timeout is None:
            return self.receive()
        return self.receive(timeout=read_timeout)

    def __enter__(self) -> "TelnetClient":
        """Connect and return the client for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type: Optional[type], exc: Optional[BaseException], tb: object) -> None:
        """Close the connection on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def _ensure_connection(self) -> Telnet:
        """Return the active ``Telnet`` object or raise if disconnected.

        Raises:
            RuntimeError: If :meth:`connect` has not been called (or the
                client has been closed).
        """
        if self._telnet is None:
            raise RuntimeError("Telnet connection is not established. Call connect() first.")
        return self._telnet