refactor: moved agent decision making into its own class
This commit is contained in:
parent
538801f084
commit
0c88bd5a2a
2 changed files with 42 additions and 14 deletions
32
agent.py
Normal file
32
agent.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Agent(ABC):
|
||||
"""Interface for autonomous Telnet actors."""
|
||||
|
||||
@abstractmethod
|
||||
def observe(self, output: str) -> None:
|
||||
"""Receive the latest text emitted by the server."""
|
||||
|
||||
@abstractmethod
|
||||
def decide(self) -> Optional[str]:
|
||||
"""Return the next command to send, or ``None`` to stay idle."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimpleAgent(Agent):
|
||||
"""Very small stateful agent that decides which command to run next."""
|
||||
|
||||
default_command: str = "schau"
|
||||
last_output: str = field(default="", init=False)
|
||||
|
||||
def observe(self, output: str) -> None:
|
||||
if output:
|
||||
self.last_output = output
|
||||
|
||||
def decide(self) -> Optional[str]:
|
||||
return self.default_command
|
||||
24
app.py
24
app.py
|
|
@ -4,14 +4,12 @@ import sys
|
|||
import time
|
||||
from pathlib import Path
|
||||
from threading import Event, Lock, Thread
|
||||
from typing import Callable, Optional
|
||||
from typing import Optional
|
||||
|
||||
from agent import Agent, SimpleAgent
|
||||
from telnetclient import TelnetClient
|
||||
|
||||
|
||||
AgentFunction = Callable[[str], Optional[str]]
|
||||
|
||||
|
||||
class SessionState:
|
||||
"""Share Telnet session state safely across threads."""
|
||||
|
||||
|
|
@ -68,21 +66,16 @@ class SessionState:
|
|||
self._output_event.clear()
|
||||
|
||||
|
||||
def agent_decision(last_output: str) -> Optional[str]:
|
||||
"""Decide which command to send based on the most recent server output."""
|
||||
return "schau"
|
||||
|
||||
|
||||
def run_agent_loop(
|
||||
client: TelnetClient,
|
||||
state: SessionState,
|
||||
agent_fn: AgentFunction,
|
||||
agent: Agent,
|
||||
stop_event: Event,
|
||||
*,
|
||||
idle_delay: float = 0.5,
|
||||
min_send_interval: float = 1.0,
|
||||
) -> None:
|
||||
"""Invoke *agent_fn* whenever new output arrives and send its response."""
|
||||
"""Invoke *agent* whenever new output arrives and send its response."""
|
||||
while not stop_event.is_set():
|
||||
triggered = state.wait_for_output(timeout=idle_delay)
|
||||
if stop_event.is_set():
|
||||
|
|
@ -94,9 +87,10 @@ def run_agent_loop(
|
|||
if not last_output:
|
||||
continue
|
||||
try:
|
||||
command = agent_fn(last_output)
|
||||
agent.observe(last_output)
|
||||
command = agent.decide()
|
||||
except Exception as exc: # pragma: no cover - defensive logging
|
||||
print(f"Agent function failed: {exc}", file=sys.stderr)
|
||||
print(f"Agent failed: {exc}", file=sys.stderr)
|
||||
continue
|
||||
if not command:
|
||||
continue
|
||||
|
|
@ -243,6 +237,7 @@ def main() -> int:
|
|||
state = SessionState()
|
||||
stop_event = Event()
|
||||
agent_thread: Optional[Thread] = None
|
||||
agent: Optional[Agent] = None
|
||||
|
||||
with TelnetClient(host=host, port=port, timeout=10.0) as client:
|
||||
login(
|
||||
|
|
@ -254,9 +249,10 @@ def main() -> int:
|
|||
)
|
||||
|
||||
if agent_mode:
|
||||
agent = SimpleAgent()
|
||||
agent_thread = Thread(
|
||||
target=run_agent_loop,
|
||||
args=(client, state, agent_decision, stop_event),
|
||||
args=(client, state, agent, stop_event),
|
||||
kwargs={"min_send_interval": 1.0},
|
||||
daemon=True,
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue