115 lines
3.7 KiB
Python
115 lines
3.7 KiB
Python
"""Utility for interacting with Telnet servers.
|
|
|
|
Example:
|
|
from telnetclient import TelnetClient
|
|
|
|
with TelnetClient("mud.example.com", 23, timeout=5) as client:
|
|
client.send("look")
|
|
output = client.receive()
|
|
print(output)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from telnetlib import Telnet
|
|
from typing import Optional
|
|
|
|
|
|
class TelnetClient:
    """Small helper around :class:`telnetlib.Telnet` that hides encoding details.

    Commands are passed in and responses handed back as :class:`str`; the
    conversion to and from bytes uses *encoding*.  Incoming bytes are decoded
    with the *encoding_errors* policy, while outgoing text is encoded strictly
    so that an unencodable command fails loudly instead of being silently
    altered before it reaches the server.

    The client is a context manager: entering it connects, leaving it closes
    the connection.

    Note:
        :mod:`telnetlib` is deprecated since Python 3.11 and was removed from
        the standard library in Python 3.13 (PEP 594).

    Args:
        host: Server host name or address.
        port: Server TCP port.
        timeout: Timeout in seconds handed to :class:`telnetlib.Telnet` when
            connecting; ``None`` is passed through unchanged.
        encoding: Codec used to encode commands and decode responses.
        encoding_errors: Error policy used when decoding received bytes.
    """

    # Pause between polls while waiting for data in receive(), in seconds.
    _POLL_INTERVAL = 0.05

    def __init__(
        self,
        host: str,
        port: int = 23,
        *,
        timeout: Optional[float] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "ignore",
    ) -> None:
        """Store the connection settings; no network I/O happens here."""
        self.host = host
        self.port = port
        self.timeout = timeout
        self.encoding = encoding
        self.encoding_errors = encoding_errors
        # None while disconnected; set by connect() and cleared by close().
        self._telnet: Optional[Telnet] = None

    def connect(self) -> None:
        """Open the Telnet connection if it is not already active.

        Raises:
            ConnectionError: If the underlying connection attempt fails with
                an :class:`OSError`; the original error is chained as cause.
        """
        if self._telnet is not None:
            return
        try:
            self._telnet = Telnet(self.host, self.port, self.timeout)
        except OSError as exc:  # pragma: no cover - depends on network environment
            raise ConnectionError(f"Failed to connect to {self.host}:{self.port}") from exc

    def close(self) -> None:
        """Close the Telnet connection if it is open.

        Calling this on an already closed client is a no-op.
        """
        if self._telnet is not None:
            try:
                self._telnet.close()
            finally:
                # Drop the handle even if close() raised so the client never
                # keeps pointing at a half-closed connection.
                self._telnet = None

    @property
    def is_connected(self) -> bool:
        """Whether this client currently holds a connection object.

        This reflects local state only (``connect()`` called, ``close()`` not
        yet called); it does not probe whether the peer is still reachable.
        """
        return self._telnet is not None

    def send(self, command: str, terminator: str = "\n") -> None:
        """Send a command string to the server.

        Args:
            command: Text to send.
            terminator: Text appended to *command* before encoding.

        Raises:
            RuntimeError: If the client is not connected.
            UnicodeEncodeError: If the text cannot be represented in the
                configured encoding (outgoing text is encoded strictly).
        """
        client = self._ensure_connection()
        payload = f"{command}{terminator}".encode(self.encoding)
        client.write(payload)

    def receive(self, timeout: Optional[float] = 1.0) -> str:
        """Read any data that arrives within *timeout* seconds.

        Args:
            timeout: How long to keep collecting data, in seconds.  With
                ``None`` nothing is waited for: data that is already available
                is drained and the call returns as soon as a read comes back
                empty.

        Returns:
            The collected bytes decoded with the configured encoding and
            error policy; an empty string if nothing arrived.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._ensure_connection()
        # A monotonic clock keeps the wait immune to wall-clock adjustments.
        deadline = None if timeout is None else time.monotonic() + timeout
        chunks: list[bytes] = []
        while True:
            try:
                chunk = client.read_very_eager()
            except EOFError:
                # The peer closed the connection; return what was collected.
                break
            if chunk:
                chunks.append(chunk)
            if deadline is None:
                if not chunk:
                    break
                continue
            # The deadline is enforced after *every* read, not only after an
            # empty one, so a server that streams continuously cannot keep
            # this loop running past the requested timeout.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            if not chunk:
                # Nothing available right now: pause briefly, but never sleep
                # past the deadline.
                time.sleep(min(self._POLL_INTERVAL, remaining))
        return b"".join(chunks).decode(self.encoding, self.encoding_errors)

    def read_until(self, marker: str, timeout: Optional[float] = None) -> str:
        """Read until *marker* bytes are seen or *timeout* seconds elapse.

        Args:
            marker: Text to wait for; it is encoded with the configured
                encoding before being handed to the Telnet object.
            timeout: Passed through to :meth:`telnetlib.Telnet.read_until`.

        Returns:
            The bytes returned by the Telnet object, decoded with the
            configured encoding and error policy.

        Raises:
            RuntimeError: If the client is not connected.
        """
        client = self._ensure_connection()
        raw = client.read_until(marker.encode(self.encoding), timeout)
        return raw.decode(self.encoding, self.encoding_errors)

    def send_and_expect(
        self,
        command: str,
        marker: Optional[str] = None,
        *,
        read_timeout: Optional[float] = None,
        terminator: str = "\n",
    ) -> str:
        """Convenience helper that writes a command and returns the response.

        Args:
            command: Text to send.
            marker: If given, read until this text is seen (see
                :meth:`read_until`); otherwise collect whatever arrives (see
                :meth:`receive`).
            read_timeout: Timeout for the read step.  When no *marker* is
                given and this is ``None``, the default timeout of
                :meth:`receive` applies.
            terminator: Text appended to *command* before sending.

        Returns:
            The decoded response text.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self.send(command, terminator=terminator)
        if marker is not None:
            return self.read_until(marker, timeout=read_timeout)
        if read_timeout is None:
            # receive(timeout=None) polls once without waiting, which would
            # return before the server had any chance to answer the command
            # just sent.  Fall back to receive()'s own default wait instead.
            return self.receive()
        return self.receive(timeout=read_timeout)

    def __enter__(self) -> "TelnetClient":
        """Connect and return the client for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()

    def _ensure_connection(self) -> Telnet:
        """Return the active Telnet object.

        Raises:
            RuntimeError: If ``connect()`` has not been called (or the client
                was closed).
        """
        if self._telnet is None:
            raise RuntimeError("Telnet connection is not established. Call connect() first.")
        return self._telnet
|