"""Utility for interacting with Telnet servers.

Example:
    from telnetclient import TelnetClient

    with TelnetClient("mud.example.com", 23, timeout=5) as client:
        client.send("look")
        output = client.receive()
        print(output)
"""

from __future__ import annotations

import time
from typing import Optional

# ``telnetlib`` was deprecated in Python 3.11 and removed from the standard
# library in 3.13 (PEP 594).  Keep this module importable there and defer the
# failure to ``connect()``, where it can be reported with an actionable
# message.  On interpreters that still ship ``telnetlib`` nothing changes.
try:
    from telnetlib import Telnet
except ImportError as _import_error:
    Telnet = None  # type: ignore[assignment,misc]
    _TELNETLIB_ERROR: Optional[ImportError] = _import_error
else:
    _TELNETLIB_ERROR = None

# Pause between polls in ``TelnetClient.receive`` while no data is available.
_POLL_INTERVAL = 0.05


class TelnetClient:
    """Small helper around :class:`telnetlib.Telnet` that hides encoding details.

    Outgoing text is encoded with *encoding*; incoming bytes are decoded with
    *encoding* using the *encoding_errors* policy.  The connection is opened
    by :meth:`connect` (or by entering the instance as a context manager) and
    released by :meth:`close`.
    """

    def __init__(
        self,
        host: str,
        port: int = 23,
        *,
        timeout: Optional[float] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "ignore",
    ) -> None:
        """Store the connection settings; no network I/O happens here.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            timeout: Connection timeout handed to :class:`telnetlib.Telnet`.
            encoding: Codec used to encode sent text and decode received bytes.
            encoding_errors: Error policy used when *decoding* received bytes.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.encoding = encoding
        self.encoding_errors = encoding_errors
        # ``None`` while disconnected; the live Telnet object otherwise.
        self._telnet: Optional[Telnet] = None

    def connect(self) -> None:
        """Open the Telnet connection if it is not already active.

        Raises:
            ImportError: If the ``telnetlib`` module could not be imported.
            ConnectionError: If the underlying connection attempt fails with
                an :class:`OSError` (chained as the cause).
        """
        if self._telnet is not None:
            return
        if Telnet is None:
            raise ImportError(
                "telnetlib is unavailable (it was removed from the standard "
                "library in Python 3.13); install a backport that provides "
                "the 'telnetlib' module"
            ) from _TELNETLIB_ERROR
        try:
            self._telnet = Telnet(self.host, self.port, self.timeout)
        except OSError as exc:  # pragma: no cover - depends on network environment
            raise ConnectionError(f"Failed to connect to {self.host}:{self.port}") from exc

    def close(self) -> None:
        """Close the Telnet connection if it is open.

        The client is marked as disconnected even when closing raises.
        """
        if self._telnet is None:
            return
        try:
            self._telnet.close()
        finally:
            self._telnet = None

    @property
    def is_connected(self) -> bool:
        """Whether a Telnet object is currently held.

        This does not probe the socket, so it stays ``True`` after the peer
        closes the connection until :meth:`close` is called.
        """
        return self._telnet is not None

    def send(self, command: str, terminator: str = "\n") -> None:
        """Send a command string to the server.

        Args:
            command: Text to send.
            terminator: Text appended to *command* before sending.

        Raises:
            RuntimeError: If the connection has not been established.
            UnicodeEncodeError: If the text cannot be encoded with the
                configured encoding.
        """
        client = self._ensure_connection()
        # Encoding uses the codec's strict handling: ``encoding_errors`` is
        # applied to decoding only, so unencodable characters raise instead
        # of being silently dropped from the outgoing command.
        payload = f"{command}{terminator}".encode(self.encoding)
        client.write(payload)

    def receive(self, timeout: Optional[float] = 1.0) -> str:
        """Read any data that arrives within *timeout* seconds.

        The call polls the connection until the deadline passes and returns
        everything collected, so with a numeric timeout it waits for roughly
        the whole period even when data arrives early.  The deadline is also
        enforced while data keeps arriving.

        Args:
            timeout: Seconds to keep polling.  ``None`` means "do not wait":
                data that is already available is drained and returned.

        Returns:
            The decoded text received; ``""`` if nothing arrived.  If the peer
            closes the connection, whatever was collected so far is returned.

        Raises:
            RuntimeError: If the connection has not been established.
        """
        client = self._ensure_connection()
        # Monotonic clock: the deadline must not move when the wall clock is
        # adjusted.
        deadline = None if timeout is None else time.monotonic() + timeout
        chunks: list[bytes] = []
        while True:
            try:
                chunk = client.read_very_eager()
            except EOFError:
                # Connection closed by the peer; return what we have.
                break
            if chunk:
                chunks.append(chunk)
            if deadline is None:
                if chunk:
                    continue  # keep draining data that is already available
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            if not chunk:
                # Never sleep past the deadline.
                time.sleep(min(_POLL_INTERVAL, remaining))
        return b"".join(chunks).decode(self.encoding, self.encoding_errors)

    def read_until(self, marker: str, timeout: Optional[float] = None) -> str:
        """Read until *marker* bytes are seen or *timeout* seconds elapse.

        Args:
            marker: Text to wait for; encoded with the configured encoding.
            timeout: Passed straight to :meth:`telnetlib.Telnet.read_until`.
                Unlike :meth:`receive`, ``None`` here does not mean "do not
                wait"; see the telnetlib documentation for its semantics.

        Returns:
            The decoded data returned by the underlying ``read_until`` call.

        Raises:
            RuntimeError: If the connection has not been established.
        """
        client = self._ensure_connection()
        raw = client.read_until(marker.encode(self.encoding), timeout)
        return raw.decode(self.encoding, self.encoding_errors)

    def send_and_expect(
        self,
        command: str,
        marker: Optional[str] = None,
        *,
        read_timeout: Optional[float] = None,
        terminator: str = "\n",
    ) -> str:
        """Convenience helper that writes a command and returns the response.

        Args:
            command: Text to send.
            marker: If given, read with :meth:`read_until` until it appears;
                otherwise collect the response with :meth:`receive`.
            read_timeout: Timeout for the read.  With a *marker* it is passed
                to :meth:`read_until` unchanged.  Without a marker, ``None``
                selects :meth:`receive`'s default wait; pass ``0`` for a
                non-blocking read.
            terminator: Text appended to *command* before sending.

        Returns:
            The decoded response text.
        """
        self.send(command, terminator=terminator)
        if marker is not None:
            return self.read_until(marker, timeout=read_timeout)
        if read_timeout is None:
            # A non-blocking drain right after the write would almost always
            # run before the reply arrives, so use receive()'s default wait.
            return self.receive()
        return self.receive(timeout=read_timeout)

    def __enter__(self) -> "TelnetClient":
        """Connect and return the client for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()

    def _ensure_connection(self) -> Telnet:
        """Return the live Telnet object or raise if not connected."""
        if self._telnet is None:
            raise RuntimeError("Telnet connection is not established. Call connect() first.")
        return self._telnet